# File: Qwen3/utils/conversation.py
# Repository page residue: "Semnykcz's picture"
# Commit message: "Upload 11 files"
# Commit: a2d424a (verified)
"""
Conversation management utilities
"""
import json
import logging
import redis
from typing import List, Dict, Any
# Configure logging: set up the root logger at INFO level when this module is
# imported (basicConfig is a no-op if the root logger already has handlers).
# NOTE(review): configuring logging at import time from a utility module
# affects the whole process — confirm this is intended.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by ConversationManager.
logger = logging.getLogger(__name__)
class ConversationManager:
    """Manage conversation history and caching.

    Conversations are lists of message dicts (``{"role": ..., "content": ...}``)
    keyed by a conversation id. They are stored in Redis as JSON strings with
    an expiry when a Redis server answers a ping at construction time;
    otherwise they are kept in a plain in-process dict (no expiry).
    """

    # Default lifetime of a conversation stored in Redis (24 hours).
    DEFAULT_TTL_SECONDS = 86400

    # Class-level default so instances created without running __init__
    # (e.g. subclasses with their own constructor) still have a TTL.
    ttl_seconds = DEFAULT_TTL_SECONDS

    # Roles that are rendered into the prompt, mapped to their display label.
    # Messages with any other role are skipped.
    _ROLE_LABELS = {
        "system": "System",
        "user": "User",
        "assistant": "Assistant",
    }

    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        ttl_seconds: int = DEFAULT_TTL_SECONDS,
    ):
        """Connect to Redis, falling back to in-memory storage on failure.

        Args:
            host: Redis server host name.
            port: Redis server port.
            db: Redis database index.
            ttl_seconds: Expiry, in seconds, applied to conversations saved
                to Redis.
        """
        self.ttl_seconds = ttl_seconds
        # In-memory store; only used when Redis is unavailable.
        self.conversations = {}
        # Stays None unless the server is confirmed reachable below.
        self.redis_client = None
        try:
            client = redis.Redis(host=host, port=port, db=db)
            client.ping()
        except Exception:
            # Only publish the client after a successful ping: a client that
            # cannot reach the server must not be kept, otherwise every later
            # save/load would go to Redis, fail, and bypass the fallback.
            logger.warning("Redis not available, using in-memory storage")
        else:
            self.redis_client = client

    def save_conversation(self, conv_id: str, messages: List[Dict[str, Any]]) -> None:
        """Save conversation to cache.

        Errors are logged and swallowed; this method never raises.

        Args:
            conv_id: Key under which the conversation is stored.
            messages: Message dicts; must be JSON-serializable when Redis is
                in use.
        """
        try:
            if self.redis_client is not None:
                self.redis_client.setex(conv_id, self.ttl_seconds, json.dumps(messages))
            else:
                self.conversations[conv_id] = messages
        except Exception as e:
            logger.error("Error saving conversation: %s", e)

    def load_conversation(self, conv_id: str) -> List[Dict[str, Any]]:
        """Load conversation from cache.

        Args:
            conv_id: Key of the conversation to load.

        Returns:
            The stored list of messages, or an empty list when the key is
            missing or loading fails (failures are logged).
        """
        try:
            if self.redis_client is not None:
                data = self.redis_client.get(conv_id)
                if data:
                    return json.loads(data)
            else:
                return self.conversations.get(conv_id, [])
        except Exception as e:
            logger.error("Error loading conversation: %s", e)
        # Redis key missing/empty, or an error occurred above.
        return []

    def format_messages_for_model(self, messages: List[Dict[str, Any]]) -> str:
        """Format messages for model input.

        Each system/user/assistant message becomes one ``"<Label>: <content>"``
        line terminated by a newline; messages with other roles are skipped.

        Args:
            messages: Message dicts with ``"role"`` and ``"content"`` keys.

        Returns:
            The concatenated prompt text (empty string for no messages).
        """
        labels = self._ROLE_LABELS
        return "".join(
            f"{labels[msg['role']]}: {msg['content']}\n"
            for msg in messages
            if msg["role"] in labels
        )

    def add_message_to_conversation(self, conv_id: str, role: str, content: str) -> None:
        """Add a message to a conversation.

        Loads the conversation (an empty one if it does not exist yet),
        appends the new message, and saves it back.

        Args:
            conv_id: Key of the conversation to extend.
            role: Role of the message author (e.g. ``"user"``).
            content: Message text.
        """
        conversation = self.load_conversation(conv_id)
        conversation.append({"role": role, "content": content})
        self.save_conversation(conv_id, conversation)