"""Gradio callbacks for answering questions from PDFs in a Zotero collection."""
import re
import tempfile
import time

import gradio as gr
import requests
from lxml import html
from paperqa import Docs
from pyzotero import zotero

from models import Icons, Message

# Matches the " (<item count>)" suffix appended to collection names for display.
_ITEM_COUNT_SUFFIX = re.compile(r'\s\(\d+\)$')

# Link modes whose file content this module knows how to download.
_DOWNLOADABLE_LINK_MODES = ('imported_file', 'imported_url')

# Seconds to wait on the remote server when downloading an `imported_url` PDF.
_DOWNLOAD_TIMEOUT = 30


def _collection_label(collection):
    """Return the display label "<name> (<numItems>)" for a Zotero collection."""
    return f"{collection['data']['name']} ({collection['meta']['numItems']})"


def _strip_item_count(label):
    """Remove the trailing " (<count>)" suffix from a collection display label."""
    return _ITEM_COUNT_SUFFIX.sub('', label)


def _status(messages):
    """Build the (messages, messages HTML update, answer) tuple for a progress step."""
    return messages, gr.HTML.update(value=str(messages)), None


def _find_pdf_attachments(zot, collection_key, queries, limit):
    """Run every query against the collection and return unique PDF attachments.

    Results are de-duplicated by item key (first-seen order is kept) and
    truncated to `limit` entries.
    """
    found = {}
    for query in queries:
        results = zot.collection_items(
            collection_key,
            q=query,
            limit=limit,
            itemType='attachment',
            qmode='everything',
        )
        for item in results:
            if item['data']['contentType'] == 'application/pdf':
                found[item['key']] = item
    return list(found.values())[:limit]


def _fetch_attachment_bytes(zot, attachment):
    """Download the raw bytes of an attachment.

    Raises:
        ValueError: if the attachment's link mode is not downloadable.
        requests.RequestException: if fetching an `imported_url` fails or
            the server answers with an HTTP error status.
    """
    link_mode = attachment['data']['linkMode']
    if link_mode == 'imported_file':
        return zot.file(attachment['key'])
    if link_mode == 'imported_url':
        res = requests.get(attachment['data']['url'], timeout=_DOWNLOAD_TIMEOUT)
        # Without this check an HTTP error page would be treated as PDF content.
        res.raise_for_status()
        return res.content
    raise ValueError(
        f'Unsupported link mode: {link_mode} for {attachment["key"]}.')


def reset_answer():
    """Return an update that clears the answer HTML component."""
    return gr.HTML.update(value=None)


def fetch_collections(id, type, key, messages):
    """Fetch the top-level collections of a Zotero library.

    Args:
        id: Zotero library ID; converted with `int()`.
        type: Library type; lower-cased before being passed to pyzotero.
        key: Zotero API key.
        messages: Message container; a status message is appended to it.

    Returns:
        A 7-tuple: (Zotero client, collections, collection radio update,
        button update, HTML update, messages, messages HTML update).
        On failure the client is None and the collection list is empty.
    """
    try:
        # Client construction is inside the try so that a non-numeric library
        # ID is reported through the messages panel instead of raising.
        zot = zotero.Zotero(int(id), type.lower(), key)
        collections = zot.collections_top()
        collection_names = [_collection_label(x) for x in collections]
        messages.append(
            Message(Icons.INFO, "Please select a Zotero collection to proceed."))
        return (
            zot,
            collections,
            gr.Radio.update(choices=collection_names,
                            visible=True, interactive=True),
            gr.Button.update(visible=False),
            gr.HTML.update(visible=False),
            messages,
            gr.HTML.update(value=str(messages)),
        )
    except Exception as e:
        messages.append(
            Message(Icons.ERR, f"Error occurred when fetching Zotero collection: {e}"))
        print({'messages': str(messages)})
        return (
            None,
            [],
            None,
            gr.Button.update(visible=True),
            None,
            messages,
            gr.HTML.update(value=str(messages)),
        )


def select_collection(collection, messages):
    """Handle selection of a collection in the UI.

    Args:
        collection: Selected display label ("<name> (<count>)"), or None.
        messages: Message container; replaced with a single status message
            when a collection is selected.

    Returns:
        A 4-tuple: (question box update, messages, messages HTML update,
        answer HTML update).
    """
    if collection is None:
        # Same arity as the success path below; the original returned only
        # three values here. NOTE(review): assumes the event is wired to four
        # outputs, as the success path implies -- confirm against the UI code.
        return None, messages, gr.HTML.update(), gr.HTML.update()

    collection_name = _strip_item_count(collection)
    messages.set([Message(
        Icons.OK,
        f"Selected collection: {collection_name}. Please type your question and hit \"Enter\".")])
    return (
        gr.Text.update(
            placeholder="Please type your question and hit \"Enter\".",
            interactive=True),
        messages,
        gr.HTML.update(value=str(messages)),
        gr.HTML.update(value=None),
    )


def search_attachments(id, type, key, collection, queries=(), limit=10):
    """Search a Zotero collection for PDF attachments matching the queries.

    Args:
        id: Zotero library ID; converted with `int()`.
        type: Library type; lower-cased before being passed to pyzotero.
        key: Zotero API key.
        collection: Collection dict; its 'key' entry selects the collection.
        queries: Iterable of search strings (one search per entry).
        limit: Maximum results per search and maximum attachments returned.

    Returns:
        (parents, attachments, message): the set of parent item keys, the
        unique PDF attachments, and a status string. On error, two empty
        lists and an error message.
    """
    # NOTE(review): the status strings below start and end with a line break
    # in the visible source; surrounding markup may have been lost -- confirm.
    try:
        zot = zotero.Zotero(int(id), type.lower(), key)
        attachments = _find_pdf_attachments(
            zot, collection['key'], queries, limit)
        parents = {a['data']['parentItem'] for a in attachments}
        if attachments:
            message = (
                f"\n✅ Found {len(attachments)} PDF "
                f"{'attachments' if len(attachments) > 1 else 'attachment'} from "
                f"{len(parents)} {'articles' if len(parents) > 1 else 'article'}.\n"
            )
        else:
            message = "\n❔ No results. Make sure to index your PDF attachments in Zotero.\n"
        return parents, attachments, message
    except Exception as e:
        message = f"\n⚠️ Error occurred when searching in Zotero: {e}\n"
        return [], [], message


def download_attachment(id, type, key, attachment):
    """Download the file content of a Zotero attachment.

    Args:
        id: Zotero library ID; converted with `int()`.
        type: Library type; lower-cased before being passed to pyzotero.
        key: Zotero API key.
        attachment: Attachment item dict (reads data.linkMode, data.url, key).

    Returns:
        The attachment's raw bytes.

    Raises:
        ValueError: if the link mode is neither 'imported_file' nor
            'imported_url'.
        requests.RequestException: if downloading an 'imported_url' fails.
    """
    zot = zotero.Zotero(int(id), type.lower(), key)
    return _fetch_attachment_bytes(zot, attachment)


def reset_collection():
    """Return updates that hide the collection picker and disable the question box."""
    return (
        gr.Radio.update(choices=[], visible=False),
        gr.HTML.update(visible=True),
        gr.Text.update(
            placeholder="You have to select a Zotero collection to proceed",
            interactive=False),
        gr.HTML.update(value=None),
    )


def handle_submit(zot, collection_name, collections, question, messages):
    """Answer a question from the PDFs of the selected Zotero collection.

    This is a generator: after each step it yields a 3-tuple
    (messages, messages HTML update, answer HTML update or None) so the UI
    can show progress. It stops early, after reporting an error message,
    when any mandatory step fails.

    Args:
        zot: pyzotero client for the library.
        collection_name: Selected display label ("<name> (<count>)").
        collections: Collection dicts the label is looked up in.
        question: The user's question.
        messages: Message container; reset, then appended to at every step.
    """
    collection_name_only = _strip_item_count(collection_name)
    messages.set([Message(
        Icons.OK, f"Selected collection: {collection_name_only}.")])
    yield _status(messages)

    docs = Docs()

    # Generate search queries from the question by Paper QA
    try:
        question_prompt = 'A "keyword search" is a list of no more than 3 words, which separated by whitespace only and with no boolean operators (e.g. "dog canine puppy"). Avoid adding any new words not in the question unless they are synonyms to the existing words.'
        queries = [
            x.strip('"').lower()
            for x in docs.generate_search_query(question + '\n' + question_prompt)
        ]
        query_str = ", ".join(queries)
        messages.append(
            Message(Icons.WAIT, f"Searching your Zotero collection for {query_str}."))
        yield _status(messages)
    except Exception as e:
        messages.append(
            Message(Icons.ERR, f"Error occurred when generating search queries: {e}"))
        yield _status(messages)
        return

    # Search for attachments in Zotero
    try:
        collection = [
            x for x in collections if _collection_label(x) == collection_name][0]
        attachments = _find_pdf_attachments(
            zot, collection['key'], queries, 10)
        parents = {a['data']['parentItem'] for a in attachments}
        if attachments:
            messages.append(Message(
                Icons.SUCCESS,
                f"Found {len(attachments)} PDF {'attachments' if len(attachments) > 1 else 'attachment'} from {len(parents)} {'articles' if len(parents) > 1 else 'article'}."))
            yield _status(messages)
        else:
            messages.append(Message(
                Icons.ERR,
                "No results. Make sure to index your PDF attachments in Zotero and try rephrasing your question."))
            yield _status(messages)
            return
    except Exception as e:
        messages.append(
            Message(Icons.ERR, f"Error occurred when searching in Zotero: {e}"))
        yield _status(messages)
        return

    # Compile citation metadata
    citation_dict = {}
    # Bibliography entries already fetched, keyed by parent item, so several
    # attachments of the same article trigger only one API call.
    parents = {}
    messages.append(
        Message(Icons.WAIT, "Fetching attachment bibliography information."))
    yield _status(messages)

    for attachment in attachments:
        parent_id = attachment["data"]["parentItem"]
        try:
            if parent_id in parents:
                citation_dict[attachment["key"]] = parents[parent_id]
            else:
                parent = zot.item(parent_id, content="bib", style="nature")[0]
                citation_text = html.fragment_fromstring(
                    parent).xpath("normalize-space(div[2])")
                # NOTE(review): "Open in Zotero" reads like link text whose
                # markup is not present in the visible source -- confirm.
                bib = f""" {citation_text} Open in Zotero """
                parents[parent_id] = bib
                citation_dict[attachment["key"]] = bib
        except Exception as e:
            messages.append(Message(
                Icons.WARN,
                f"Failed to retrieve bibliography for PDF attachment {attachment['data']['title']}: {e}"))
            yield _status(messages)

    # Index attachments
    available_attachments = 0
    for attachment in attachments:
        try:
            link_mode = attachment['data']['linkMode']
            if link_mode in _DOWNLOADABLE_LINK_MODES:
                attachment_content = _fetch_attachment_bytes(zot, attachment)
                # The context manager closes (and thereby deletes) the
                # temporary file as soon as it has been added to the index.
                with tempfile.NamedTemporaryFile(suffix=".pdf") as temp_file:
                    temp_file.write(attachment_content)
                    temp_file.flush()
                    docs.add(temp_file.name, citation_dict[attachment["key"]])
                messages.append(Message(
                    Icons.INDEX,
                    f"Indexed PDF attachment: {attachment['data']['title']}."))
                available_attachments += 1
            else:
                messages.append(Message(
                    Icons.WARN,
                    f"Unable to access linked PDF attachment {attachment['data']['title']}: The file is not in Zotero online storage."))
        except Exception as e:
            messages.append(Message(
                Icons.WARN,
                f"Failed to retrieve PDF attachment {attachment['data']['title']}: {e}"))
        yield _status(messages)

    # Build vector index
    if available_attachments == 0:
        messages.append(Message(
            Icons.ERR,
            "No answer. Unable to access any PDF attachments from your Zotero online storage or public URLs."))
        yield _status(messages)
        return

    # Pluralise on the number of indexed attachments (the original compared
    # the attachment list itself to 1, which is never true).
    attachment_noun = 'attachment' if available_attachments == 1 else 'attachments'

    if docs._faiss_index is None:
        messages.append(Message(
            Icons.WAIT,
            f"Building vector index based on {available_attachments} available PDF {attachment_noun}"))
        yield _status(messages)
        docs._build_faiss_index()

    # Synthesize response
    messages.append(Message(
        Icons.WAIT,
        f"Creating answer. This will loop through all available PDF {attachment_noun} and may take {'a few' if available_attachments > 2 else 'a couple of'} minutes."))
    yield _status(messages)

    try:
        start_time = time.time()
        total_time = 0
        answer = None
        for answer in docs.query_gen(question):
            end_time = time.time()
            time_dif = end_time - start_time
            # Report progress at most once every five seconds.
            if time_dif > 5:
                start_time = end_time
                total_time += time_dif
                messages.append(Message(
                    Icons.INFO, f"Still in progress: {total_time:.1f} seconds"))
                yield _status(messages)

        if answer is None:
            # query_gen produced nothing; report it through the handler below
            # rather than failing on an unbound name.
            raise RuntimeError("no answer was produced")

        answer_text = '\n'.join(
            f"\n{x}\n" for x in answer.answer.split('\n'))
        references = '\n'.join(
            f"\n  • {x.split('.', 1)[1]}\n  • "
            for x in answer.references.split('\n\n'))
        formatted_answer = f"""
    {answer_text}

    References:

      {references}
    Tokens Used: {answer.tokens} Cost: ${answer.tokens/1000 * 0.002:.2f}
    """.strip()

        messages.append(Message(Icons.OK, "Answer created."))
        yield (
            messages,
            gr.HTML.update(value=str(messages)),
            gr.HTML.update(value=formatted_answer),
        )
    except Exception as e:
        messages.append(Message(
            Icons.ERR, f"Error occurred when creating answer: {e}"))
        yield _status(messages)
        return