import gradio as gr import torch from transformers import AutoModelForCausalLM, AutoTokenizer import json import os from datetime import datetime import re import threading import time # Custom CSS for beautiful design custom_css = """ .gradio-container { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; } .header-container { background: rgba(255, 255, 255, 0.95); border-radius: 20px; padding: 2rem; margin: 1rem; box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1); text-align: center; } .chat-container { background: rgba(255, 255, 255, 0.95); border-radius: 15px; padding: 1rem; margin: 0.5rem; box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1); } .custom-button { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border: none; border-radius: 25px; padding: 12px 24px; color: white; font-weight: bold; transition: all 0.3s ease; } .memory-box { background: linear-gradient(135deg, #81ecec 0%, #74b9ff 100%); border-radius: 10px; padding: 1rem; margin: 1rem 0; color: white; } .model-selector { background: rgba(255, 255, 255, 0.9); border-radius: 10px; padding: 1rem; margin: 0.5rem; } """ class DualModelMemoryBot: def __init__(self): self.device = "cuda" if torch.cuda.is_available() else "cpu" self.models = { "Qwen/Qwen2-0.5B-Instruct": { "model": None, "tokenizer": None, "loaded": False, "description": "Qwen - Better for complex conversations" }, "HuggingFaceTB/SmolLM2-360M-Instruct": { "model": None, "tokenizer": None, "loaded": False, "description": "SmolLM - Faster and lightweight" } } self.current_model = "Qwen/Qwen2-0.5B-Instruct" self.memory_file = "dual_memory.json" self.memory = self.load_memory() self.loading_status = "Ready" # Load default model self.load_model(self.current_model) def load_memory(self): """Load unified memory from JSON""" if os.path.exists(self.memory_file): try: with open(self.memory_file, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"Error loading memory: {e}") return {} def save_memory(self): """Save unified memory to JSON""" try: with open(self.memory_file, 'w', encoding='utf-8') as f: json.dump(self.memory, f, indent=2, default=str, ensure_ascii=False) except Exception as e: print(f"Error saving memory: {e}") def load_model(self, model_name): """Load the specified AI model""" if model_name not in self.models: return f"āŒ Unknown model: {model_name}" if self.models[model_name]["loaded"]: self.current_model = model_name return f"āœ… {model_name} already loaded and selected!" try: self.loading_status = f"šŸ”„ Loading {model_name}..." print(f"šŸ”„ Loading {model_name}...") # Load tokenizer tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token # Load model with appropriate settings model = AutoModelForCausalLM.from_pretrained( model_name, torch_dtype=torch.float16 if self.device == "cuda" else torch.float32, low_cpu_mem_usage=True, trust_remote_code=True, device_map="auto" if self.device == "cuda" else None ) if self.device == "cpu": model = model.to(self.device) self.models[model_name]["model"] = model self.models[model_name]["tokenizer"] = tokenizer self.models[model_name]["loaded"] = True self.current_model = model_name self.loading_status = "Ready" print(f"āœ… {model_name} loaded successfully!") return f"āœ… {model_name} loaded and selected successfully!" except Exception as e: self.loading_status = "Error" error_msg = f"āŒ Error loading {model_name}: {str(e)}" print(error_msg) return error_msg def switch_model(self, model_name): """Switch to a different model""" if model_name not in self.models: return f"āŒ Unknown model: {model_name}" if not self.models[model_name]["loaded"]: return self.load_model(model_name) else: self.current_model = model_name return f"āœ… Switched to {model_name}!" def get_model_status(self): """Get status of all models""" status = "šŸ¤– **Model Status:**\n\n" for model_name, info in self.models.items(): status_icon = "āœ…" if info["loaded"] else "ā³" current_icon = "šŸ‘ˆ" if model_name == self.current_model else "" status += f"{status_icon} **{model_name.split('/')[-1]}** {current_icon}\n" status += f" {info['description']}\n\n" status += f"**Current:** {self.current_model.split('/')[-1]}\n" status += f"**Device:** {self.device}\n" status += f"**Status:** {self.loading_status}" return status def process_natural_language(self, text, user_id): """Enhanced natural language processing with memory""" text_lower = text.lower().strip() # Initialize user memory if not exists if user_id not in self.memory: self.memory[user_id] = { "name": None, "age": None, "medications": [], "allergies": [], "conditions": [], "conversations": [], "notes": [] } user_memory = self.memory[user_id] response = "" # Extract and store name name_patterns = [ r"my name is (\w+)", r"i am (\w+)", r"call me (\w+)", r"i'm (\w+)", r"name's (\w+)" ] for pattern in name_patterns: match = re.search(pattern, text_lower) if match: name = match.group(1).capitalize() user_memory["name"] = name response = f"Nice to meet you, {name}! I'll remember your name. 😊" break # Extract age age_patterns = [ r"i am (\d+) years old", r"i'm (\d+) years old", r"my age is (\d+)", r"i am (\d+)" ] for pattern in age_patterns: match = re.search(pattern, text_lower) if match: age = match.group(1) user_memory["age"] = age response = f"Got it! I'll remember that you're {age} years old." break # Handle name queries if any(phrase in text_lower for phrase in ["what's my name", "whats my name", "my name", "do you know my name"]): if user_memory["name"]: response = f"Your name is {user_memory['name']}! 😊" else: response = "I don't know your name yet. Please tell me your name." # Handle age queries if any(phrase in text_lower for phrase in ["how old am i", "my age", "what's my age"]): if user_memory["age"]: response = f"You're {user_memory['age']} years old!" else: response = "I don't know your age yet. Please tell me how old you are." # Enhanced medication patterns med_add_patterns = [ r"i take (\w+(?:\s+\w+)*)\s+(\d+(?:\.\d+)?)\s*(\w+)\s*(daily|twice daily|once daily|every \d+ hours?|morning|evening|night|before meals|after meals)", r"i'm taking (\w+(?:\s+\w+)*)\s+(\d+(?:\.\d+)?)\s*(\w+)\s*(daily|twice daily|once daily|every \d+ hours?|morning|evening|night|before meals|after meals)", r"prescribed (\w+(?:\s+\w+)*)\s+(\d+(?:\.\d+)?)\s*(\w+)\s*(daily|twice daily|once daily|every \d+ hours?|morning|evening|night|before meals|after meals)", r"doctor gave me (\w+(?:\s+\w+)*)\s+(\d+(?:\.\d+)?)\s*(\w+)\s*(daily|twice daily|once daily|every \d+ hours?|morning|evening|night|before meals|after meals)", r"started (\w+(?:\s+\w+)*)\s+(\d+(?:\.\d+)?)\s*(\w+)\s*(daily|twice daily|once daily|every \d+ hours?|morning|evening|night|before meals|after meals)", r"add medication (\w+(?:\s+\w+)*)\s+(\d+(?:\.\d+)?)\s*(\w+)\s*(daily|twice daily|once daily|every \d+ hours?|morning|evening|night|before meals|after meals)" ] for pattern in med_add_patterns: match = re.search(pattern, text_lower) if match: med_name = match.group(1).title() dosage = f"{match.group(2)}{match.group(3)}" frequency = match.group(4) # Check if medication already exists existing = False for med in user_memory["medications"]: if med["name"].lower() == med_name.lower(): med["dosage"] = dosage med["frequency"] = frequency med["last_updated"] = datetime.now().isoformat() existing = True response = f"āœ… Updated {med_name} to {dosage} {frequency}" break if not existing: user_memory["medications"].append({ "name": med_name, "dosage": dosage, "frequency": frequency, "added_date": datetime.now().isoformat(), "last_taken": None, "notes": "" }) response = f"āœ… Added {med_name} ({dosage}) {frequency} to your medications" break # Record taking medication took_patterns = [ r"i took (\w+(?:\s+\w+)*)", r"just took (\w+(?:\s+\w+)*)", r"had my (\w+(?:\s+\w+)*)", r"took my (\w+(?:\s+\w+)*)", r"taken (\w+(?:\s+\w+)*)" ] for pattern in took_patterns: match = re.search(pattern, text_lower) if match: med_name = match.group(1).title() found = False for med in user_memory["medications"]: if med_name.lower() in med["name"].lower() or med["name"].lower() in med_name.lower(): med["last_taken"] = datetime.now().isoformat() response = f"āœ… Recorded that you took {med['name']} at {datetime.now().strftime('%H:%M')}" found = True break if not found: response = f"āŒ I don't see {med_name} in your medication list. Would you like to add it?" break # Remove medications remove_patterns = [ r"stop taking (\w+(?:\s+\w+)*)", r"stopped (\w+(?:\s+\w+)*)", r"no longer take (\w+(?:\s+\w+)*)", r"remove (\w+(?:\s+\w+)*)", r"delete (\w+(?:\s+\w+)*)", r"discontinue (\w+(?:\s+\w+)*)" ] for pattern in remove_patterns: match = re.search(pattern, text_lower) if match: med_name = match.group(1).title() found = False for i, med in enumerate(user_memory["medications"]): if med_name.lower() in med["name"].lower() or med["name"].lower() in med_name.lower(): removed_med = user_memory["medications"].pop(i) response = f"āœ… Removed {removed_med['name']} from your medications" found = True break if not found: response = f"āŒ I don't see {med_name} in your medication list" break # Add allergies allergy_patterns = [ r"i'm allergic to (\w+(?:\s+\w+)*)", r"i am allergic to (\w+(?:\s+\w+)*)", r"allergic to (\w+(?:\s+\w+)*)", r"allergy to (\w+(?:\s+\w+)*)" ] for pattern in allergy_patterns: match = re.search(pattern, text_lower) if match: allergy = match.group(1).title() if allergy not in user_memory["allergies"]: user_memory["allergies"].append(allergy) response = f"āš ļø Added {allergy} to your allergies list. I'll remember this important information!" else: response = f"I already have {allergy} in your allergies list." break # Add medical conditions condition_patterns = [ r"i have (\w+(?:\s+\w+)*)", r"diagnosed with (\w+(?:\s+\w+)*)", r"i suffer from (\w+(?:\s+\w+)*)" ] for pattern in condition_patterns: match = re.search(pattern, text_lower) if match: condition = match.group(1).title() # Filter out common non-medical phrases skip_words = ["a", "an", "the", "some", "many", "few", "time", "problem", "issue", "question"] if condition.lower() not in skip_words and len(condition) > 2: if condition not in user_memory["conditions"]: user_memory["conditions"].append(condition) response = f"šŸ“‹ Added {condition} to your medical conditions. I'll keep this in mind." break # Show medications if any(phrase in text_lower for phrase in [ "what medications", "which medicines", "my medications", "my medicines", "what pills", "list my medications", "show my meds" ]): if user_memory["medications"]: response = "šŸ’Š **Your current medications:**\n\n" for med in user_memory["medications"]: last_taken = "" if med.get("last_taken"): try: last_time = datetime.fromisoformat(med["last_taken"]) hours_ago = int((datetime.now() - last_time).total_seconds() / 3600) if hours_ago < 1: last_taken = " (taken recently)" elif hours_ago < 24: last_taken = f" (last taken {hours_ago} hours ago)" else: days_ago = hours_ago // 24 last_taken = f" (last taken {days_ago} days ago)" except: pass response += f"• **{med['name']}** - {med['dosage']} {med['frequency']}{last_taken}\n" else: response = "šŸ’Š You don't have any medications recorded yet." # Show allergies if any(phrase in text_lower for phrase in ["my allergies", "what allergies", "allergy list"]): if user_memory["allergies"]: response = "āš ļø **Your allergies:**\n\n" for allergy in user_memory["allergies"]: response += f"• {allergy}\n" else: response = "āš ļø No allergies recorded." # Show conditions if any(phrase in text_lower for phrase in ["my conditions", "medical conditions", "health conditions"]): if user_memory["conditions"]: response = "šŸ“‹ **Your medical conditions:**\n\n" for condition in user_memory["conditions"]: response += f"• {condition}\n" else: response = "šŸ“‹ No medical conditions recorded." # Show everything if any(phrase in text_lower for phrase in ["show everything", "all my info", "my profile", "what do you know about me"]): response = self.get_complete_profile(user_id) # Save memory and add to conversation history if response: user_memory["conversations"].append({ "user": text, "bot": response, "timestamp": datetime.now().isoformat(), "model_used": self.current_model }) # Keep only last 20 conversations if len(user_memory["conversations"]) > 20: user_memory["conversations"] = user_memory["conversations"][-20:] self.save_memory() return response # If no specific pattern matched, use AI return self.generate_ai_response(text, user_id) def get_complete_profile(self, user_id): """Get complete user profile""" if user_id not in self.memory: return "šŸ¤·ā€ā™‚ļø I don't have any information about you yet." user_memory = self.memory[user_id] profile = "šŸ‘¤ **Your Complete Profile:**\n\n" # Basic info if user_memory.get("name"): profile += f"**Name:** {user_memory['name']}\n" if user_memory.get("age"): profile += f"**Age:** {user_memory['age']}\n" # Medications if user_memory.get("medications"): profile += f"\nšŸ’Š **Medications ({len(user_memory['medications'])}):**\n" for med in user_memory["medications"]: profile += f"• {med['name']} - {med['dosage']} {med['frequency']}\n" # Allergies if user_memory.get("allergies"): profile += f"\nāš ļø **Allergies ({len(user_memory['allergies'])}):**\n" for allergy in user_memory["allergies"]: profile += f"• {allergy}\n" # Conditions if user_memory.get("conditions"): profile += f"\nšŸ“‹ **Medical Conditions ({len(user_memory['conditions'])}):**\n" for condition in user_memory["conditions"]: profile += f"• {condition}\n" # Conversation count conv_count = len(user_memory.get("conversations", [])) profile += f"\nšŸ’¬ **Total Conversations:** {conv_count}\n" return profile def generate_ai_response(self, text, user_id): """Generate AI response with memory context using current model""" current_model_info = self.models.get(self.current_model) if not current_model_info or not current_model_info["loaded"]: return "āŒ No model is currently loaded. Please load a model first." model = current_model_info["model"] tokenizer = current_model_info["tokenizer"] # Get user context user_memory = self.memory.get(user_id, {}) context = "You are a helpful medical assistant. Here's what you know about the user:\n\n" if user_memory.get("name"): context += f"Name: {user_memory['name']}\n" if user_memory.get("age"): context += f"Age: {user_memory['age']}\n" if user_memory.get("medications"): context += "Current Medications:\n" for med in user_memory["medications"]: context += f"- {med['name']} {med['dosage']} {med['frequency']}\n" if user_memory.get("allergies"): context += "Allergies: " + ", ".join(user_memory["allergies"]) + "\n" if user_memory.get("conditions"): context += "Medical Conditions: " + ", ".join(user_memory["conditions"]) + "\n" # Recent conversations for context if user_memory.get("conversations"): context += "\nRecent conversation:\n" for conv in user_memory["conversations"][-2:]: # Last 2 context += f"User: {conv['user']}\nAssistant: {conv['bot']}\n" context += f"\nUser: {text}\nAssistant:" try: # Prepare inputs inputs = tokenizer( context, return_tensors="pt", truncation=True, max_length=1024, padding=True ) # Move to device inputs = {k: v.to(self.device) for k, v in inputs.items()} # Generate response with torch.no_grad(): outputs = model.generate( **inputs, max_new_tokens=200, temperature=0.7, do_sample=True, pad_token_id=tokenizer.eos_token_id, eos_token_id=tokenizer.eos_token_id, repetition_penalty=1.1 ) # Decode response response = tokenizer.decode(outputs[0], skip_special_tokens=True) response = response[len(context):].strip() # Clean up response if not response: response = "I understand. How can I help you with your health today?" # Add to conversation history if user_id not in self.memory: self.memory[user_id] = {"name": None, "age": None, "medications": [], "allergies": [], "conditions": [], "conversations": []} self.memory[user_id]["conversations"].append({ "user": text, "bot": response, "timestamp": datetime.now().isoformat(), "model_used": self.current_model }) if len(self.memory[user_id]["conversations"]) > 20: self.memory[user_id]["conversations"] = self.memory[user_id]["conversations"][-20:] self.save_memory() return response except Exception as e: return f"āŒ Error generating response: {str(e)}" def get_memory_summary(self, user_id): """Get a summary of what the bot remembers""" if user_id not in self.memory: return "🧠 **Memory:** No information stored yet." user_memory = self.memory[user_id] summary = "🧠 **What I Remember:**\n\n" if user_memory.get("name"): summary += f"šŸ‘¤ **Name:** {user_memory['name']}\n" if user_memory.get("age"): summary += f"šŸŽ‚ **Age:** {user_memory['age']}\n" if user_memory.get("medications"): summary += f"\nšŸ’Š **Medications:** {len(user_memory['medications'])} recorded\n" for med in user_memory["medications"]: summary += f" • {med['name']} - {med['dosage']} {med['frequency']}\n" if user_memory.get("allergies"): summary += f"\nāš ļø **Allergies:** {', '.join(user_memory['allergies'])}\n" if user_memory.get("conditions"): summary += f"\nšŸ“‹ **Conditions:** {', '.join(user_memory['conditions'])}\n" conv_count = len(user_memory.get("conversations", [])) summary += f"\nšŸ’¬ **Conversations:** {conv_count} remembered\n" return summary # Initialize bot print("šŸš€ Starting Dual Model Medical Assistant...") bot = DualModelMemoryBot() def chat_function(message, history, user_id): """Main chat function""" if not message.strip(): return history, "" if not user_id.strip(): user_id = "default_user" # Process message response = bot.process_natural_language(message, user_id) # Add to chat history history.append([message, response]) return history, "" def get_memory_info(user_id): """Get memory information""" if not user_id.strip(): return "āš ļø Please enter a User ID first." return bot.get_memory_summary(user_id) def load_model_function(model_name): """Load selected model""" return bot.load_model(model_name) def switch_model_function(model_name): """Switch to selected model""" return bot.switch_model(model_name) def get_model_status(): """Get model status""" return bot.get_model_status() def create_interface(): """Create enhanced interface with dual model support""" with gr.Blocks(css=custom_css, title="šŸ„ Dual Model Medical Assistant", theme=gr.themes.Soft()) as interface: # Header gr.HTML("""

šŸ„ Dual Model Medical Assistant

Advanced AI with memory - Choose your model and talk naturally!

""") with gr.Row(): # Main chat area with gr.Column(scale=2, elem_classes="chat-container"): chatbot = gr.Chatbot( height=500, label="šŸ’¬ Chat with AI Assistant", avatar_images=["šŸ‘¤", "šŸ¤–"], show_copy_button=True ) with gr.Row(): msg = gr.Textbox( placeholder="Try: 'My name is Alex', 'I take aspirin 100mg daily', 'I'm allergic to penicillin'", show_label=False, lines=2, scale=4 ) send_btn = gr.Button("Send šŸ“¤", scale=1, elem_classes="custom-button") with gr.Row(): clear_btn = gr.Button("šŸ—‘ļø Clear Chat", elem_classes="custom-button") profile_btn = gr.Button("šŸ‘¤ Show Full Profile", elem_classes="custom-button") # Sidebar with gr.Column(scale=1): # User settings with gr.Group(elem_classes="model-selector"): gr.HTML("

šŸ‘¤ User Settings

") user_id = gr.Textbox( label="User ID", value="user1", placeholder="Your unique ID" ) memory_btn = gr.Button("🧠 Show Memory", elem_classes="custom-button") # Model selection with gr.Group(elem_classes="model-selector"): gr.HTML("

šŸ¤– Model Selection

") model_dropdown = gr.Dropdown( choices=list(bot.models.keys()), value=bot.current_model, label="Choose Model", interactive=True ) load_btn = gr.Button("šŸ”„ Load Model", elem_classes="custom-button") switch_btn = gr.Button("šŸ”„ Switch Model", elem_classes="custom-button") status_btn = gr.Button("šŸ“Š Model Status", elem_classes="custom-button") # Memory display memory_display = gr.Markdown( "Click 'Show Memory' to see what I remember about you", elem_classes="memory-box" ) # Examples section gr.HTML("

šŸ’” Try these examples:

") with gr.Row(): example_buttons = [ ("šŸ‘‹ My name is Sarah", "My name is Sarah"), ("šŸ’Š I take metformin 500mg twice daily", "I take metformin 500mg twice daily"), ("āš ļø I'm allergic to penicillin", "I'm allergic to penicillin"), ("šŸ“‹ I have diabetes", "I have diabetes"), ("ā“ What medications am I taking?", "What medications am I taking?") ] for display_text, actual_text in example_buttons: btn = gr.Button(display_text, elem_classes="custom-button", scale=1) btn.click( lambda text=actual_text: ([], text), outputs=[chatbot, msg] ) # Event handlers send_btn.click(chat_function, [msg, chatbot, user_id], [chatbot, msg]) msg.submit(chat_function, [msg, chatbot, user_id], [chatbot, msg]) clear_btn.click(lambda: ([], ""), outputs=[chatbot, msg]) memory_btn.click(get_memory_info, inputs=[user_id], outputs=[memory_display]) profile_btn.click( lambda uid: bot.get_complete_profile(uid) if uid.strip() else "āš ļø Please enter a User ID first.", inputs=[user_id], outputs=[memory_display] ) load_btn.click(load_model_function, inputs=[model_dropdown], outputs=[memory_display]) switch_btn.click(switch_model_function, inputs=[model_dropdown], outputs=[memory_display]) status_btn.click(get_model_status, outputs=[memory_display]) return interface if __name__ == "__main__": demo = create_interface() demo.launch( server_name="0.0.0.0", server_port=7860, share=True, # Enable sharing for HuggingFace deployment show_error=True )