import json
import pickle
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import networkx as nx
from loguru import logger
from rdflib import RDF, Graph, Literal, URIRef

from utils.llm_orchestrator import LLMOrchestrator


class KnowledgeManagementLayer:
    """Keep a NetworkX directed graph and an rdflib graph in step.

    New information is sent to the LLM orchestrator to extract entities and
    relations; the parsed result is written to ``knowledge_graph`` (a
    ``networkx.DiGraph``) and mirrored as triples in ``rdf_graph`` (an
    ``rdflib.Graph``).
    """

    # Id of the loguru file sink registered by setup_logger(). Stored on the
    # class so that creating several instances does not register the same
    # sink repeatedly (which would duplicate every log line).
    _log_sink_id: Optional[int] = None

    def __init__(self, llm_api_key: str):
        """Initialize the Knowledge Management Layer.

        Args:
            llm_api_key: Key handed to ``LLMOrchestrator``.
        """
        self.llm_orchestrator = LLMOrchestrator(llm_api_key)
        self.knowledge_graph = nx.DiGraph()
        self.rdf_graph = Graph()
        self.setup_logger()

    def setup_logger(self):
        """Configure logging for the knowledge management layer.

        Adds a rotating file sink at ``logs/knowledge_management.log``.
        The sink is registered at most once per process, no matter how many
        instances are created or how often this method is called.
        """
        if KnowledgeManagementLayer._log_sink_id is None:
            KnowledgeManagementLayer._log_sink_id = logger.add(
                "logs/knowledge_management.log", rotation="500 MB")

    @staticmethod
    def _graph_attributes(item: Dict[str, Any]) -> Dict[str, Any]:
        """Return the attribute dict to store on a graph node or edge.

        A missing or ``None`` ``attributes`` entry is treated as empty. The
        item's top-level ``type`` is copied in (unless the attributes already
        define one) because ``summarize_subgraph`` reads ``type`` from the
        stored node/edge data.
        """
        attributes = dict(item.get('attributes') or {})
        if 'type' in item:
            attributes.setdefault('type', item['type'])
        return attributes

    async def update_knowledge_graph(
            self, new_info: Dict[str, Any]) -> Dict[str, str]:
        """Update the knowledge graph with new information.

        Args:
            new_info: Raw information passed to ``process_information``.

        Returns:
            ``{'status': 'success', 'message': ...}`` on success, or
            ``{'status': 'error', 'message': str(exc)}`` if anything raised.
            Exceptions are logged and not propagated.

        Note:
            Nodes and edges added before a failure are not rolled back, so
            after an error the NetworkX graph may hold entries that never
            reached the RDF graph.
        """
        logger.info("Updating knowledge graph with new information")
        entities_added = 0
        relations_added = 0

        try:
            # Process new information using LLM
            processed_info = await self.process_information(new_info)

            # Add nodes and edges to the graph
            for entity in processed_info['entities']:
                self.knowledge_graph.add_node(
                    entity['id'], **self._graph_attributes(entity))
                entities_added += 1

            for relation in processed_info['relations']:
                self.knowledge_graph.add_edge(
                    relation['source'],
                    relation['target'],
                    **self._graph_attributes(relation))
                relations_added += 1

            # Update RDF graph
            await self.update_rdf_graph(processed_info)

            logger.info(
                f"Successfully updated knowledge graph: Added {entities_added} entities and {relations_added} relations")
            return {
                'status': 'success',
                'message': f"Added {entities_added} entities and {relations_added} relations"
            }
        except Exception as e:
            logger.error(f"Error updating knowledge graph: {str(e)}")
            logger.error(
                f"Processed {entities_added} entities and {relations_added} relations before error")
            return {
                'status': 'error',
                'message': str(e)
            }

    async def process_information(
            self, info: Dict[str, Any]) -> Dict[str, Any]:
        """Process raw information using LLM to extract entities and relations.

        Two completions are requested: one for entities, then one for
        relations (which is also given the entities that were found).

        Args:
            info: Raw information; it is embedded in the prompts via
                ``json.dumps``, so it must be JSON-serializable.

        Returns:
            ``{'entities': [...], 'relations': [...]}`` as produced by
            ``parse_llm_response``.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        logger.info("Processing information to extract entities and relations")
        try:
            # Generate prompt for entity extraction
            entity_prompt = f"""
            Extract entities and their attributes from the following information:
            {json.dumps(info, indent=2)}
            Return the entities in the following format:
            - Entity ID
            - Entity Type
            - Attributes (key-value pairs)
            """
            entity_response = await self.llm_orchestrator.generate_completion(entity_prompt)
            entities = self.parse_llm_response(entity_response, 'entities')
            logger.info(f"Extracted {len(entities)} entities")

            # Generate prompt for relation extraction
            relation_prompt = f"""
            Extract relations between entities from the following information:
            {json.dumps(info, indent=2)}
            Entities found:
            {json.dumps(entities, indent=2)}
            Return the relations in the following format:
            - Source Entity ID
            - Target Entity ID
            - Relation Type
            - Attributes (key-value pairs)
            """
            relation_response = await self.llm_orchestrator.generate_completion(relation_prompt)
            relations = self.parse_llm_response(relation_response, 'relations')
            logger.info(f"Extracted {len(relations)} relations")

            return {
                'entities': entities,
                'relations': relations
            }
        except Exception as e:
            logger.error(f"Error processing information: {str(e)}")
            raise

    async def update_rdf_graph(self, processed_info: Dict[str, Any]):
        """Update the RDF graph with processed information.

        Each entity becomes ``entity:<id>`` with an ``RDF.type`` of
        ``type:<type>`` and one ``attribute:<key>`` literal per attribute.
        Each relation becomes a triple
        ``entity:<source> relation:<type> entity:<target>``.

        Args:
            processed_info: Mapping with ``entities`` and ``relations`` lists.
                A missing or ``None`` ``attributes`` entry on an item is
                treated as empty.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            for entity in processed_info['entities']:
                subject = URIRef(f"entity:{entity['id']}")
                self.rdf_graph.add(
                    (subject, RDF.type, URIRef(f"type:{entity['type']}")))
                for key, value in (entity.get('attributes') or {}).items():
                    self.rdf_graph.add(
                        (subject, URIRef(f"attribute:{key}"), Literal(value)))

            for relation in processed_info['relations']:
                subject = URIRef(f"entity:{relation['source']}")
                obj = URIRef(f"entity:{relation['target']}")
                predicate = URIRef(f"relation:{relation['type']}")
                self.rdf_graph.add((subject, predicate, obj))
                # NOTE(review): attributes are attached to the predicate URI,
                # so every relation of the same type shares them. Kept as-is
                # to preserve the stored graph shape; confirm this is intended.
                for key, value in (relation.get('attributes') or {}).items():
                    self.rdf_graph.add(
                        (predicate, URIRef(f"attribute:{key}"), Literal(value)))
        except Exception as e:
            logger.error(f"Error updating RDF graph: {str(e)}")
            raise

    async def query_knowledge(self, query: Dict[str, Any]) -> Dict[str, Any]:
        """Query the knowledge graph based on specific criteria.

        The criteria are turned into a SPARQL query by the LLM and that query
        is run against the RDF graph.

        Args:
            query: Search criteria; embedded in the prompt via ``json.dumps``.

        Returns:
            ``{'status': 'success', 'results': [...]}`` or
            ``{'status': 'error', 'message': str(exc)}``. Exceptions are
            logged and not propagated.
        """
        try:
            # Generate SPARQL query using LLM
            sparql_prompt = f"""
            Generate a SPARQL query for the following search criteria:
            {json.dumps(query, indent=2)}
            """
            sparql_query = await self.llm_orchestrator.generate_completion(sparql_prompt)

            # Execute query on RDF graph
            results = self.rdf_graph.query(sparql_query)

            # Process and format results
            formatted_results = await self.format_query_results(results)
            return {
                'status': 'success',
                'results': formatted_results
            }
        except Exception as e:
            logger.error(f"Error querying knowledge graph: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }

    async def format_query_results(self, results: Any) -> List[Dict[str, Any]]:
        """Convert an rdflib query result into a list of plain dicts.

        Args:
            results: The object returned by ``rdflib.Graph.query``.

        Returns:
            * ASK queries: ``[{'answer': bool}]``.
            * CONSTRUCT / DESCRIBE queries: one
              ``{'subject', 'predicate', 'object'}`` dict of strings per
              triple.
            * SELECT queries: one dict per row mapping variable name to the
              bound value converted with ``toPython()``; unbound variables
              are omitted.
        """
        result_type = getattr(results, 'type', None)

        if result_type == 'ASK':
            return [{'answer': bool(results.askAnswer)}]

        if result_type in ('CONSTRUCT', 'DESCRIBE'):
            return [
                {'subject': str(s), 'predicate': str(p), 'object': str(o)}
                for s, p, o in results
            ]

        formatted = []
        for row in results:
            formatted.append({
                name: term.toPython() if hasattr(term, 'toPython') else term
                for name, term in row.asdict().items()
            })
        return formatted

    async def generate_insights(
            self, context: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate insights from the knowledge graph.

        Args:
            context: Context for the insight request; embedded in the prompt
                via ``json.dumps``.

        Returns:
            The LLM response parsed by ``parse_llm_response``.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            # Extract relevant subgraph based on context
            subgraph = self.extract_relevant_subgraph(context)

            # Generate insights using LLM
            insight_prompt = f"""
            Generate insights from the following knowledge graph data:
            Nodes: {len(subgraph.nodes)}
            Edges: {len(subgraph.edges)}
            Context: {json.dumps(context, indent=2)}
            Graph Summary:
            {self.summarize_subgraph(subgraph)}
            """
            insights = await self.llm_orchestrator.generate_completion(insight_prompt)
            return self.parse_llm_response(insights, 'insights')
        except Exception as e:
            logger.error(f"Error generating insights: {str(e)}")
            raise

    def extract_relevant_subgraph(self, context: Dict[str, Any]) -> nx.DiGraph:
        """Extract a relevant subgraph based on context.

        Currently a placeholder: ``context`` is ignored and the full
        knowledge graph object (not a copy) is returned.
        """
        # Implementation would include logic to extract relevant portions of the graph
        # based on the provided context
        return self.knowledge_graph

    def summarize_subgraph(self, subgraph: nx.DiGraph) -> str:
        """Generate a summary of the subgraph.

        Args:
            subgraph: Graph to summarize.

        Returns:
            A JSON string with ``node_types`` and ``edge_types`` (counts of
            the ``type`` attribute, ``'unknown'`` when absent) and
            ``key_entities`` (up to five nodes with the highest degree).
        """
        node_types = Counter(
            attrs.get('type', 'unknown')
            for _, attrs in subgraph.nodes(data=True))
        edge_types = Counter(
            attrs.get('type', 'unknown')
            for _, _, attrs in subgraph.edges(data=True))

        # Identify key entities (nodes with highest degree); sorted() is
        # stable, so ties keep the graph's own node order.
        top_nodes = sorted(
            subgraph.degree, key=lambda pair: pair[1], reverse=True)[:5]

        summary = {
            'node_types': dict(node_types),
            'edge_types': dict(edge_types),
            'key_entities': [
                {'id': node_id, 'degree': degree}
                for node_id, degree in top_nodes
            ]
        }
        return json.dumps(summary, indent=2)

    @staticmethod
    def parse_llm_response(
            response: str, response_type: str) -> List[Dict[str, Any]]:
        """Parse LLM response into structured data.

        Currently a placeholder that ignores both arguments and returns an
        empty list, so callers receive no entities, relations or insights
        until a real parser is implemented.
        """
        # Implementation would include logic to parse the LLM's response
        # into a structured format based on the response_type
        return []  # Placeholder return

    async def backup_knowledge(self, backup_path: str):
        """Backup the knowledge graph to a file.

        Writes ``knowledge_graph_<timestamp>.gpickle`` (a pickle of the
        NetworkX graph) and ``rdf_graph_<timestamp>.ttl`` (Turtle) into
        ``backup_path``, creating the directory if needed. The timestamp is
        UTC, formatted ``%Y%m%d_%H%M%S``.

        Args:
            backup_path: Directory that receives the backup files.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
            backup_dir = Path(backup_path)
            backup_dir.mkdir(parents=True, exist_ok=True)

            # Backup NetworkX graph. nx.write_gpickle was removed in
            # NetworkX 3.0; plain pickle writes the same format.
            # Only unpickle backups that come from a trusted location.
            graph_file = backup_dir / f"knowledge_graph_{timestamp}.gpickle"
            with graph_file.open('wb') as handle:
                pickle.dump(
                    self.knowledge_graph, handle, pickle.HIGHEST_PROTOCOL)

            # Backup RDF graph
            self.rdf_graph.serialize(
                str(backup_dir / f"rdf_graph_{timestamp}.ttl"),
                format="turtle")

            logger.info(
                f"Knowledge graph backed up successfully at {timestamp}")
        except Exception as e:
            logger.error(f"Error backing up knowledge graph: {str(e)}")
            raise