Spaces:
Running
Running
import os | |
import json | |
import asyncio | |
import aiohttp | |
import logging | |
import requests | |
from typing import List, Dict, Any | |
from datetime import datetime, timezone | |
logger = logging.getLogger(__name__) | |
# Client for interacting with the MCP service | |
class MCPClient:
    """Client for the MCP service, reached through a Pipedream webhook.

    The client does two things:

    * ``fetch_app_data`` posts a query for one provider to the webhook URL
      read from ``PIPEDREAM_WEBHOOK_URL`` and returns the decoded JSON.
    * ``format_as_context`` renders such a result as plain text that can be
      placed in an LLM prompt.

    Failures are reported as ``{"error": "<message>"}`` dictionaries rather
    than exceptions; ``format_as_context`` turns those into an empty string.

    NOTE(review): the section glyphs (emoji) in the headers were mis-encoded
    in the source this class was recovered from; the ones used here are a
    best-effort reconstruction - confirm against the intended originals.
    """

    def __init__(self):
        """Read the webhook URL from the environment and set the timeout."""
        self.webhook_url = os.getenv("PIPEDREAM_WEBHOOK_URL")
        if not self.webhook_url:
            logger.warning("PIPEDREAM_WEBHOOK_URL not set in environment variables")
        # Overall budget (seconds) for one webhook round trip.
        self.timeout = aiohttp.ClientTimeout(total=60)

    async def fetch_app_data(
        self,
        provider: str,
        services: List[str],
        query: str,
        user_id: str,
        access_token: str
    ) -> Dict[str, Any]:
        """Fetch app data for ``provider`` through the Pipedream webhook.

        Args:
            provider: Provider name, e.g. "google", "microsoft" or "slack".
            services: Service names to query for that provider.
            query: The user's natural-language query.
            user_id: Identifier of the user the request is made for.
            access_token: Access token for the provider; required.

        Returns:
            The decoded JSON object returned by the webhook, or a dictionary
            with a single "error" key describing what went wrong (missing
            configuration or token, HTTP failure, invalid or empty response,
            authentication failure, timeout, network error).
        """
        if not self.webhook_url:
            logger.error("Pipedream webhook URL not configured")
            return {"error": "Pipedream integration not configured"}

        if not access_token:
            logger.error(f"No access token for {provider}! Cannot proceed.")
            return {"error": f"No authentication token for {provider}"}

        payload = {
            "provider": provider,
            "services": services,
            "query": query,
            "user_id": user_id,
            "token": access_token,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        headers = {'Content-Type': 'application/json'}

        # The token itself is never logged, only the fact that one is present.
        logger.debug(
            "Fetching %s data for services %s (user_id=%s, token_length=%d)",
            provider, services, user_id, len(access_token),
        )

        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                # NOTE(review): the {'data': <json string>} envelope follows the
                # earlier aiohttp implementation of this method - confirm that
                # it matches what the Pipedream workflow expects.
                request_body = {'data': json.dumps(payload)}
                async with session.post(
                    self.webhook_url, json=request_body, headers=headers
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"Pipedream request failed: {response.status} - {error_text}")
                        return {"error": f"Failed to fetch data: {response.status} - {error_text}"}
                    try:
                        data = await response.json()
                    except (aiohttp.ContentTypeError, ValueError):
                        # Wrong content type, or a body that is not valid JSON.
                        logger.error("Pipedream returned a non-JSON or empty response.")
                        return {"error": "Invalid response from the provider."}
        except asyncio.TimeoutError:
            logger.error("Pipedream request timed out")
            return {"error": "Request timed out. Please try again."}
        except aiohttp.ClientError as e:
            logger.error(f"Network error calling Pipedream: {str(e)}")
            return {"error": "Network error. Please check your connection."}
        except Exception as e:
            # Last-resort boundary: callers expect an error dict, not a raise.
            logger.exception(f"Unexpected error calling Pipedream: {str(e)}")
            return {"error": "An unexpected error occurred"}

        if data is None:
            logger.warning("Pipedream returned a null response.")
            return {"error": "Received no data from the provider."}
        if not isinstance(data, dict):
            logger.error("Pipedream returned a JSON value that is not an object.")
            return {"error": "Invalid response from the provider."}

        if self._has_auth_error(data):
            logger.error(f"Authentication failed for {provider}")
            return {"error": f"Authentication failed for {provider}. Please reconnect your account."}

        logger.info(f"Successfully fetched {provider} data")
        return data

    @staticmethod
    def _has_auth_error(data: Dict[str, Any]) -> bool:
        """Return True if any per-service result reports an auth failure.

        A service result counts when it is a dict with a truthy "error" and
        its "details" text contains "401", "authError" or "UNAUTHENTICATED".
        """
        markers = ("401", "authError", "UNAUTHENTICATED")
        for result in data.values():
            if isinstance(result, dict) and result.get("error"):
                details = str(result.get("details", ""))
                if any(marker in details for marker in markers):
                    return True
        return False

    def format_as_context(self, provider: str, data: Dict[str, Any]) -> str:
        """Format raw app data into a context string for an LLM.

        Returns an empty string when ``data`` is empty or contains an "error"
        key; otherwise a banner, the provider-specific sections (or an
        "Unknown provider" line), and a closing rule.
        """
        if not data or "error" in data:
            return ""

        formatters = {
            "google": self._format_google_data,
            "microsoft": self._format_microsoft_data,
            "slack": self._format_slack_data,
        }
        formatter = formatters.get(provider)
        body = formatter(data) if formatter else f"Unknown provider: {provider}\n"

        rule = "=" * 50 + "\n"
        return f"\n[{provider.upper()} APP DATA]\n" + rule + body + rule

    # ------------------------------------------------------------------
    # Shared rendering helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _section_header(title: str) -> str:
        """Return the title line and dashed rule that open a section."""
        return f"\n{title}:\n" + "-" * 30 + "\n"

    @staticmethod
    def _preview(text: Any, limit: int) -> str:
        """Return ``text`` cut to ``limit`` characters, with "..." only if cut."""
        text = "" if text is None else str(text)
        if len(text) > limit:
            return text[:limit] + "..."
        return text

    @staticmethod
    def _as_dict(value: Any) -> Dict[str, Any]:
        """Return ``value`` if it is a dict, else an empty dict (e.g. for null)."""
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _dict_entries(raw: Any, limit: int) -> List[Dict[str, Any]]:
        """Return at most ``limit`` dict items of ``raw``; [] if it is not a list."""
        if not isinstance(raw, list):
            return []
        return [entry for entry in raw if isinstance(entry, dict)][:limit]

    def _entries(self, data: Any, service: str, key: str, limit: int):
        """Return the entries at ``data[service][key]``.

        Returns None when the section is absent (so no header is printed),
        otherwise a possibly empty list of at most ``limit`` dict entries.
        """
        section = data.get(service) if isinstance(data, dict) else None
        if not isinstance(section, dict) or key not in section:
            return None
        return self._dict_entries(section[key], limit)

    def _format_documents(
        self,
        title: str,
        entries: List[Dict[str, Any]],
        label: str,
        noun: str,
        modified_key: str,
        preview_label: str,
        preview_limit: int,
    ) -> str:
        """Render a section of name / modified / content-preview entries."""
        out = [self._section_header(title)]
        if not entries:
            out.append(f"No {noun} found matching the query.\n")
        for i, entry in enumerate(entries, 1):
            out.append(f"\n{i}. {label}: {entry.get('name', 'Unknown')}\n")
            out.append(f" Modified: {entry.get(modified_key, 'Unknown')}\n")
            if entry.get('content'):
                preview = self._preview(entry['content'], preview_limit)
                out.append(f" {preview_label}:\n {preview}\n")
        return "".join(out)

    def _format_emails(self, title: str, entries: List[Dict[str, Any]]) -> str:
        """Render a section of from / subject / body-preview entries."""
        out = [self._section_header(title)]
        if not entries:
            out.append("No messages found matching the query.\n")
        for i, msg in enumerate(entries, 1):
            out.append(f"\n{i}. From: {msg.get('from', 'Unknown')}\n")
            out.append(f" Subject: {msg.get('subject', 'No subject')}\n")
            out.append(f" Preview: {self._preview(msg.get('body'), 300)}\n")
        return "".join(out)

    # ------------------------------------------------------------------
    # Provider formatters
    # ------------------------------------------------------------------

    def _format_google_data(self, data: Dict[str, Any]) -> str:
        """Render the Drive, Gmail, Calendar, Docs, Sheets and Tasks sections.

        A section is printed only when its service key holds a dict that
        contains the expected list key.
        """
        out: List[str] = []

        # Google Drive
        files = self._entries(data, "drive", "files", 10)
        if files is not None:
            out.append(self._section_header("📁 GOOGLE DRIVE FILES"))
            if not files:
                out.append("No files found matching the query.\n")
            for i, file in enumerate(files, 1):
                out.append(f"\n{i}. File: {file.get('name', 'Unknown')}\n")
                out.append(f" Type: {file.get('mimeType', 'Unknown')}\n")
                out.append(f" Modified: {file.get('modifiedTime', 'Unknown')}\n")
                if file.get('webViewLink'):
                    out.append(f" Link: {file['webViewLink']}\n")
                if file.get('content'):
                    out.append(f" Content Preview:\n {self._preview(file['content'], 500)}\n")
                # NOTE(review): indentation was lost in the recovered source;
                # this blank line is assumed to follow every file - confirm.
                out.append("\n")

        # Gmail
        messages = self._entries(data, "gmail", "messages", 10)
        if messages is not None:
            out.append(self._format_emails("📧 GMAIL MESSAGES", messages))

        # Google Calendar
        events = self._entries(data, "calendar", "events", 10)
        if events is not None:
            out.append(self._section_header("📅 GOOGLE CALENDAR EVENTS"))
            if not events:
                out.append("No calendar events found matching the query.\n")
            for i, event in enumerate(events, 1):
                out.append(f"\n{i}. Event: {event.get('summary', 'No title')}\n")
                out.append(f" Time: {event.get('start', 'Unknown')}\n")
                if event.get('location'):
                    out.append(f" Location: {event['location']}\n")
                if event.get('description'):
                    out.append(f" Description: {self._preview(event['description'], 200)}\n")

        # Google Docs
        docs = self._entries(data, "docs", "docs", 5)
        if docs is not None:
            out.append(self._format_documents(
                "📄 GOOGLE DOCS", docs, "Document", "documents",
                "modifiedTime", "Content Preview", 500,
            ))

        # Google Sheets
        sheets = self._entries(data, "sheets", "sheets", 5)
        if sheets is not None:
            out.append(self._format_documents(
                "📊 GOOGLE SHEETS", sheets, "Spreadsheet", "spreadsheets",
                "modifiedTime", "Data Preview", 300,
            ))

        # Google Tasks
        tasks = self._entries(data, "tasks", "tasks", 10)
        if tasks is not None:
            out.append(self._section_header("✅ GOOGLE TASKS"))
            if not tasks:
                out.append("No tasks found matching the query.\n")
            for i, task in enumerate(tasks, 1):
                out.append(f"\n{i}. Task: {task.get('title', 'No title')}\n")
                out.append(f" List: {task.get('listTitle', 'Unknown')}\n")
                out.append(f" Status: {task.get('status', 'Unknown')}\n")
                if task.get('notes'):
                    out.append(f" Notes: {self._preview(task['notes'], 200)}\n")
                if task.get('due'):
                    out.append(f" Due: {task['due']}\n")

        return "".join(out)

    def _format_microsoft_data(self, data: Dict[str, Any]) -> str:
        """Render the Word, Excel, PowerPoint, OneDrive, Outlook, OneNote,
        To Do and Exchange calendar sections.

        A section is printed only when its service key holds a dict that
        contains the expected list key.
        """
        out: List[str] = []

        # Word / Excel / PowerPoint share the same entry layout.
        office_sections = (
            ("word", "documents", "📄 MICROSOFT WORD DOCUMENTS", "Document", "documents"),
            ("excel", "workbooks", "📊 MICROSOFT EXCEL WORKBOOKS", "Workbook", "workbooks"),
            ("powerpoint", "presentations", "📊 MICROSOFT POWERPOINT PRESENTATIONS",
             "Presentation", "presentations"),
        )
        for service, key, title, label, noun in office_sections:
            entries = self._entries(data, service, key, 5)
            if entries is not None:
                out.append(self._format_documents(
                    title, entries, label, noun,
                    "lastModifiedDateTime", "Content Preview", 500,
                ))

        # OneDrive/Files
        files = self._entries(data, "onedrive", "files", 10)
        if files is not None:
            out.append(self._section_header("📁 ONEDRIVE FILES"))
            if not files:
                out.append("No files found matching the query.\n")
            for i, file in enumerate(files, 1):
                out.append(f"\n{i}. File: {file.get('name', 'Unknown')}\n")
                out.append(f" Modified: {file.get('lastModified', 'Unknown')}\n")
                if file.get('webUrl'):
                    out.append(f" URL: {file['webUrl']}\n")
                if file.get('content'):
                    out.append(f" Content Preview:\n {self._preview(file['content'], 500)}\n")

        # Outlook
        messages = self._entries(data, "outlook", "messages", 10)
        if messages is not None:
            out.append(self._format_emails("📧 OUTLOOK MESSAGES", messages))

        # OneNote
        pages = self._entries(data, "onenote", "pages", 10)
        if pages is not None:
            out.append(self._section_header("📝 ONENOTE PAGES"))
            if not pages:
                out.append("No pages found matching the query.\n")
            for i, page in enumerate(pages, 1):
                out.append(f"\n{i}. Page: {page.get('title', 'Unknown')}\n")
                out.append(f" Section: {page.get('parentSection', 'Unknown')}\n")
                out.append(f" Modified: {page.get('lastModifiedDateTime', 'Unknown')}\n")
                if page.get('contentPreview'):
                    out.append(f" Preview: {self._preview(page['contentPreview'], 200)}\n")

        # Microsoft To Do
        tasks = self._entries(data, "todo", "tasks", 10)
        if tasks is not None:
            out.append(self._section_header("✅ MICROSOFT TO DO"))
            if not tasks:
                out.append("No tasks found matching the query.\n")
            for i, task in enumerate(tasks, 1):
                out.append(f"\n{i}. Task: {task.get('title', 'No title')}\n")
                out.append(f" List: {task.get('listName', 'Unknown')}\n")
                out.append(f" Status: {'Completed' if task.get('isCompleted') else 'Pending'}\n")
                # "body" and "dueDateTime" may be null or lack the nested key.
                notes = self._as_dict(task.get('body')).get('content')
                if notes:
                    out.append(f" Notes: {self._preview(notes, 200)}\n")
                due = self._as_dict(task.get('dueDateTime')).get('dateTime')
                if due:
                    out.append(f" Due: {due}\n")

        # Exchange Calendar
        events = self._entries(data, "exchange", "events", 10)
        if events is not None:
            out.append(self._section_header("📅 EXCHANGE CALENDAR"))
            if not events:
                out.append("No calendar events found matching the query.\n")
            for i, event in enumerate(events, 1):
                start = self._as_dict(event.get('start')).get('dateTime', 'Unknown')
                end = self._as_dict(event.get('end')).get('dateTime', 'Unknown')
                place = self._as_dict(event.get('location')).get('displayName')
                out.append(f"\n{i}. Event: {event.get('subject', 'No subject')}\n")
                out.append(f" Start: {start}\n")
                out.append(f" End: {end}\n")
                if place:
                    out.append(f" Location: {place}\n")
                if event.get('bodyPreview'):
                    out.append(f" Preview: {self._preview(event['bodyPreview'], 200)}\n")

        return "".join(out)

    def _format_slack_data(self, data: Dict[str, Any]) -> str:
        """Render Slack messages, channels and files.

        The "SLACK MESSAGES" header is always emitted; the channel and file
        sections appear only when ``data`` holds a list under that key.
        """
        source = self._as_dict(data)
        out = [self._section_header("💬 SLACK MESSAGES")]

        messages = source.get("messages")
        if isinstance(messages, list):
            entries = self._dict_entries(messages, 15)
            if not entries:
                out.append("No messages found matching the query.\n")
            for i, msg in enumerate(entries, 1):
                out.append(f"\n{i}. User: {msg.get('user', 'Unknown')}\n")
                out.append(f" Channel: #{msg.get('channel', 'Unknown')}\n")
                out.append(f" Message: {msg.get('text', '')}\n")
                if msg.get('ts'):
                    # Raw Slack timestamp; not converted to a readable form.
                    out.append(f" Time: {msg['ts']}\n")

        # Slack channels with messages
        channels = source.get("channels")
        if isinstance(channels, list):
            out.append(self._section_header("📢 SLACK CHANNELS"))
            for channel in self._dict_entries(channels, 10):
                out.append(f"\nChannel: #{channel.get('name', 'Unknown')}\n")
                for msg in self._dict_entries(channel.get('messages'), 5):
                    out.append(f" • {msg.get('user', 'Unknown')}: {msg.get('text', '')}\n")

        # Slack files
        files = source.get("files")
        if isinstance(files, list):
            out.append(self._section_header("📎 SLACK FILES"))
            entries = self._dict_entries(files, 10)
            if not entries:
                out.append("No files found matching the query.\n")
            for i, file in enumerate(entries, 1):
                out.append(f"\n{i}. File: {file.get('name', 'Unknown')}\n")
                out.append(f" Type: {file.get('mimetype', 'Unknown')}\n")
                out.append(f" Size: {self._format_file_size(file.get('size', 0))}\n")
                if file.get('preview'):
                    out.append(f" Preview: {self._preview(file['preview'], 200)}\n")

        return "".join(out)

    def _format_file_size(self, size_bytes: int) -> str:
        """Return ``size_bytes`` as a B / KB / MB / GB string (1024-based).

        Values that cannot be read as an integer (e.g. None) give "Unknown".
        """
        try:
            size = int(size_bytes)
        except (TypeError, ValueError):
            return "Unknown"

        kib, mib, gib = 1024, 1024 ** 2, 1024 ** 3
        if size < kib:
            return f"{size} B"
        if size < mib:
            return f"{size / kib:.1f} KB"
        if size < gib:
            return f"{size / mib:.1f} MB"
        return f"{size / gib:.1f} GB"
if __name__ == "__main__":
    # Example usage: fetch Gmail data for one user and print the raw result.
    #
    # Credentials are read from the environment instead of being written in
    # source. An OAuth access token and a user id were previously hard-coded
    # here; any token that was committed should be treated as leaked and
    # revoked.
    #   MCP_EXAMPLE_USER_ID       - user id to send with the request
    #   MCP_EXAMPLE_ACCESS_TOKEN  - Google access token for that user
    example_user_id = os.getenv("MCP_EXAMPLE_USER_ID", "")
    example_token = os.getenv("MCP_EXAMPLE_ACCESS_TOKEN", "")
    if not example_token:
        raise SystemExit("Set MCP_EXAMPLE_ACCESS_TOKEN to run the example.")

    client = MCPClient()
    print(asyncio.run(client.fetch_app_data(
        provider="google",
        services=["gmail"],
        query="summarize the information from my last 5 emails",
        user_id=example_user_id,
        access_token=example_token,
    )))