Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from googlesearch import search | |
| from duckduckgo_search import DDGS | |
| import concurrent.futures | |
| import re | |
| app = FastAPI() | |
| API_KEY_DEFAULT = '12345' | |
| class SearchRequest(BaseModel): | |
| API_KEY: str | |
| product: str | |
| # Function to search DuckDuckGo | |
| def duckduckgo_search(query): | |
| try: | |
| results = DDGS().text(f"{query} manual filetype:pdf", max_results=5) | |
| return [res['href'] for res in results] | |
| except: | |
| return [] | |
| # Function to search Google | |
| def google_search(query): | |
| links = [] | |
| try: | |
| api_key = 'AIzaSyDV_uJwrgNtawqtl6GDfeUj6NqO-H1tA4c' | |
| search_engine_id = 'c4ca951b9fc6949cb' | |
| url = f"https://www.googleapis.com/customsearch/v1" | |
| params = { | |
| "key": api_key, | |
| "cx": search_engine_id, | |
| "q": query + " manual filetype:pdf" | |
| } | |
| response = requests.get(url, params=params) | |
| results = response.json() | |
| for item in results.get('items', []): | |
| links.append(item['link']) | |
| except: | |
| pass | |
| try: | |
| extension = "ext:pdf" | |
| for result in search(query + " manual " + extension, num_results=5): | |
| if result.endswith('.pdf'): | |
| links.append(result) | |
| except: | |
| pass | |
| return links | |
| # Function to search Internet Archive | |
| def archive_search(query): | |
| try: | |
| url = "https://archive.org/advancedsearch.php" | |
| params = { | |
| 'q': f'{query} manual', | |
| 'fl[]': ['identifier', 'title', 'format'], | |
| 'rows': 50, | |
| 'page': 1, | |
| 'output': 'json' | |
| } | |
| response = requests.get(url, params=params) | |
| data = response.json() | |
| def extract_hyperlinks(url): | |
| response = requests.get(url) | |
| if response.status_code == 200: | |
| soup = BeautifulSoup(response.text, 'html.parser') | |
| for link in soup.find_all('a', href=True): | |
| href = link['href'] | |
| if href.endswith('.pdf'): | |
| pdf_files.append(url + '/' + href) | |
| if href.endswith('.iso'): | |
| extract_pdf_from_iso(url + '/' + href + '/') | |
| def extract_pdf_from_iso(iso_url): | |
| iso_response = requests.get(iso_url) | |
| if iso_response.status_code == 200: | |
| iso_soup = BeautifulSoup(iso_response.text, 'html.parser') | |
| for link in iso_soup.find_all('a', href=True): | |
| href = link['href'] | |
| if href.endswith('.pdf'): | |
| pdf_files.append('https:' + href) | |
| pdf_files = [] | |
| def process_doc(doc): | |
| identifier = doc.get('identifier', 'N/A') | |
| pdf_link = f"https://archive.org/download/{identifier}" | |
| extract_hyperlinks(pdf_link) | |
| with concurrent.futures.ThreadPoolExecutor() as executor: | |
| futures = [executor.submit(process_doc, doc) for doc in data['response']['docs']] | |
| for future in concurrent.futures.as_completed(futures): | |
| try: | |
| future.result() | |
| except Exception as exc: | |
| print(f'Generated an exception: {exc}') | |
| return pdf_files | |
| except: | |
| return [] | |
| def github_search(query): | |
| try: | |
| url = f"https://api.github.com/search/code?q={query}+extension:md" | |
| headers = { | |
| 'Authorization': 'Token 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' | |
| } | |
| response = requests.get(url, headers=headers) | |
| data = response.json() | |
| links = [item['html_url'].replace('/blob','').replace('//github','//raw.github') for item in data['items']] | |
| return links | |
| except: | |
| return [] | |
| def extract_similar_products(query): | |
| results = DDGS().chat(f'{query} Similar Products') | |
| pattern = r'^\d+\.\s(.+)$' | |
| matches = re.findall(pattern, results, re.MULTILINE) | |
| matches = [item.split(': ')[0] for item in matches] | |
| return matches[:5] if matches else [] | |
| def read_root(): | |
| return {"message": "Welcome to the search API"} | |
| async def search_google(request: SearchRequest): | |
| if request.API_KEY == API_KEY_DEFAULT: | |
| results = {request.product: google_search(request.product)} | |
| similar_products = extract_similar_products(request.product) | |
| for p in similar_products: | |
| results[p] = google_search(p) | |
| return results | |
| else: | |
| raise HTTPException(status_code=401, detail="Invalid API key") | |
| async def search_duckduckgo(request: SearchRequest): | |
| if request.API_KEY == API_KEY_DEFAULT: | |
| results = {request.product: duckduckgo_search(request.product)} | |
| similar_products = extract_similar_products(request.product) | |
| for p in similar_products: | |
| results[p] = duckduckgo_search(p) | |
| return results | |
| else: | |
| raise HTTPException(status_code=401, detail="Invalid API key") | |
| async def search_archive(request: SearchRequest): | |
| if request.API_KEY == API_KEY_DEFAULT: | |
| results = {request.product: archive_search(request.product)} | |
| similar_products = extract_similar_products(request.product) | |
| def process_product(product): | |
| return product, archive_search(product) | |
| with concurrent.futures.ThreadPoolExecutor() as executor: | |
| future_to_product = {executor.submit(process_product, p): p for p in similar_products} | |
| for future in concurrent.futures.as_completed(future_to_product): | |
| product, result = future.result() | |
| results[product] = result | |
| return results | |
| else: | |
| raise HTTPException(status_code=401, detail="Invalid API key") | |
| async def search_github(request: SearchRequest): | |
| if request.API_KEY == API_KEY_DEFAULT: | |
| results = {request.product: github_search(request.product)} | |
| similar_products = extract_similar_products(request.product) | |
| for p in similar_products: | |
| results[p] = github_search(p) | |
| return results | |
| else: | |
| raise HTTPException(status_code=401, detail="Invalid API key") | |
| async def search_all(request: SearchRequest): | |
| if request.API_KEY == API_KEY_DEFAULT: | |
| results = { | |
| request.product: [ | |
| {'duckduckgo': duckduckgo_search(request.product)}, | |
| {'google': google_search(request.product)}, | |
| {'github': github_search(request.product)}, | |
| {'archive': archive_search(request.product)} | |
| ] | |
| } | |
| def search_product(p): | |
| return { | |
| 'product': p, | |
| 'duckduckgo': duckduckgo_search(p), | |
| 'google': google_search(p), | |
| 'github': github_search(p), | |
| 'archive': archive_search(p) | |
| } | |
| with concurrent.futures.ThreadPoolExecutor() as executor: | |
| future_to_product = {executor.submit(search_product, p): p for p in extract_similar_products(request.product)} | |
| for future in concurrent.futures.as_completed(future_to_product): | |
| result = future.result() | |
| product = result['product'] | |
| results[product] = [ | |
| {'duckduckgo': result['duckduckgo']}, | |
| {'google': result['google']}, | |
| {'github': result['github']}, | |
| {'archive': result['archive']} | |
| ] | |
| return results | |
| else: | |
| raise HTTPException(status_code=401, detail="Invalid API key") |