""" Enhanced Research Agent with Multi-Source Integration """ from typing import Dict, List, Any, Optional, Tuple import re from collections import Counter from .base_tool import BaseTool from .web_search import WebSearchTool from .wikipedia_search import WikipediaSearchTool from .arxiv_search import ArxivSearchTool from .github_search import GitHubSearchTool from .sec_search import SECSearchTool class EnhancedResearchAgent: """Enhanced research agent with multi-source synthesis and smart routing""" def __init__(self): # Initialize all research tools self.tools = { 'web': WebSearchTool(), 'wikipedia': WikipediaSearchTool(), 'arxiv': ArxivSearchTool(), 'github': GitHubSearchTool(), 'sec': SECSearchTool() } # Tool availability status self.tool_status = {name: True for name in self.tools.keys()} def search(self, query: str, research_depth: str = "standard") -> str: """Main search method with intelligent routing""" if research_depth == "deep": return self._deep_multi_source_search(query) else: return self._standard_search(query) def search_wikipedia(self, topic: str) -> str: """Wikipedia search method for backward compatibility""" return self.tools['wikipedia'].search(topic) def _standard_search(self, query: str) -> str: """Standard single-source search with smart routing""" # Determine best tool for the query best_tool = self._route_query_to_tool(query) try: return self.tools[best_tool].search(query) except Exception as e: # Fallback to web search if best_tool != 'web': try: return self.tools['web'].search(query) except Exception as e2: return f"**Research for: {query}**\n\nResearch temporarily unavailable: {str(e2)[:100]}..." else: return f"**Research for: {query}**\n\nResearch temporarily unavailable: {str(e)[:100]}..." def _deep_multi_source_search(self, query: str) -> str: """Deep research using multiple sources with synthesis""" results = {} quality_scores = {} # Determine which sources to use based on query type relevant_tools = self._get_relevant_tools(query) # Collect results from multiple sources for tool_name in relevant_tools: try: result = self.tools[tool_name].search(query) if result and len(result.strip()) > 50: # Ensure meaningful result results[tool_name] = result quality_scores[tool_name] = self.tools[tool_name].score_research_quality(result, tool_name) except Exception as e: print(f"Error with {tool_name}: {e}") continue if not results: return f"**Deep Research for: {query}**\n\nNo sources were able to provide results. Please try a different query." 
    def _route_query_to_tool(self, query: str) -> str:
        """Intelligently route a query to the most appropriate tool"""
        query_lower = query.lower()

        # Priority routing based on query characteristics
        priority_order = ['arxiv', 'sec', 'github', 'wikipedia', 'web']
        for tool_name, tool in self.tools.items():
            if tool.should_use_for_query(query):
                # Return the first matching high-priority specialized tool
                if tool_name in priority_order[:3]:
                    return tool_name

        # Secondary check for explicit indicators
        if any(indicator in query_lower for indicator in ['company', 'stock', 'financial', 'revenue']):
            return 'sec'
        elif any(indicator in query_lower for indicator in ['research', 'study', 'academic', 'paper']):
            return 'arxiv'
        elif any(indicator in query_lower for indicator in ['technology', 'framework', 'programming']):
            return 'github'
        elif any(indicator in query_lower for indicator in ['what is', 'definition', 'history']):
            return 'wikipedia'
        else:
            return 'web'  # Default fallback

    def _get_relevant_tools(self, query: str) -> List[str]:
        """Get the list of relevant tools for a deep search"""
        relevant_tools = []

        # Always include web search for current information
        relevant_tools.append('web')

        # Add specialized tools based on the query
        for tool_name, tool in self.tools.items():
            if tool_name != 'web' and tool.should_use_for_query(query):
                relevant_tools.append(tool_name)

        # Ensure we don't overwhelm the synthesis with too many sources
        if len(relevant_tools) > 4:
            # Prioritize specialized tools
            priority_order = ['arxiv', 'sec', 'github', 'wikipedia', 'web']
            relevant_tools = [tool for tool in priority_order if tool in relevant_tools][:4]

        return relevant_tools

    def _synthesize_multi_source_results(self, query: str, results: Dict[str, str],
                                         quality_scores: Dict[str, Dict]) -> str:
        """Synthesize results from multiple research sources"""
        synthesis = f"**Comprehensive Research Analysis: {query}**\n\n"

        # Add source summary
        synthesis += f"**Research Sources Used:** {', '.join(results.keys()).replace('_', ' ').title()}\n\n"

        # Find key themes and agreements/disagreements
        key_findings = self._extract_key_findings(results)
        synthesis += self._format_key_findings(key_findings)

        # Add individual source results (condensed)
        synthesis += "**Detailed Source Results:**\n\n"

        # Sort sources by quality score
        sorted_sources = sorted(quality_scores.items(), key=lambda x: x[1]['overall'], reverse=True)

        for source_name, _ in sorted_sources:
            if source_name in results:
                source_result = results[source_name]
                quality = quality_scores[source_name]

                # Condense long results
                if len(source_result) > 800:
                    source_result = source_result[:800] + "...\n[Result truncated for synthesis]"

                synthesis += f"**{source_name.replace('_', ' ').title()} (Quality: {quality['overall']:.2f}/1.0):**\n"
                synthesis += f"{source_result}\n\n"

        # Add research quality assessment
        synthesis += self._format_research_quality_assessment(quality_scores)

        return synthesis
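    # Routing sketch (illustrative; the final choice also depends on each
    # tool's should_use_for_query() heuristics, which live in the tool modules):
    #
    #   _route_query_to_tool("transformer architectures survey paper")  -> 'arxiv'
    #   _route_query_to_tool("Tesla annual revenue and stock filings")  -> 'sec'
    #   _route_query_to_tool("what is the history of the internet")     -> 'wikipedia'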
    def _extract_key_findings(self, results: Dict[str, str]) -> Dict[str, List[str]]:
        """Extract key findings and themes from multiple sources"""
        findings = {
            'agreements': [],
            'contradictions': [],
            'unique_insights': [],
            'data_points': []
        }

        # Extract key sentences from each source
        all_sentences = []
        source_sentences = {}

        for source, result in results.items():
            sentences = self._extract_key_sentences(result)
            source_sentences[source] = sentences
            all_sentences.extend(sentences)

        # Find common themes (simplified approach)
        word_counts = Counter()
        for sentence in all_sentences:
            words = re.findall(r'\b\w{4,}\b', sentence.lower())  # Words with 4+ characters
            word_counts.update(words)

        common_themes = [word for word, count in word_counts.most_common(10) if count > 1]

        # Look for numerical data
        numbers = re.findall(r'\b\d+(?:\.\d+)?%?\b', ' '.join(all_sentences))
        findings['data_points'] = list(set(numbers))[:10]  # Up to 10 unique numbers

        # Simplified agreement detection
        if len(source_sentences) > 1:
            findings['agreements'] = [f"Multiple sources mention: {theme}" for theme in common_themes[:3]]

        return findings

    def _extract_key_sentences(self, text: str) -> List[str]:
        """Extract key sentences from research text"""
        if not text:
            return []

        # Split into sentences
        sentences = re.split(r'[.!?]+', text)

        # Filter for key sentences (containing important indicators)
        key_indicators = [
            'research shows', 'study found', 'according to', 'data indicates',
            'results suggest', 'analysis reveals', 'evidence shows',
            'reported that', 'concluded that', 'demonstrated that',
            'increased', 'decreased', 'growth', 'decline',
            'significant', 'important', 'critical'
        ]

        key_sentences = []
        for sentence in sentences:
            sentence = sentence.strip()
            if (len(sentence) > 30 and
                    any(indicator in sentence.lower() for indicator in key_indicators)):
                key_sentences.append(sentence)

        return key_sentences[:5]  # Top 5 key sentences

    def _format_key_findings(self, findings: Dict[str, List[str]]) -> str:
        """Format the key findings summary"""
        result = "**Key Research Synthesis:**\n\n"

        if findings['agreements']:
            result += "**Common Themes:**\n"
            for agreement in findings['agreements']:
                result += f"• {agreement}\n"
            result += "\n"

        if findings['data_points']:
            result += "**Key Data Points:**\n"
            for data in findings['data_points'][:5]:
                result += f"• {data}\n"
            result += "\n"

        if findings['unique_insights']:
            result += "**Unique Insights:**\n"
            for insight in findings['unique_insights']:
                result += f"• {insight}\n"
            result += "\n"

        return result

    def _format_research_quality_assessment(self, quality_scores: Dict[str, Dict]) -> str:
        """Format the overall research quality assessment"""
        if not quality_scores:
            return ""

        result = "**Research Quality Assessment:**\n\n"

        # Calculate average quality metrics
        avg_overall = sum(scores['overall'] for scores in quality_scores.values()) / len(quality_scores)
        avg_authority = sum(scores['authority'] for scores in quality_scores.values()) / len(quality_scores)
        avg_recency = sum(scores['recency'] for scores in quality_scores.values()) / len(quality_scores)
        avg_specificity = sum(scores['specificity'] for scores in quality_scores.values()) / len(quality_scores)

        result += f"• Overall Research Quality: {avg_overall:.2f}/1.0\n"
        result += f"• Source Authority: {avg_authority:.2f}/1.0\n"
        result += f"• Information Recency: {avg_recency:.2f}/1.0\n"
        result += f"• Data Specificity: {avg_specificity:.2f}/1.0\n"
        result += f"• Sources Consulted: {len(quality_scores)}\n\n"

        # Quality interpretation
        if avg_overall >= 0.8:
            quality_level = "Excellent"
        elif avg_overall >= 0.6:
            quality_level = "Good"
        elif avg_overall >= 0.4:
            quality_level = "Moderate"
        else:
            quality_level = "Limited"

        result += f"**Research Reliability: {quality_level}**\n"

        if avg_authority >= 0.8:
            result += "• High-authority sources with strong credibility\n"
        if avg_recency >= 0.7:
            result += "• Current and up-to-date information\n"
        if avg_specificity >= 0.6:
            result += "• Specific data points and quantitative evidence\n"

        return result
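    # Extraction sketch (illustrative; the indicator list above decides what
    # counts as a "key" sentence, and output is simply the matching sentence
    # stripped of its terminal punctuation):
    #
    #   _extract_key_sentences("The study found that adoption increased by roughly 40 percent last year.")
    #   # -> ["The study found that adoption increased by roughly 40 percent last year"]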
    def generate_research_queries(self, question: str, current_discussion: List[Dict]) -> List[str]:
        """Auto-generate targeted research queries based on discussion gaps"""
        # Analyze the discussion for gaps
        discussion_text = "\n".join([msg.get('text', '') for msg in current_discussion])

        # Extract claims that need verification
        unsubstantiated_claims = self._find_unsubstantiated_claims(discussion_text)

        # Generate specific queries
        queries = []

        # Add queries for unsubstantiated claims
        for claim in unsubstantiated_claims[:3]:
            query = self._convert_claim_to_query(claim)
            if query:
                queries.append(query)

        # Add a query for missing quantitative data
        if not re.search(r'\d+%', discussion_text):
            queries.append(f"{question} statistics data percentages")

        # Add a current-trends query
        queries.append(f"{question} 2024 2025 recent developments")

        return queries[:3]  # Limit to 3 targeted queries

    def _find_unsubstantiated_claims(self, discussion_text: str) -> List[str]:
        """Find claims that might need research backing"""
        claims = []

        # Look for assertion patterns
        assertion_patterns = [
            r'(?:should|must|will|is|are)\s+[^.]{20,100}',
            r'(?:studies show|research indicates|data suggests)\s+[^.]{20,100}',
            r'(?:according to|based on)\s+[^.]{20,100}'
        ]

        for pattern in assertion_patterns:
            matches = re.findall(pattern, discussion_text, re.IGNORECASE)
            claims.extend(matches[:2])  # Limit matches per pattern

        return claims

    def _convert_claim_to_query(self, claim: str) -> Optional[str]:
        """Convert a claim into a research query"""
        if not claim or len(claim) < 10:
            return None

        # Extract key terms
        key_terms = re.findall(r'\b\w{4,}\b', claim.lower())
        if len(key_terms) < 2:
            return None

        # Create a query from the key terms
        query_terms = key_terms[:4]  # Use the first 4 meaningful terms
        return " ".join(query_terms)

    def prioritize_research_needs(self, expert_positions: List[Dict], question: str) -> List[str]:
        """Identify and prioritize research that could resolve expert conflicts"""
        # Extract expert claims
        expert_claims = {}
        for position in expert_positions:
            speaker = position.get('speaker', 'Unknown')
            text = position.get('text', '')
            expert_claims[speaker] = self._extract_key_claims(text)

        # Find disagreements
        disagreements = self._find_expert_disagreements(expert_claims)

        # Generate research priorities
        priorities = []
        for disagreement in disagreements[:3]:
            # Create a research query to resolve the disagreement
            query = f"{question} {disagreement['topic']} evidence data"
            priorities.append(query)

        return priorities

    def _extract_key_claims(self, expert_text: str) -> List[str]:
        """Extract key factual claims from an expert response"""
        if not expert_text:
            return []

        sentences = expert_text.split('.')
        claims = []

        for sentence in sentences:
            sentence = sentence.strip()
            if (len(sentence) > 20 and
                    any(indicator in sentence.lower() for indicator in [
                        'should', 'will', 'is', 'are', 'must', 'can', 'would', 'could'
                    ])):
                claims.append(sentence)

        return claims[:3]  # Top 3 claims

    def _find_expert_disagreements(self, expert_claims: Dict[str, List[str]]) -> List[Dict]:
        """Identify areas where experts disagree"""
        disagreements = []
        experts = list(expert_claims.keys())

        for i, expert1 in enumerate(experts):
            for expert2 in experts[i + 1:]:
                claims1 = expert_claims[expert1]
                claims2 = expert_claims[expert2]

                conflicts = self._find_conflicting_claims(claims1, claims2)
                if conflicts:
                    disagreements.append({
                        'experts': [expert1, expert2],
                        'topic': self._extract_conflict_topic(conflicts[0]),
                        'conflicts': conflicts[:1]  # Just the main conflict
                    })

        return disagreements
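    # Query-generation sketch (illustrative; exact queries depend on the
    # discussion text supplied and on the regex-based claim extraction above):
    #
    #   generate_research_queries(
    #       "Should we adopt remote work?",
    #       [{"text": "Remote work will boost productivity for most teams"}],
    #   )
    #   # -> a claim-derived query (e.g. "will boost productivity most") plus
    #   #    "<question> statistics data percentages" and
    #   #    "<question> 2024 2025 recent developments", capped at three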
    def _find_conflicting_claims(self, claims1: List[str], claims2: List[str]) -> List[str]:
        """Identify potentially conflicting claims (simplified)"""
        conflicts = []

        # Simple opposing-sentiment detection
        opposing_pairs = [
            ('should', 'should not'), ('will', 'will not'), ('is', 'is not'),
            ('increase', 'decrease'), ('better', 'worse'), ('yes', 'no'),
            ('support', 'oppose'), ('benefit', 'harm'), ('effective', 'ineffective')
        ]

        for claim1 in claims1:
            c1 = claim1.lower()
            for claim2 in claims2:
                c2 = claim2.lower()
                for pos, neg in opposing_pairs:
                    # Guard against the positive term matching inside its own
                    # negation (e.g. 'should' is a substring of 'should not').
                    if pos in c1 and neg not in c1 and neg in c2:
                        conflicts.append(f"{claim1} vs {claim2}")
                    elif neg in c1 and pos in c2 and neg not in c2:
                        conflicts.append(f"{claim1} vs {claim2}")

        return conflicts

    def _extract_conflict_topic(self, conflict: str) -> str:
        """Extract the main topic from a conflict description"""
        # Simple extraction of key terms
        words = re.findall(r'\b\w{4,}\b', conflict.lower())

        # Filter out common words
        stopwords = {'should', 'will', 'would', 'could', 'this', 'that', 'with', 'from', 'they', 'them'}
        topic_words = [word for word in words if word not in stopwords]

        return " ".join(topic_words[:3])

    def suggest_research_follow_ups(self, discussion_log: List[Dict], question: str) -> List[str]:
        """Suggest additional research questions based on discussion patterns"""
        # Get the recent discussion
        latest_messages = discussion_log[-6:] if len(discussion_log) > 6 else discussion_log
        recent_text = "\n".join([msg.get('content', '') for msg in latest_messages])

        follow_ups = []

        # Look for unverified statistics
        if re.search(r'\d+%', recent_text):
            follow_ups.append(f"{question} statistics verification current data")

        # Look for trend mentions
        trend_keywords = ['trend', 'growing', 'increasing', 'declining', 'emerging']
        if any(keyword in recent_text.lower() for keyword in trend_keywords):
            follow_ups.append(f"{question} current trends 2024 2025")

        # Look for example mentions
        if 'example' in recent_text.lower() or 'case study' in recent_text.lower():
            follow_ups.append(f"{question} case studies examples evidence")

        return follow_ups[:3]

    def get_tool_status(self) -> Dict[str, bool]:
        """Get the status of all research tools"""
        return {
            name: self.tool_status.get(name, True)
            for name in self.tools.keys()
        }

    def test_tool_connections(self) -> Dict[str, str]:
        """Test all research tool connections"""
        results = {}

        for name, tool in self.tools.items():
            try:
                # Simple test query
                test_result = tool.search("test", max_results=1)
                if test_result and len(test_result) > 20:
                    results[name] = "✅ Working"
                    self.tool_status[name] = True
                else:
                    results[name] = "⚠️ Limited response"
                    self.tool_status[name] = False
            except Exception as e:
                results[name] = f"❌ Error: {str(e)[:50]}..."
                self.tool_status[name] = False

        return results
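
# Minimal usage sketch (illustrative; the relative imports above require the
# package context, so run this via the package rather than as a standalone
# script, e.g. `python -m <your_package>.enhanced_research_agent`):
if __name__ == "__main__":
    agent = EnhancedResearchAgent()
    # Offline check of the tracked availability flags (no network calls).
    print(agent.get_tool_status())
    # The calls below hit external services; uncomment to exercise them.
    # print(agent.test_tool_connections())
    # print(agent.search("impact of remote work on productivity", research_depth="deep"))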