"""Pipedream-backed MCP client.

Fetches a user's app data (Google, Microsoft, Slack) through a Pipedream
webhook and renders the result as a plain-text context block for an LLM.
"""

import asyncio
import json
import logging
import os
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Sequence

import aiohttp
import requests  # noqa: F401 -- not used by this module; import retained so the file's import surface is unchanged

logger = logging.getLogger(__name__)

# Horizontal rules used in the rendered context.
_RULE = "-" * 30 + "\n"
_BANNER = "=" * 50 + "\n"

# Substrings in a per-service "details" field that indicate an auth failure.
_AUTH_ERROR_MARKERS = ("401", "authError", "UNAUTHENTICATED")

# Section icons are written as escapes so they cannot be corrupted if the
# file is ever re-saved with the wrong text encoding.
_ICON_FOLDER = "\U0001F4C1"       # file folder
_ICON_MAIL = "\U0001F4E7"         # e-mail
_ICON_CALENDAR = "\U0001F4C5"     # calendar
_ICON_PAGE = "\U0001F4C4"         # page facing up
_ICON_CHART = "\U0001F4CA"        # bar chart
_ICON_CHECK = "\u2705"            # check mark button
_ICON_NOTEBOOK = "\U0001F4D3"     # notebook
_ICON_SPEECH = "\U0001F4AC"       # speech balloon
_ICON_LOUDSPEAKER = "\U0001F4E2"  # loudspeaker
_ICON_PAPERCLIP = "\U0001F4CE"    # paperclip
_BULLET = "\u2022"                # bullet

# A describer turns (1-based index, item dict) into the lines for that item.
_Describer = Callable[[int, Dict[str, Any]], List[str]]


class _Section(NamedTuple):
    """Declarative description of one ``data[service][key]`` listing."""

    service: str
    key: str
    title: str
    empty_message: str
    limit: int
    describe: _Describer
    trailer: str = ""


def _truncate(value: Any, limit: int) -> str:
    """Return *value* as text, cut to *limit* chars with "..." only if cut."""
    text = value if isinstance(value, str) else str(value)
    if len(text) <= limit:
        return text
    return text[:limit] + "..."


def _nested(mapping: Any, *keys: str, default: Any = None) -> Any:
    """Walk *keys* through nested dicts; return *default* if any step is missing or null."""
    current = mapping
    for key in keys:
        if not isinstance(current, dict):
            return default
        current = current.get(key)
    return default if current is None else current


def _dict_items(value: Any) -> List[Dict[str, Any]]:
    """Return the dict entries of *value* if it is a list, else an empty list."""
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


def _service_items(data: Any, service: str, key: str) -> Optional[List[Dict[str, Any]]]:
    """Return the items at ``data[service][key]``, or None if that section is absent."""
    section = data.get(service) if isinstance(data, dict) else None
    if not isinstance(section, dict) or key not in section:
        return None
    return _dict_items(section[key])


def _render_items(
    items: List[Dict[str, Any]],
    empty_message: str,
    limit: int,
    describe: _Describer,
) -> List[str]:
    """Render up to *limit* items, or the empty-result message."""
    if not items:
        return [empty_message + "\n"]
    lines: List[str] = []
    for index, item in enumerate(items[:limit], 1):
        lines.extend(describe(index, item))
    return lines


def _render_listing(
    title: str,
    items: List[Dict[str, Any]],
    empty_message: str,
    limit: int,
    describe: _Describer,
) -> str:
    """Render a titled section: header, rule, then the items."""
    parts = [f"\n{title}:\n", _RULE]
    parts.extend(_render_items(items, empty_message, limit, describe))
    return "".join(parts)


def _render_sections(data: Any, sections: Sequence[_Section]) -> str:
    """Render every section of *sections* that is present in *data*, in order."""
    parts: List[str] = []
    for section in sections:
        items = _service_items(data, section.service, section.key)
        if items is None:
            continue
        parts.append(
            _render_listing(
                section.title,
                items,
                section.empty_message,
                section.limit,
                section.describe,
            )
        )
        parts.append(section.trailer)
    return "".join(parts)


def _human_size(size_bytes: Any) -> str:
    """Format a byte count as B / KB / MB / GB; "Unknown" if it is not a number."""
    if isinstance(size_bytes, bool) or not isinstance(size_bytes, (int, float)):
        return "Unknown"
    if size_bytes < 1024:
        return f"{size_bytes} B"
    scaled = float(size_bytes)
    for unit in ("KB", "MB"):
        scaled /= 1024
        if scaled < 1024:
            return f"{scaled:.1f} {unit}"
    return f"{scaled / 1024:.1f} GB"


def _document_describer(
    label: str,
    modified_key: str,
    preview_limit: int,
    preview_label: str = "Content Preview",
) -> _Describer:
    """Build a describer for name/modified/content style documents."""

    def describe(index: int, doc: Dict[str, Any]) -> List[str]:
        lines = [
            f"\n{index}. {label}: {doc.get('name', 'Unknown')}\n",
            f" Modified: {doc.get(modified_key, 'Unknown')}\n",
        ]
        if doc.get("content"):
            preview = _truncate(doc["content"], preview_limit)
            lines.append(f" {preview_label}:\n {preview}\n")
        return lines

    return describe


def _describe_email(index: int, msg: Dict[str, Any]) -> List[str]:
    """Describe a Gmail/Outlook message; the preview line is always emitted."""
    return [
        f"\n{index}. From: {msg.get('from', 'Unknown')}\n",
        f" Subject: {msg.get('subject', 'No subject')}\n",
        # `or ''` covers a body that is present but null.
        f" Preview: {_truncate(msg.get('body') or '', 300)}\n",
    ]


def _describe_drive_file(index: int, file: Dict[str, Any]) -> List[str]:
    """Describe a Google Drive file."""
    lines = [
        f"\n{index}. File: {file.get('name', 'Unknown')}\n",
        f" Type: {file.get('mimeType', 'Unknown')}\n",
        f" Modified: {file.get('modifiedTime', 'Unknown')}\n",
    ]
    if file.get("webViewLink"):
        lines.append(f" Link: {file['webViewLink']}\n")
    if file.get("content"):
        lines.append(f" Content Preview:\n {_truncate(file['content'], 500)}\n")
    return lines


def _describe_google_event(index: int, event: Dict[str, Any]) -> List[str]:
    """Describe a Google Calendar event."""
    lines = [
        f"\n{index}. Event: {event.get('summary', 'No title')}\n",
        f" Time: {event.get('start', 'Unknown')}\n",
    ]
    if event.get("location"):
        lines.append(f" Location: {event['location']}\n")
    if event.get("description"):
        lines.append(f" Description: {_truncate(event['description'], 200)}\n")
    return lines


def _describe_google_task(index: int, task: Dict[str, Any]) -> List[str]:
    """Describe a Google Tasks entry."""
    lines = [
        f"\n{index}. Task: {task.get('title', 'No title')}\n",
        f" List: {task.get('listTitle', 'Unknown')}\n",
        f" Status: {task.get('status', 'Unknown')}\n",
    ]
    if task.get("notes"):
        lines.append(f" Notes: {_truncate(task['notes'], 200)}\n")
    if task.get("due"):
        lines.append(f" Due: {task['due']}\n")
    return lines


def _describe_onedrive_file(index: int, file: Dict[str, Any]) -> List[str]:
    """Describe a OneDrive file."""
    lines = [
        f"\n{index}. File: {file.get('name', 'Unknown')}\n",
        f" Modified: {file.get('lastModified', 'Unknown')}\n",
    ]
    if file.get("webUrl"):
        lines.append(f" URL: {file['webUrl']}\n")
    if file.get("content"):
        lines.append(f" Content Preview:\n {_truncate(file['content'], 500)}\n")
    return lines


def _describe_onenote_page(index: int, page: Dict[str, Any]) -> List[str]:
    """Describe a OneNote page."""
    lines = [
        f"\n{index}. Page: {page.get('title', 'Unknown')}\n",
        f" Section: {page.get('parentSection', 'Unknown')}\n",
        f" Modified: {page.get('lastModifiedDateTime', 'Unknown')}\n",
    ]
    if page.get("contentPreview"):
        lines.append(f" Preview: {_truncate(page['contentPreview'], 200)}\n")
    return lines


def _describe_todo_task(index: int, task: Dict[str, Any]) -> List[str]:
    """Describe a Microsoft To Do task."""
    status = "Completed" if task.get("isCompleted") else "Pending"
    lines = [
        f"\n{index}. Task: {task.get('title', 'No title')}\n",
        f" List: {task.get('listName', 'Unknown')}\n",
        f" Status: {status}\n",
    ]
    notes = _nested(task, "body", "content")
    if notes:
        lines.append(f" Notes: {_truncate(notes, 200)}\n")
    due = _nested(task, "dueDateTime", "dateTime")
    if due:
        lines.append(f" Due: {due}\n")
    return lines


def _describe_exchange_event(index: int, event: Dict[str, Any]) -> List[str]:
    """Describe an Exchange calendar event."""
    lines = [
        f"\n{index}. Event: {event.get('subject', 'No subject')}\n",
        f" Start: {_nested(event, 'start', 'dateTime', default='Unknown')}\n",
        f" End: {_nested(event, 'end', 'dateTime', default='Unknown')}\n",
    ]
    location = _nested(event, "location", "displayName")
    if location:
        lines.append(f" Location: {location}\n")
    if event.get("bodyPreview"):
        lines.append(f" Preview: {_truncate(event['bodyPreview'], 200)}\n")
    return lines


def _describe_slack_message(index: int, msg: Dict[str, Any]) -> List[str]:
    """Describe a Slack message; the raw ``ts`` value is shown unconverted."""
    lines = [
        f"\n{index}. User: {msg.get('user', 'Unknown')}\n",
        f" Channel: #{msg.get('channel', 'Unknown')}\n",
        f" Message: {msg.get('text', '')}\n",
    ]
    if msg.get("ts"):
        lines.append(f" Time: {msg['ts']}\n")
    return lines


def _describe_slack_file(index: int, file: Dict[str, Any]) -> List[str]:
    """Describe a Slack file."""
    lines = [
        f"\n{index}. File: {file.get('name', 'Unknown')}\n",
        f" Type: {file.get('mimetype', 'Unknown')}\n",
        f" Size: {_human_size(file.get('size', 0))}\n",
    ]
    if file.get("preview"):
        lines.append(f" Preview: {_truncate(file['preview'], 200)}\n")
    return lines


_NO_FILES = "No files found matching the query."
_NO_MESSAGES = "No messages found matching the query."
_NO_EVENTS = "No calendar events found matching the query."
_NO_DOCUMENTS = "No documents found matching the query."
_NO_TASKS = "No tasks found matching the query."

_GOOGLE_SECTIONS = (
    _Section("drive", "files", f"{_ICON_FOLDER} GOOGLE DRIVE FILES",
             _NO_FILES, 10, _describe_drive_file, trailer="\n"),
    _Section("gmail", "messages", f"{_ICON_MAIL} GMAIL MESSAGES",
             _NO_MESSAGES, 10, _describe_email),
    _Section("calendar", "events", f"{_ICON_CALENDAR} GOOGLE CALENDAR EVENTS",
             _NO_EVENTS, 10, _describe_google_event),
    _Section("docs", "docs", f"{_ICON_PAGE} GOOGLE DOCS",
             _NO_DOCUMENTS, 5,
             _document_describer("Document", "modifiedTime", 500)),
    _Section("sheets", "sheets", f"{_ICON_CHART} GOOGLE SHEETS",
             "No spreadsheets found matching the query.", 5,
             _document_describer("Spreadsheet", "modifiedTime", 300, "Data Preview")),
    _Section("tasks", "tasks", f"{_ICON_CHECK} GOOGLE TASKS",
             _NO_TASKS, 10, _describe_google_task),
)

_MICROSOFT_SECTIONS = (
    _Section("word", "documents", f"{_ICON_PAGE} MICROSOFT WORD DOCUMENTS",
             _NO_DOCUMENTS, 5,
             _document_describer("Document", "lastModifiedDateTime", 500)),
    _Section("excel", "workbooks", f"{_ICON_CHART} MICROSOFT EXCEL WORKBOOKS",
             "No workbooks found matching the query.", 5,
             _document_describer("Workbook", "lastModifiedDateTime", 500)),
    _Section("powerpoint", "presentations",
             f"{_ICON_CHART} MICROSOFT POWERPOINT PRESENTATIONS",
             "No presentations found matching the query.", 5,
             _document_describer("Presentation", "lastModifiedDateTime", 500)),
    _Section("onedrive", "files", f"{_ICON_FOLDER} ONEDRIVE FILES",
             _NO_FILES, 10, _describe_onedrive_file),
    _Section("outlook", "messages", f"{_ICON_MAIL} OUTLOOK MESSAGES",
             _NO_MESSAGES, 10, _describe_email),
    _Section("onenote", "pages", f"{_ICON_NOTEBOOK} ONENOTE PAGES",
             "No pages found matching the query.", 10, _describe_onenote_page),
    _Section("todo", "tasks", f"{_ICON_CHECK} MICROSOFT TO DO",
             _NO_TASKS, 10, _describe_todo_task),
    _Section("exchange", "events", f"{_ICON_CALENDAR} EXCHANGE CALENDAR",
             _NO_EVENTS, 10, _describe_exchange_event),
)


class MCPClient:
    """Client for interacting with the MCP service via a Pipedream webhook."""

    def __init__(self):
        """Read the webhook URL from ``PIPEDREAM_WEBHOOK_URL`` and set the request timeout."""
        self.webhook_url = os.getenv("PIPEDREAM_WEBHOOK_URL")
        if not self.webhook_url:
            logger.warning("PIPEDREAM_WEBHOOK_URL not set in environment variables")

        # Overall per-request budget, in seconds.
        self.timeout = aiohttp.ClientTimeout(total=60)

    async def fetch_app_data(
        self,
        provider: str,
        services: List[str],
        query: str,
        user_id: str,
        access_token: str
    ) -> Dict[str, Any]:
        """Fetch app data for *provider* from the Pipedream webhook.

        Args:
            provider: Provider name forwarded to the webhook (e.g. "google").
            services: Service names to query for that provider.
            query: Free-text query forwarded to the webhook.
            user_id: Identifier of the requesting user.
            access_token: Provider OAuth token; never logged.

        Returns:
            The decoded JSON object returned by the webhook, or a dict of the
            form ``{"error": "<message>"}`` on any failure. This method does
            not raise for network, HTTP, or decoding errors.
        """
        if not self.webhook_url:
            logger.error("Pipedream webhook URL not configured")
            return {"error": "Pipedream integration not configured"}

        logger.debug(
            "fetch_app_data: provider=%s services=%s user_id=%s token_present=%s",
            provider, services, user_id, bool(access_token),
        )

        if not access_token:
            logger.error("No access token for %s! Cannot proceed.", provider)
            return {"error": f"No authentication token for {provider}"}

        payload = {
            "provider": provider,
            "services": services,
            "query": query,
            "user_id": user_id,
            "token": access_token,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                # The payload is sent JSON-encoded under a single "data" key.
                # NOTE(review): body shape taken from the earlier aiohttp
                # implementation -- confirm against the Pipedream workflow.
                async with session.post(
                    self.webhook_url, json={"data": json.dumps(payload)}
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(
                            "Pipedream request failed: %s - %s",
                            response.status, error_text,
                        )
                        return {
                            "error": f"Failed to fetch data: {response.status} - {error_text}"
                        }
                    try:
                        data = await response.json()
                    except (aiohttp.ContentTypeError, ValueError):
                        # Wrong content type, or a body that is not valid JSON.
                        logger.error("Pipedream returned a non-JSON or empty response.")
                        return {"error": "Invalid response from the provider."}
        except asyncio.TimeoutError:
            logger.error("Pipedream request timed out")
            return {"error": "Request timed out. Please try again."}
        except aiohttp.ClientError as exc:
            logger.error("Network error calling Pipedream: %s", exc)
            return {"error": "Network error. Please check your connection."}
        except Exception:
            # Boundary handler: callers expect an error dict, never a raise.
            logger.exception("Unexpected error calling Pipedream")
            return {"error": "An unexpected error occurred"}

        if data is None:
            logger.warning("Pipedream returned a null response.")
            return {"error": "Received no data from the provider."}
        if not isinstance(data, dict):
            logger.error("Pipedream returned JSON that is not an object.")
            return {"error": "Invalid response from the provider."}

        if self._has_auth_error(data):
            logger.error("Authentication failed for %s", provider)
            return {
                "error": f"Authentication failed for {provider}. Please reconnect your account."
            }

        logger.info("Successfully fetched %s data", provider)
        return data

    @staticmethod
    def _has_auth_error(data: Dict[str, Any]) -> bool:
        """Return True if any per-service result carries an auth-failure marker."""
        for result in data.values():
            if isinstance(result, dict) and result.get("error"):
                details = str(result.get("details", ""))
                if any(marker in details for marker in _AUTH_ERROR_MARKERS):
                    return True
        return False

    def format_as_context(self, provider: str, data: Dict[str, Any]) -> str:
        """Format raw app data into a context string for an LLM.

        Returns an empty string when *data* is empty or contains an "error"
        key. An unrecognised *provider* yields a block whose body is an
        "Unknown provider" line.
        """
        if not data or "error" in data:
            return ""

        formatters = {
            "google": self._format_google_data,
            "microsoft": self._format_microsoft_data,
            "slack": self._format_slack_data,
        }
        formatter = formatters.get(provider)
        body = formatter(data) if formatter else f"Unknown provider: {provider}\n"

        return f"\n[{provider.upper()} APP DATA]\n" + _BANNER + body + _BANNER

    def _format_google_data(self, data: Dict[str, Any]) -> str:
        """Render the Google sections (Drive, Gmail, Calendar, Docs, Sheets, Tasks) present in *data*."""
        return _render_sections(data, _GOOGLE_SECTIONS)

    def _format_microsoft_data(self, data: Dict[str, Any]) -> str:
        """Render the Microsoft sections (Word, Excel, PowerPoint, OneDrive, Outlook, OneNote, To Do, Exchange) present in *data*."""
        return _render_sections(data, _MICROSOFT_SECTIONS)

    def _format_slack_data(self, data: Dict[str, Any]) -> str:
        """Render Slack messages, channels and files found at the top level of *data*.

        The "SLACK MESSAGES" header is always emitted, even when *data* has
        no "messages" list.
        """
        source = data if isinstance(data, dict) else {}
        parts = [f"\n{_ICON_SPEECH} SLACK MESSAGES:\n", _RULE]

        messages = source.get("messages")
        if isinstance(messages, list):
            parts.extend(
                _render_items(
                    _dict_items(messages), _NO_MESSAGES, 15, _describe_slack_message
                )
            )

        channels = source.get("channels")
        if isinstance(channels, list):
            parts.append(f"\n{_ICON_LOUDSPEAKER} SLACK CHANNELS:\n")
            parts.append(_RULE)
            for channel in _dict_items(channels)[:10]:
                parts.append(f"\nChannel: #{channel.get('name', 'Unknown')}\n")
                for msg in _dict_items(channel.get("messages"))[:5]:
                    parts.append(
                        f" {_BULLET} {msg.get('user', 'Unknown')}: {msg.get('text', '')}\n"
                    )

        files = source.get("files")
        if isinstance(files, list):
            parts.append(
                _render_listing(
                    f"{_ICON_PAPERCLIP} SLACK FILES",
                    _dict_items(files),
                    _NO_FILES,
                    10,
                    _describe_slack_file,
                )
            )

        return "".join(parts)

    def _format_file_size(self, size_bytes: int) -> str:
        """Format a byte count as B / KB / MB / GB ("Unknown" for non-numeric input)."""
        return _human_size(size_bytes)


if __name__ == "__main__":
    # Example usage. Credentials come from the environment: OAuth tokens
    # must never be committed to source control.
    demo_token = os.getenv("MCP_DEMO_ACCESS_TOKEN")
    if not demo_token:
        raise SystemExit("Set MCP_DEMO_ACCESS_TOKEN to run the example.")

    logging.basicConfig(level=logging.DEBUG)
    client = MCPClient()
    print(asyncio.run(client.fetch_app_data(
        provider="google",
        services=["gmail"],
        query="summarize the information from my last 5 emails",
        user_id=os.getenv("MCP_DEMO_USER_ID", "demo-user"),
        access_token=demo_token,
    )))