# Source: app.py — commit c734c14 ("Update app.py") by jzou19950715 (verified), 23.3 kB.
import json
import logging
import os
from datetime import datetime
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from pathlib import Path
# Third-party imports
import gradio as gr
from openai import OpenAI
# Configure logging: every record at INFO or above goes both to the console
# and to the file app.log in the working directory.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[logging.StreamHandler(), logging.FileHandler('app.log')],
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
# System prompt for the AI assistant.
# InformationExtractor sends this text as the "system" message of both of its
# chat-completion requests: the conversational reply and the extraction pass.
SYSTEM_PROMPT = """
You are an Information Extraction Assistant, designed to help extract and organize
important information from conversations in a natural and engaging way.
Core Capabilities:
- Natural conversation while gathering specific information
- Flexible information extraction based on context
- Progress tracking and completion estimation
- Structured data organization with context preservation
Please maintain a friendly and professional tone while ensuring accurate information extraction.
"""
@dataclass
class ExtractedInfo:
    """One piece of information pulled out of the conversation.

    ``text``, ``category`` and ``confidence`` are required; ``timestamp`` and
    ``metadata`` receive fresh per-instance defaults.
    """

    # The extracted snippet.
    text: str
    # Category label attached to the snippet.
    category: str
    # Confidence score for this extraction.
    confidence: float
    # Defaults to the moment the instance is created.
    timestamp: datetime = field(default_factory=datetime.now)
    # Extra context for the item; defaults to a new empty dict per instance.
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ConversationState:
    """Running record of what has been extracted during a conversation."""

    # Every item recorded so far, in arrival order (duplicates allowed).
    extracted_items: List[ExtractedInfo] = field(default_factory=list)
    # Distinct category labels seen so far, in first-seen order.
    categories_covered: List[str] = field(default_factory=list)
    current_focus: Optional[str] = None
    completion_percentage: float = 0.0
    last_error: Optional[str] = None
    last_update: datetime = field(default_factory=datetime.now)

    def add_extracted_info(self, info: ExtractedInfo) -> None:
        """Record ``info``, note its category if unseen, and stamp the update time."""
        self.extracted_items.append(info)
        category_is_new = info.category not in self.categories_covered
        if category_is_new:
            self.categories_covered.append(info.category)
        self.last_update = datetime.now()
class InformationExtractor:
    """Core class for handling information extraction from conversations.

    Each call to :meth:`process_message` sends the running conversation to the
    chat model, then runs a second request asking the model to return the
    facts found in the latest exchange as JSON.  Extracted items accumulate in
    ``self.state`` and can be written to a report with :meth:`generate_output`.
    """

    # Chat model used for both the conversational reply and the extraction pass.
    _MODEL_NAME = "gpt-4o-mini"

    def __init__(self):
        self.conversation_history: List[Dict[str, str]] = []
        self.state = ConversationState()
        self.client: Optional[OpenAI] = None
        # Key the current client was built from; lets process_message notice
        # when the caller supplies a different key later on.
        self._client_key: Optional[str] = None
        self.extraction_categories = [
            "personal_info",
            "education",
            "work_experience",
            "skills",
            "achievements"
        ]

    def _validate_api_key(self, api_key: str) -> bool:
        """Validate OpenAI API key format.

        Surrounding whitespace is ignored, since keys are usually pasted.

        Returns:
            ``True`` when the key looks well-formed.

        Raises:
            ValueError: if the key is missing/blank or does not start with ``sk-``.
        """
        if not isinstance(api_key, str) or not api_key.strip():
            raise ValueError("API key cannot be empty")
        if not api_key.strip().startswith('sk-'):
            raise ValueError("Invalid API key format")
        return True

    def _initialize_client(self, api_key: str) -> None:
        """Initialize OpenAI client with error handling.

        Raises:
            ValueError: if the key fails validation.
            Exception: anything raised while constructing the client (logged, then re-raised).
        """
        try:
            if self._validate_api_key(api_key):
                cleaned_key = api_key.strip()
                self.client = OpenAI(api_key=cleaned_key)
                self._client_key = cleaned_key
        except Exception as e:
            logger.error(f"Error initializing OpenAI client: {str(e)}")
            raise

    def _ensure_client(self, api_key: str) -> None:
        """Create the client on first use, or rebuild it when the key changed.

        A client that was not built by ``_initialize_client`` (``_client_key``
        is ``None``) is never replaced, and a blank key leaves an existing
        client alone.
        """
        supplied = api_key.strip() if isinstance(api_key, str) else ""
        key_changed = (
            bool(supplied)
            and self._client_key is not None
            and supplied != self._client_key
        )
        if not self.client or key_changed:
            self._initialize_client(api_key)

    def _add_to_history(self, role: str, content: str) -> None:
        """Add a message to conversation history with timestamp."""
        self.conversation_history.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat()
        })

    def _get_ai_response(self, retries: int = 3) -> str:
        """Get response from OpenAI with retry mechanism.

        Args:
            retries: Maximum number of attempts; values below 1 are treated as 1.

        Returns:
            The assistant's reply text (empty string if the model returned no content).

        Raises:
            ValueError: if the client has not been initialized.
            RuntimeError: if every attempt failed; chained to the last error.
        """
        if not self.client:
            raise ValueError("OpenAI client not initialized")
        attempts = max(1, retries)
        # Only role/content are sent; the local "timestamp" field is not part
        # of the chat message schema.
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            *[{
                "role": msg["role"],
                "content": msg["content"]
            } for msg in self.conversation_history]
        ]
        for attempt in range(attempts):
            try:
                response = self.client.chat.completions.create(
                    model=self._MODEL_NAME,
                    messages=messages,
                    temperature=0.7,
                    max_tokens=2000
                )
                return response.choices[0].message.content or ""
            except Exception as e:
                logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
                if attempt == attempts - 1:
                    raise RuntimeError(
                        f"Failed after {attempts} attempts: {str(e)}"
                    ) from e
        # Unreachable: the loop always returns or raises on its last pass.
        raise RuntimeError(f"Failed after {attempts} attempts")

    @staticmethod
    def _strip_code_fence(raw: str) -> str:
        """Return ``raw`` without a surrounding Markdown code fence, if any.

        Handles both "```" and "```json"-style opening fences; text without a
        fence is returned stripped of surrounding whitespace.
        """
        cleaned = (raw or "").strip()
        if cleaned.startswith("```"):
            first_newline = cleaned.find("\n")
            # Drop the whole opening fence line (which may carry a language tag).
            cleaned = cleaned[first_newline + 1:] if first_newline != -1 else cleaned[3:]
            cleaned = cleaned.rstrip()
            if cleaned.endswith("```"):
                cleaned = cleaned[:-3]
        return cleaned.strip()

    def _extract_information(self, text: str) -> List[ExtractedInfo]:
        """Extract structured information from text.

        Asks the model for a JSON document describing the facts in ``text`` and
        converts each well-formed entry into an ``ExtractedInfo``.  Malformed
        entries are skipped individually; confidences are coerced to ``float``
        and clamped to the 0.0-1.0 range the prompt asks for.

        Returns:
            The extracted items, or an empty list when the request or the
            JSON parsing fails (the failure is logged, never raised).
        """
        if not self.client:
            logger.error("Error during information extraction: OpenAI client not initialized")
            return []
        try:
            extraction_prompt = f"""
Analyze the following text and extract relevant information.
Categories to consider: {', '.join(self.extraction_categories)}
For each piece of information extracted, provide:
1. The exact text
2. The category it belongs to
3. Confidence level (0.0 to 1.0)
4. Any relevant context or metadata
Format as JSON:
{{
"extracted_items": [
{{
"text": "extracted text",
"category": "category name",
"confidence": 0.95,
"metadata": {{}}
}}
]
}}
Text to analyze: {text}
"""
            response = self.client.chat.completions.create(
                model=self._MODEL_NAME,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": extraction_prompt}
                ],
                temperature=0.3
            )
            # Models frequently wrap JSON answers in a Markdown code fence,
            # which json.loads cannot parse; remove it first.
            payload = self._strip_code_fence(response.choices[0].message.content)
            analysis = json.loads(payload)
            raw_items = analysis.get("extracted_items", []) if isinstance(analysis, dict) else []
            if not isinstance(raw_items, list):
                raw_items = []
            extracted_items = []
            for item in raw_items:
                try:
                    confidence = min(max(float(item["confidence"]), 0.0), 1.0)
                    extracted_info = ExtractedInfo(
                        text=item["text"],
                        category=item["category"],
                        confidence=confidence,
                        metadata=item.get("metadata") or {}
                    )
                except (KeyError, TypeError, ValueError, AttributeError) as e:
                    # One bad entry should not discard the rest of the batch.
                    logger.warning(f"Skipping malformed extracted item {item!r}: {str(e)}")
                    continue
                extracted_items.append(extracted_info)
            return extracted_items
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing extraction response: {str(e)}")
            return []
        except Exception as e:
            logger.error(f"Error during information extraction: {str(e)}")
            return []

    def _update_completion_status(self) -> None:
        """Update completion status based on extracted information.

        Completion is the share of ``extraction_categories`` that have at least
        one item, scaled by the average confidence of all items, and kept
        within 0-100.  Categories the model invented (not listed in
        ``extraction_categories``) do not count towards coverage.
        """
        items = self.state.extracted_items
        total_categories = len(self.extraction_categories)
        if not items or total_categories == 0:
            self.state.completion_percentage = 0.0
            return
        known_categories = set(self.extraction_categories)
        covered_categories = len(known_categories.intersection(self.state.categories_covered))
        base_completion = (covered_categories / total_categories) * 100
        avg_confidence = sum(item.confidence for item in items) / len(items)
        adjusted_completion = base_completion * avg_confidence
        self.state.completion_percentage = max(0.0, min(adjusted_completion, 100.0))

    def process_message(self, message: str, api_key: str) -> Dict[str, Any]:
        """Process a user message and extract information.

        Args:
            message: The user's chat message.
            api_key: OpenAI API key; used to create the client on first use
                and to rebuild it if a different key is supplied later.

        Returns:
            On success, a dict with ``response``, ``extracted_info`` and
            ``completion_status``.  On failure, a dict with ``error`` and
            ``completion_status``; exceptions are logged, not raised.
        """
        try:
            self._ensure_client(api_key)
            history_mark = len(self.conversation_history)
            self._add_to_history("user", message)
            try:
                ai_response = self._get_ai_response()
            except Exception:
                # Roll back the unanswered user turn so a retry does not send
                # the same message twice.
                del self.conversation_history[history_mark:]
                raise
            self._add_to_history("assistant", ai_response)
            # Extract information from the latest exchange only.
            new_information = self._extract_information(message + "\n" + ai_response)
            for info in new_information:
                self.state.add_extracted_info(info)
            self._update_completion_status()
            return {
                "response": ai_response,
                "extracted_info": [
                    {
                        "text": info.text,
                        "category": info.category,
                        "confidence": info.confidence
                    } for info in new_information
                ],
                "completion_status": {
                    "percentage": self.state.completion_percentage,
                    "categories_covered": self.state.categories_covered,
                    "current_focus": self.state.current_focus
                }
            }
        except Exception as e:
            error_msg = f"Error processing message: {str(e)}"
            logger.error(error_msg)
            self.state.last_error = error_msg
            return {
                "error": error_msg,
                "completion_status": {
                    "percentage": self.state.completion_percentage,
                    "categories_covered": self.state.categories_covered,
                    "current_focus": self.state.current_focus
                }
            }

    def generate_output(self) -> Dict[str, Any]:
        """Generate structured output from all extracted information.

        Groups the items by each entry of ``extraction_categories``, writes the
        report to ``extracted_info_<timestamp>.json`` in the working directory,
        and returns it.

        Returns:
            ``{"filename", "content", "status": "success"}`` on success, or
            ``{"error", "status": "error"}`` if anything fails.
        """
        try:
            categorized_info = {}
            for category in self.extraction_categories:
                category_items = [
                    {
                        "text": item.text,
                        "confidence": item.confidence,
                        "timestamp": item.timestamp.isoformat(),
                        "metadata": item.metadata
                    }
                    for item in self.state.extracted_items
                    if item.category == category
                ]
                # Categories with no items are left out of the report.
                if category_items:
                    categorized_info[category] = category_items
            output = {
                "extracted_information": categorized_info,
                "analysis_summary": {
                    "total_items": len(self.state.extracted_items),
                    "categories_covered": self.state.categories_covered,
                    "completion_percentage": self.state.completion_percentage
                },
                "metadata": {
                    "generated_at": datetime.now().isoformat(),
                    "conversation_length": len(self.conversation_history),
                    "version": "2.0"
                }
            }
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"extracted_info_{timestamp}.json"
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(output, f, indent=2, ensure_ascii=False)
            return {
                "filename": filename,
                "content": output,
                "status": "success"
            }
        except Exception as e:
            error_msg = f"Error generating output: {str(e)}"
            logger.error(error_msg)
            return {
                "error": error_msg,
                "status": "error"
            }
def create_gradio_interface():
    """Create the Gradio interface for information extraction.

    Builds the chat column (API key, chatbot, message box, action buttons),
    the status/results column, and wires the event handlers to a single
    ``InformationExtractor``.

    Returns:
        The configured ``gr.Blocks`` app (not yet launched).
    """
    # NOTE(review): this one extractor is captured by every handler below, so
    # it appears to be shared by all sessions of the app — confirm whether
    # per-session state is wanted.
    extractor = InformationExtractor()
    # Custom CSS for better styling
    css = """
.container { max-width: 900px; margin: auto; }
.message { padding: 1rem; margin: 0.5rem 0; border-radius: 0.5rem; }
.info-panel { background: #f5f5f5; padding: 1rem; border-radius: 0.5rem; }
.status-badge {
display: inline-block;
padding: 0.25rem 0.5rem;
border-radius: 0.25rem;
margin: 0.25rem;
background: #e0e0e0;
}
.extraction-highlight {
background: #e8f4f8;
border-left: 4px solid #4a90e2;
padding: 0.5rem;
margin: 0.5rem 0;
}
"""
    with gr.Blocks(css=css, theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
# 🔍 Information Extraction Assistant
Have a natural conversation while we extract and organize important information.
The system will automatically identify and categorize relevant details.
""")
        with gr.Row():
            with gr.Column(scale=2):
                # API Key input
                api_key = gr.Textbox(
                    label="OpenAI API Key",
                    type="password",
                    placeholder="Enter your OpenAI API key (sk-...)",
                    show_label=True
                )
                # Chat interface
                chatbot = gr.Chatbot(
                    value=[],
                    height=400,
                    type="messages",
                    show_label=False
                )
                # Message input
                with gr.Row():
                    msg = gr.Textbox(
                        label="Message",
                        placeholder="Type your message here...",
                        scale=4
                    )
                    submit = gr.Button(
                        "Send",
                        variant="primary",
                        scale=1
                    )
                # Action buttons
                with gr.Row():
                    clear = gr.Button("Clear Chat", scale=1)
                    generate = gr.Button(
                        "Generate Report",
                        variant="secondary",
                        scale=2
                    )
            with gr.Column(scale=1):
                # Extraction Status Panel
                with gr.Group(visible=True):
                    gr.Markdown("### Extraction Progress")
                    # Progress indicator
                    progress = gr.Slider(
                        label="Completion",
                        minimum=0,
                        maximum=100,
                        value=0,
                        interactive=False
                    )
                    # Categories covered
                    categories_covered = gr.JSON(
                        label="Categories Covered",
                        value={"categories": []}
                    )
                    # Current focus
                    current_focus = gr.Textbox(
                        label="Current Focus",
                        value="Not started",
                        interactive=False
                    )
                # Extraction Results
                with gr.Tabs():
                    with gr.Tab("Extracted Information"):
                        extracted_info = gr.JSON(
                            label="Extracted Details",
                            value={}
                        )
                    with gr.Tab("Download"):
                        file_output = gr.File(
                            label="Download Report"
                        )
                    with gr.Tab("Analysis"):
                        analysis_text = gr.Markdown(
                            "Analysis will appear here after processing."
                        )

        # Helper Functions
        def format_extraction_summary(extracted_items: List[Dict]) -> str:
            """Format extracted information for display.

            Each item must provide ``category``, ``confidence`` and ``text``.
            """
            if not extracted_items:
                return "No information extracted yet."
            summary = ["### Recently Extracted Information"]
            for item in extracted_items:
                summary.append(
                    f"- **{item['category']}** ({item['confidence']*100:.1f}% confidence)\n"
                    f" {item['text']}"
                )
            return "\n".join(summary)

        def update_interface_state(state: Dict[str, Any]) -> tuple:
            """Return (progress, categories JSON, focus) from an extractor result dict."""
            status = state['completion_status']
            return (
                status['percentage'],
                {"categories": status['categories_covered']},
                status['current_focus']
            )

        def current_status() -> tuple:
            """Return (progress, categories JSON) straight from the extractor's state."""
            return (
                extractor.state.completion_percentage,
                {"categories": extractor.state.categories_covered}
            )

        # Event Handlers
        def process_message(message: str, history: list, key: str) -> tuple:
            """Handle message processing and update interface.

            Always returns five values, one per bound output component:
            (chat history, progress, categories, focus text, analysis markdown).
            Failure paths keep the progress already reached instead of
            resetting it.
            """
            history = history if history is not None else []
            if not message or not message.strip():
                shown_progress, shown_categories = current_status()
                return (
                    history,
                    shown_progress,
                    shown_categories,
                    "Please enter a message",
                    "No new information extracted."
                )
            try:
                result = extractor.process_message(message, key)
                if "error" in result:
                    shown_progress, shown_categories, _ = update_interface_state(result)
                    return (
                        history,
                        shown_progress,
                        shown_categories,
                        f"Error: {result['error']}",
                        "An error occurred during processing."
                    )
                # Update chat history
                history.append({
                    "role": "user",
                    "content": message
                })
                history.append({
                    "role": "assistant",
                    "content": result["response"]
                })
                # Update status components
                shown_progress, shown_categories, focus = update_interface_state(result)
                focus_text = focus or "Processing..."
                # Update extraction display
                if result.get("extracted_info"):
                    summary_text = format_extraction_summary(result["extracted_info"])
                else:
                    summary_text = "No new information extracted."
                return (
                    history,
                    shown_progress,
                    shown_categories,
                    focus_text,
                    summary_text
                )
            except Exception as e:
                logger.error(f"Error in process_message: {str(e)}")
                shown_progress, shown_categories = current_status()
                return (
                    history,
                    shown_progress,
                    shown_categories,
                    f"Error: {str(e)}",
                    "An error occurred during processing."
                )

        def generate_report() -> tuple:
            """Generate and return report file.

            Returns four values: (report file path or None, JSON preview,
            focus/status text, analysis markdown).
            """
            try:
                result = extractor.generate_output()
                if result["status"] == "success":
                    by_category = result["content"]["extracted_information"]
                    # Report items are grouped under their category and do not
                    # carry it themselves; re-attach it for the summary view.
                    summary_items = [
                        {**item, "category": category}
                        for category, items in by_category.items()
                        for item in items
                    ]
                    content_preview = {
                        "summary": result["content"]["analysis_summary"],
                        "categories": list(by_category.keys()),
                        "total_items": len(summary_items)
                    }
                    return (
                        result["filename"],
                        content_preview,
                        "Report generated successfully! 🎉",
                        format_extraction_summary(summary_items)
                    )
                return (
                    None,
                    {"error": result["error"]},
                    f"Error generating report: {result['error']}",
                    "Failed to generate analysis."
                )
            except Exception as e:
                logger.error(f"Error in generate_report: {str(e)}")
                return (
                    None,
                    {"error": str(e)},
                    f"Error: {str(e)}",
                    "An error occurred during report generation."
                )

        def clear_interface() -> tuple:
            """Reset all interface components."""
            # `extractor` is a local of the enclosing function, so it must be
            # rebound with nonlocal for the other handlers to see the new one.
            nonlocal extractor
            extractor = InformationExtractor()
            return (
                [],  # Clear chat history
                0.0,  # Reset progress
                {"categories": []},  # Clear categories
                "Not started",  # Reset focus
                {},  # Clear extracted info
                None,  # Clear file output
                "Ready to start new extraction.",  # Reset analysis
                gr.update(value="")  # Clear message input
            )

        # Event Bindings
        # Pressing Enter in the message box and clicking "Send" behave the same:
        # process the message, then empty the message box.
        for trigger in (msg.submit, submit.click):
            trigger(
                process_message,
                inputs=[msg, chatbot, api_key],
                outputs=[
                    chatbot,
                    progress,
                    categories_covered,
                    current_focus,
                    analysis_text
                ]
            ).then(
                lambda: "",
                None,
                msg
            )
        generate.click(
            generate_report,
            outputs=[
                file_output,
                extracted_info,
                current_focus,
                analysis_text
            ]
        )
        clear.click(
            clear_interface,
            outputs=[
                chatbot,
                progress,
                categories_covered,
                current_focus,
                extracted_info,
                file_output,
                analysis_text,
                msg
            ]
        )
    return demo
if __name__ == "__main__":
    # Logging is configured once at the top of this module. basicConfig() does
    # nothing when the root logger already has handlers, so it is not called
    # again here.
    try:
        demo = create_gradio_interface()
        # Listen on all interfaces on port 7860 and request a public share link.
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=True,
            show_api=False
        )
    except Exception as e:
        logger.error(f"Application failed to start: {str(e)}")
        raise