Spaces:
Paused
Paused
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from webscout import WEBS, transcriber | |
| from typing import Optional | |
| from fastapi.encoders import jsonable_encoder | |
| from bs4 import BeautifulSoup | |
| import requests | |
| import urllib.parse | |
# FastAPI application instance; the ``__main__`` guard at the end of this file
# passes it to uvicorn.
# NOTE(review): no ``@app.<method>(...)`` route decorators are visible on the
# handler functions in this view — confirm the routes are registered (they may
# have been lost when the file was extracted).
app = FastAPI()
async def root():
    """Return a short message pointing callers at the ``/docs`` page."""
    payload = {"message": "API documentation can be found at /docs"}
    return payload
async def health_check():
    """Return a constant ``{"status": "OK"}`` payload."""
    status_report = dict(status="OK")
    return status_report
async def search(
    q: str,
    max_results: int = 10,
    timelimit: Optional[str] = None,
    safesearch: str = "moderate",
    region: str = "wt-wt",
    backend: str = "api",
):
    """Run a text search through WEBS and return the hits as JSON.

    All arguments are forwarded to ``WEBS.text`` (``q`` is sent as
    ``keywords``). Any exception is reported as an HTTP 500 whose detail
    carries the underlying error message.
    """
    query_options = dict(
        keywords=q,
        region=region,
        safesearch=safesearch,
        timelimit=timelimit,
        backend=backend,
        max_results=max_results,
    )
    try:
        with WEBS() as client:
            hits = client.text(**query_options)
            return JSONResponse(content=jsonable_encoder(hits))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error during search: {exc}")
async def images(
    q: str,
    max_results: int = 10,
    safesearch: str = "moderate",
    region: str = "wt-wt",
    timelimit: Optional[str] = None,
    size: Optional[str] = None,
    color: Optional[str] = None,
    type_image: Optional[str] = None,
    layout: Optional[str] = None,
    license_image: Optional[str] = None,
):
    """Run an image search through WEBS and return the hits as JSON.

    All arguments are forwarded to ``WEBS.images`` (``q`` is sent as
    ``keywords``). Any exception is reported as an HTTP 500 whose detail
    carries the underlying error message.
    """
    query_options = dict(
        keywords=q,
        region=region,
        safesearch=safesearch,
        timelimit=timelimit,
        size=size,
        color=color,
        type_image=type_image,
        layout=layout,
        license_image=license_image,
        max_results=max_results,
    )
    try:
        with WEBS() as client:
            hits = client.images(**query_options)
            return JSONResponse(content=jsonable_encoder(hits))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error during image search: {exc}")
async def videos(
    q: str,
    max_results: int = 10,
    safesearch: str = "moderate",
    region: str = "wt-wt",
    timelimit: Optional[str] = None,
    resolution: Optional[str] = None,
    duration: Optional[str] = None,
    license_videos: Optional[str] = None,
):
    """Run a video search through WEBS and return the hits as JSON.

    All arguments are forwarded to ``WEBS.videos`` (``q`` is sent as
    ``keywords``). Any exception is reported as an HTTP 500 whose detail
    carries the underlying error message.
    """
    query_options = dict(
        keywords=q,
        region=region,
        safesearch=safesearch,
        timelimit=timelimit,
        resolution=resolution,
        duration=duration,
        license_videos=license_videos,
        max_results=max_results,
    )
    try:
        with WEBS() as client:
            hits = client.videos(**query_options)
            return JSONResponse(content=jsonable_encoder(hits))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error during video search: {exc}")
async def news(
    q: str,
    max_results: int = 10,
    safesearch: str = "moderate",
    region: str = "wt-wt",
    timelimit: Optional[str] = None,
):
    """Run a news search through WEBS and return the hits as JSON.

    All arguments are forwarded to ``WEBS.news`` (``q`` is sent as
    ``keywords``). Any exception is reported as an HTTP 500 whose detail
    carries the underlying error message.
    """
    query_options = dict(
        keywords=q,
        region=region,
        safesearch=safesearch,
        timelimit=timelimit,
        max_results=max_results,
    )
    try:
        with WEBS() as client:
            hits = client.news(**query_options)
            return JSONResponse(content=jsonable_encoder(hits))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error during news search: {exc}")
async def answers(q: str):
    """Fetch instant answers for ``q`` via ``WEBS.answers`` and return them as JSON.

    Any exception is reported as an HTTP 500 whose detail carries the
    underlying error message.
    """
    try:
        with WEBS() as client:
            instant_answers = client.answers(keywords=q)
            encoded = jsonable_encoder(instant_answers)
            return JSONResponse(content=encoded)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error getting instant answers: {exc}")
async def suggestions(q: str, region: str = "wt-wt"):
    """Fetch search suggestions for ``q`` via ``WEBS.suggestions`` as JSON.

    Any exception is reported as an HTTP 500 whose detail carries the
    underlying error message.
    """
    try:
        with WEBS() as client:
            proposed = client.suggestions(keywords=q, region=region)
            encoded = jsonable_encoder(proposed)
            return JSONResponse(content=encoded)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error getting search suggestions: {exc}")
async def chat(
    q: str,
    model: str = "gpt-3.5"
):
    """Send a prompt to a chat model through WEBS and return its reply as JSON.

    Args:
        q: Prompt text, forwarded to ``WEBS.chat`` as ``keywords``.
        model: Model identifier forwarded to ``WEBS.chat``.

    Raises:
        HTTPException: status 500 with the underlying error message when the
            chat call or the response encoding fails.
    """
    # The previous docstring ("Perform a text search.") was copied from the
    # search handler and described the wrong operation.
    try:
        with WEBS() as webs:
            results = webs.chat(keywords=q, model=model)
            return JSONResponse(content=jsonable_encoder(results))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error getting chat results: {e}")
def extract_text_from_webpage(html_content):
    """Extract the visible text of an HTML document using BeautifulSoup.

    ``script``, ``style``, ``header``, ``footer`` and ``nav`` elements are
    removed before the text is collected.

    Args:
        html_content: HTML markup to parse.

    Returns:
        The remaining text as one string, with each text fragment stripped
        and fragments joined by a single space.
    """
    soup = BeautifulSoup(html_content, "html.parser")
    # Drop elements whose content is not part of the readable page body.
    for tag in soup(["script", "style", "header", "footer", "nav"]):
        tag.decompose()
    # An explicit separator keeps text from adjacent elements apart; with the
    # default empty separator "<p>Hello</p><p>World</p>" became "HelloWorld".
    return soup.get_text(separator=" ", strip=True)
async def web_extract(
    url: str,
    max_chars: int = 12000,  # Adjust based on token limit
):
    """Fetch ``url`` and return its visible text.

    Args:
        url: Address of the page to download.
        max_chars: Maximum number of characters of text to keep; longer text
            is cut at this length and suffixed with ``"..."``.

    Returns:
        A dict ``{"url": url, "text": visible_text}``.

    Raises:
        HTTPException: status 500 when the download fails, times out, or
            returns an HTTP error status.
    """
    try:
        response = requests.get(
            url,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0"},
            # Without a timeout requests waits indefinitely on a stalled server.
            timeout=10,
        )
        response.raise_for_status()
        visible_text = extract_text_from_webpage(response.text)
        if len(visible_text) > max_chars:
            visible_text = visible_text[:max_chars] + "..."
        return {"url": url, "text": visible_text}
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"Error fetching or processing URL: {e}")
async def web_search_and_extract(
    q: str,
    max_results: int = 3,
    timelimit: Optional[str] = None,
    safesearch: str = "moderate",
    region: str = "wt-wt",
    backend: str = "api",
    max_chars: int = 6000,
    extract_only: bool = False
):
    """Search with WEBS, then download and extract text from each result.

    Args:
        q: Search query, forwarded to ``WEBS.text`` as ``keywords``.
        max_results, timelimit, safesearch, region, backend: Forwarded to
            ``WEBS.text``.
        max_chars: Maximum characters of text kept per page; longer text is
            cut at this length and suffixed with ``"..."``.
        extract_only: When true, respond with only the list of extracted
            results; otherwise respond with a dict holding both
            ``search_results`` and ``extracted_results``.

    Returns:
        A ``JSONResponse``. Each extracted result is ``{"link", "text"}``;
        ``text`` is ``None`` when the page could not be fetched, and both are
        ``None`` when the search result has no ``href``.

    Raises:
        HTTPException: status 500 when the search or response encoding fails.
    """
    request_headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0"}
    try:
        with WEBS() as webs:
            # Perform WEBS search
            search_results = webs.text(keywords=q, region=region, safesearch=safesearch,
                                       timelimit=timelimit, backend=backend, max_results=max_results)

            # Extract text from each result's link
            extracted_results = []
            for result in search_results:
                if 'href' not in result:
                    extracted_results.append({"link": None, "text": None})
                    continue
                link = result['href']
                try:
                    # The timeout stops one stalled site from hanging the whole request.
                    response = requests.get(link, headers=request_headers, timeout=10)
                    response.raise_for_status()
                    visible_text = extract_text_from_webpage(response.text)
                    if len(visible_text) > max_chars:
                        visible_text = visible_text[:max_chars] + "..."
                    extracted_results.append({"link": link, "text": visible_text})
                except requests.exceptions.RequestException as e:
                    # Best effort: a page that cannot be fetched is reported with text=None.
                    print(f"Error fetching or processing {link}: {e}")
                    extracted_results.append({"link": link, "text": None})

            if extract_only:
                # The original wrapped the list in a set literal ({extracted_results}),
                # which raises "unhashable type: 'list'" and made this branch always
                # answer 500. Return the list itself.
                return JSONResponse(content=jsonable_encoder(extracted_results))
            return JSONResponse(content=jsonable_encoder({"search_results": search_results, "extracted_results": extracted_results}))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during search and extraction: {e}")
async def website_summarizer(url: str):
    """Summarize the content of ``url`` using the ``gpt-3.5`` chat model.

    The page is downloaded, reduced to visible text, cut to 7500 characters
    (suffixed with ``"..."`` when longer) and sent to ``WEBS.chat``.

    Args:
        url: Address of the page to summarize.

    Returns:
        A ``JSONResponse`` whose body is a one-element JSON array holding the
        chat result.

    Raises:
        HTTPException: status 500 when the download fails or times out, or
            when the summarization step fails.
    """
    try:
        # Extract text from the given URL
        response = requests.get(
            url,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0"},
            # Without a timeout requests waits indefinitely on a stalled server.
            timeout=10,
        )
        response.raise_for_status()
        visible_text = extract_text_from_webpage(response.text)
        if len(visible_text) > 7500:  # Adjust max_chars based on your needs
            visible_text = visible_text[:7500] + "..."

        # Use chat model to summarize the extracted text
        with WEBS() as webs:
            summary_prompt = f"Summarize this in detail in Paragraph: {visible_text}"
            summary_result = webs.chat(keywords=summary_prompt, model="gpt-3.5")

        # The original used a set literal ({summary_result}), which the encoder
        # turns into a one-element array; an explicit list keeps that response
        # shape without requiring the result to be hashable.
        return JSONResponse(content=jsonable_encoder([summary_result]))
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"Error fetching or processing URL: {e}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during summarization: {e}")
async def ask_website(url: str, question: str, model: str = "llama-3-70b"):
    """Ask a chat model a question about the content of a web page.

    The page is downloaded, reduced to visible text, cut to 7500 characters
    (suffixed with ``"..."`` when longer) and sent together with the question
    to ``WEBS.chat``.

    Args:
        url: Address of the page to read.
        question: Question to answer from the page text.
        model: Model identifier forwarded to ``WEBS.chat``.

    Returns:
        A ``JSONResponse`` whose body is a one-element JSON array holding the
        chat result.

    Raises:
        HTTPException: status 500 when the download fails or times out, or
            when the question-answering step fails.
    """
    try:
        # Extract text from the given URL
        response = requests.get(
            url,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0"},
            # Without a timeout requests waits indefinitely on a stalled server.
            timeout=10,
        )
        response.raise_for_status()
        visible_text = extract_text_from_webpage(response.text)
        if len(visible_text) > 7500:  # Adjust max_chars based on your needs
            visible_text = visible_text[:7500] + "..."

        # Construct a prompt for the chat model
        prompt = f"Based on the following text, answer this question in Paragraph: [QUESTION] {question} [TEXT] {visible_text}"

        # Use chat model to get the answer
        with WEBS() as webs:
            answer_result = webs.chat(keywords=prompt, model=model)

        # The original used a set literal ({answer_result}), which the encoder
        # turns into a one-element array; an explicit list keeps that response
        # shape without requiring the result to be hashable.
        return JSONResponse(content=jsonable_encoder([answer_result]))
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"Error fetching or processing URL: {e}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during question answering: {e}")
| import requests | |
def get_gemini_content(question, model="flash"):
    """Send a question to the Gemini worker endpoint and return the answer text.

    Args:
        question: The question to ask the Gemini API.
        model: The Gemini model to use; ``"pro"`` selects the pro endpoint and
            any other value selects the flash endpoint. Defaults to "flash".

    Returns:
        The ``content`` string from the JSON response, or ``None`` when the
        API answers with a non-200 status or the response has no ``content``.

    Raises:
        requests.exceptions.RequestException: on connection errors or timeout.
    """
    if model == "pro":
        endpoint = "https://gemini-pro.developer-house.workers.dev/"
    else:
        endpoint = "https://gemini-flash.developer-house.workers.dev/"
    # Passing the question via ``params`` URL-encodes it; interpolating it into
    # the URL let characters such as "&", "#" or "?" truncate or corrupt it.
    response = requests.get(endpoint, params={"question": question}, timeout=60)
    if response.status_code == 200:
        data = response.json()
        return data.get('content')
    print(f"API request failed with status code: {response.status_code}")
    return None
async def gemini(q: str, model: str = "flash"):
    """Get an answer to ``q`` from a Gemini model.

    Args:
        q: Question passed to ``get_gemini_content``.
        model: Model selector passed to ``get_gemini_content``.

    Returns:
        A ``JSONResponse`` with body ``{"answer": <content>}``.

    Raises:
        HTTPException: status 500 with detail "Gemini API request failed."
            when no content is returned, or with the underlying error message
            when the request itself raises.
    """
    try:
        content = get_gemini_content(q, model)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during Gemini request: {e}")
    # Raised outside the try block: previously this HTTPException was caught by
    # the broad ``except Exception`` above and re-wrapped, garbling its detail.
    if not content:
        raise HTTPException(status_code=500, detail="Gemini API request failed.")
    return JSONResponse(content=jsonable_encoder({"answer": content}))
async def web_gemini(q: str, model: str = "flash"):
    """Search the web for ``q`` and have a Gemini model answer from the results.

    The top three WEBS text results are downloaded, reduced to visible text
    (cut to 10000 characters each, suffixed with ``"..."`` when longer) and
    sent together with the question to the Gemini worker endpoint.

    Args:
        q: The question; also used as the search query.
        model: ``"pro"`` selects the pro endpoint; any other value selects
            the flash endpoint.

    Returns:
        The ``content`` value from the Gemini JSON response.

    Raises:
        HTTPException: status 500 when the search, the Gemini request, or the
            response handling fails.
    """
    request_headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0"}
    try:
        with WEBS() as webs:
            search_results = webs.text(keywords=q, max_results=3)

            extracted_results = []
            for result in search_results:
                if 'href' not in result:
                    extracted_results.append({"link": None, "text": None})
                    continue
                link = result['href']
                try:
                    # The timeout stops one stalled site from hanging the whole request.
                    response = requests.get(link, headers=request_headers, timeout=10)
                    response.raise_for_status()
                    visible_text = extract_text_from_webpage(response.text)
                    if len(visible_text) > 10000:
                        visible_text = visible_text[:10000] + "..."
                    extracted_results.append({"link": link, "text": visible_text})
                except requests.exceptions.RequestException as e:
                    # Best effort: a page that cannot be fetched is reported with text=None.
                    print(f"Error fetching or processing {link}: {e}")
                    extracted_results.append({"link": link, "text": None})

        content = jsonable_encoder({"extracted_results": extracted_results})
        prompt = f"Based on the following text from Google Search, answer this question in Paragraph: [QUESTION] {q} [TEXT] {content}"

        # Call Gemini API based on the selected model
        if model == "pro":
            endpoint = "https://gemini-pro.developer-house.workers.dev/"
        else:
            endpoint = "https://gemini-flash.developer-house.workers.dev/"
        # ``params`` URL-encodes the prompt; the scraped text routinely contains
        # "&", "#" and "?" which corrupted the query when pasted into the URL.
        response = requests.get(endpoint, params={"question": prompt}, timeout=60)
        response.raise_for_status()
        data = response.json()
        return data['content']
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"Error during web search or Gemini request: {e}")
    except Exception as e:
        # Search failures and malformed responses get the same 500-with-detail
        # treatment as in the other handlers of this module.
        raise HTTPException(status_code=500, detail=f"Error during web search or Gemini request: {e}")
async def ask_website_gemini(
    url: str,
    q: str,
    model: str = "flash"
):
    """Ask a Gemini model a question about the content of a web page.

    The page is downloaded, reduced to visible text, cut to 30000 characters
    (suffixed with ``"..."`` when longer) and sent together with the question
    to the Gemini worker endpoint.

    Args:
        url: Address of the page to read.
        q: Question to answer from the page text.
        model: ``"pro"`` selects the pro endpoint; any other value selects
            the flash endpoint.

    Returns:
        The ``content`` value from the Gemini JSON response.

    Raises:
        HTTPException: status 500 when either HTTP request fails, times out,
            or returns an error status.
    """
    try:
        # Extract text from the given URL
        page = requests.get(
            url,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0"},
            timeout=10,
        )
        page.raise_for_status()
        visible_text = extract_text_from_webpage(page.text)
        if len(visible_text) > 30000:  # Adjust max_chars based on your needs
            visible_text = visible_text[:30000] + "..."

        # Fixes two defects: the "pro" branch referenced an undefined name
        # ``question`` (NameError), and the "flash" branch sent only the bare
        # question, ignoring the page text extracted above.
        prompt = f"Based on the following text, answer this question in Paragraph: [QUESTION] {q} [TEXT] {visible_text}"
        if model == "pro":
            endpoint = "https://gemini-pro.developer-house.workers.dev/"
        else:
            endpoint = "https://gemini-flash.developer-house.workers.dev/"
        # ``params`` URL-encodes the prompt so page text cannot corrupt the query.
        response = requests.get(endpoint, params={"question": prompt}, timeout=60)
        response.raise_for_status()
        data = response.json()
        return data['content']
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"Gemini API request failed: {e}")
async def maps(
    q: str,
    place: Optional[str] = None,
    street: Optional[str] = None,
    city: Optional[str] = None,
    county: Optional[str] = None,
    state: Optional[str] = None,
    country: Optional[str] = None,
    postalcode: Optional[str] = None,
    latitude: Optional[str] = None,
    longitude: Optional[str] = None,
    radius: int = 0,
    max_results: int = 10,
):
    """Run a maps search through WEBS and return the hits as JSON.

    All arguments are forwarded to ``WEBS.maps`` (``q`` is sent as
    ``keywords``). Any exception is reported as an HTTP 500 whose detail
    carries the underlying error message.
    """
    query_options = dict(
        keywords=q,
        place=place,
        street=street,
        city=city,
        county=county,
        state=state,
        country=country,
        postalcode=postalcode,
        latitude=latitude,
        longitude=longitude,
        radius=radius,
        max_results=max_results,
    )
    try:
        with WEBS() as client:
            hits = client.maps(**query_options)
            return JSONResponse(content=jsonable_encoder(hits))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error during maps search: {exc}")
async def translate(
    q: str,
    from_: Optional[str] = None,
    to: str = "en",
):
    """Translate ``q`` through ``WEBS.translate`` and return the result as JSON.

    ``from_`` and ``to`` are forwarded unchanged; ``q`` is sent as
    ``keywords``. Any exception is reported as an HTTP 500 whose detail
    carries the underlying error message.
    """
    try:
        with WEBS() as client:
            translated = client.translate(keywords=q, from_=from_, to=to)
            encoded = jsonable_encoder(translated)
            return JSONResponse(content=encoded)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error during translation: {exc}")
async def youtube_transcript(
    video_id: str,
    languages: str = "en",
    preserve_formatting: bool = False,
):
    """Fetch a YouTube video's transcript and return it as JSON.

    ``languages`` is a comma-separated string that is split into a list
    before being handed to ``transcriber.get_transcript``. Any exception is
    reported as an HTTP 500 whose detail carries the underlying error message.
    """
    try:
        wanted_languages = languages.split(",")
        transcript = transcriber.get_transcript(
            video_id,
            languages=wanted_languages,
            preserve_formatting=preserve_formatting,
        )
        encoded = jsonable_encoder(transcript)
        return JSONResponse(content=encoded)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error getting YouTube transcript: {exc}")
| import requests | |
def get_weather_json(location: str):
    """Fetch weather data for ``location`` from wttr.in in its ``j1`` JSON format.

    Args:
        location: Location string placed in the URL path.

    Returns:
        The decoded JSON body on HTTP 200, otherwise a dict with a single
        ``"error"`` key naming the status code.

    Raises:
        requests.exceptions.RequestException: on connection errors or timeout.
    """
    # Percent-encode the path segment so characters such as "?", "#" or "/"
    # in the location cannot change the path or drop the format parameter.
    # NOTE(review): "~@+,:" are left unescaped on the assumption that wttr.in
    # gives them special meaning in locations — confirm against its help page.
    encoded_location = urllib.parse.quote(location, safe="~@+,:")
    url = f"https://wttr.in/{encoded_location}"
    # Without a timeout requests waits indefinitely on a stalled server.
    response = requests.get(url, params={"format": "j1"}, timeout=10)
    if response.status_code == 200:
        return response.json()
    return {"error": f"Unable to fetch weather data. Status code: {response.status_code}"}
def get_ascii_weather(location: str):
    """Fetch the plain-text weather report for ``location`` from wttr.in.

    The request is sent with a ``curl`` User-Agent header.

    Args:
        location: Location string placed in the URL path.

    Returns:
        The response text on HTTP 200, otherwise a dict with a single
        ``"error"`` key naming the status code.

    Raises:
        requests.exceptions.RequestException: on connection errors or timeout.
    """
    # Percent-encode the path segment so characters such as "?", "#" or "/"
    # in the location cannot change the path or inject query parameters.
    # NOTE(review): "~@+,:" are left unescaped on the assumption that wttr.in
    # gives them special meaning in locations — confirm against its help page.
    encoded_location = urllib.parse.quote(location, safe="~@+,:")
    url = f"https://wttr.in/{encoded_location}"
    # Without a timeout requests waits indefinitely on a stalled server.
    response = requests.get(url, headers={'User-Agent': 'curl'}, timeout=10)
    if response.status_code == 200:
        return response.text
    return {"error": f"Unable to fetch weather data. Status code: {response.status_code}"}
# Start a uvicorn server for ``app`` when this file is run as a script.
if __name__ == "__main__":
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8080)