import asyncio import os import time import signal import sys from datetime import datetime import traceback import logging import requests import json from typing import Optional, Dict, Any from sqlalchemy.future import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.exc import SQLAlchemyError from app.database import AsyncSessionLocal, init_db, close_db from app.models import VideoUpload from app.utils import pdf, s3 # Setup logging with UTF-8 encoding for Windows compatibility logging.basicConfig( level=logging.INFO, format='[%(asctime)s] %(levelname)s - %(name)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler('ollama_worker.log', encoding='utf-8') ] ) logger = logging.getLogger("worker.ollama_daemon") # Configuration POLL_INTERVAL = int(os.getenv("OLLAMA_POLL_INTERVAL_SECONDS", "120")) # 2 minutes default MAX_VIDEOS_PER_CYCLE = int(os.getenv("OLLAMA_MAX_VIDEOS_PER_CYCLE", "1")) OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3.2:latest") OLLAMA_WHISPER_MODEL = os.getenv("OLLAMA_WHISPER_MODEL", "whisper:latest") SHUTDOWN_EVENT = asyncio.Event() # Global backoff state _recent_error = False _error_count = 0 MAX_ERRORS_BEFORE_BACKOFF = 3 BACKOFF_SECONDS = int(os.getenv("OLLAMA_BACKOFF_SECONDS", "300")) # 5 minutes def signal_handler(signum, frame): """Handle shutdown signals gracefully""" logger.info(f"Received signal {signum}, initiating graceful shutdown...") SHUTDOWN_EVENT.set() async def check_ollama_health() -> bool: """Check if Ollama service is running and healthy""" try: response = requests.get(f"{OLLAMA_BASE_URL}/api/tags", timeout=10) if response.status_code == 200: models = response.json().get("models", []) logger.info(f"Ollama is healthy. Available models: {[m['name'] for m in models]}") return True else: logger.warning(f"Ollama health check failed: {response.status_code}") return False except Exception as e: logger.warning(f"Ollama health check failed: {e}") return False async def transcribe_with_ollama(video_url: str) -> str: """Transcribe video using existing whisper setup as fallback""" try: logger.info(f"Starting transcription for video: {video_url}") # For now, use the existing whisper setup as fallback # since Ollama whisper model might not be available from app.utils.whisper_llm import analyze as basic_analyze from sqlalchemy.ext.asyncio import AsyncSession # Create a mock session for the analyze function async with AsyncSessionLocal() as session: # Use the existing whisper analysis but only get transcription transcription, _ = await basic_analyze(video_url, 0, session) logger.info(f"Transcription completed. Length: {len(transcription)} characters") return transcription except Exception as e: logger.error(f"Transcription error: {e}") return f"Transcription failed: {str(e)}" async def summarize_with_ollama(text: str) -> str: """Summarize text using Ollama's LLM model""" try: logger.info(f"Starting Ollama summarization. Text length: {len(text)}") # Truncate very long text to avoid token limits max_chars = 8000 # Adjust based on your model's context length if len(text) > max_chars: text = text[:max_chars] + "..." logger.info(f"Text truncated to {max_chars} characters for summarization") prompt = f"""Please provide a comprehensive summary of the following text. Focus on key points, main ideas, and important details. Make it clear and well-structured. Text to summarize: {text} Summary:""" payload = { "model": OLLAMA_MODEL, "prompt": prompt, "stream": False, "options": { "temperature": 0.3, "top_p": 0.9, "max_tokens": 1000 } } response = requests.post( f"{OLLAMA_BASE_URL}/api/generate", json=payload, timeout=120 # 2 minutes timeout ) if response.status_code == 200: result = response.json() summary = result.get('response', '').strip() logger.info(f"Ollama summarization completed. Summary length: {len(summary)}") return summary else: logger.error(f"Ollama summarization failed: {response.status_code} - {response.text}") return f"Summarization failed - Ollama service error" except Exception as e: logger.error(f"Ollama summarization error: {e}") return f"Summarization failed: {str(e)}" async def enhanced_analysis_with_ollama(transcription: str, summary: str) -> Dict[str, Any]: """Perform enhanced analysis using Ollama's LLM""" try: logger.info("Starting Ollama enhanced analysis") prompt = f"""Analyze this video content and provide detailed insights: TRANSCRIPTION: {transcription} SUMMARY: {summary} Please provide: 1. Key topics and themes (as a list) 2. Sentiment analysis (positive/negative/neutral percentages) 3. Important insights and takeaways 4. Recommendations for the user 5. Context and implications Format your response as a JSON object with these keys: - topics: array of strings - sentiment: object with positive, negative, neutral percentages - insights: string - recommendations: string - context: string Response:""" payload = { "model": OLLAMA_MODEL, "prompt": prompt, "stream": False, "options": { "temperature": 0.2, "top_p": 0.8, "max_tokens": 1500 } } response = requests.post( f"{OLLAMA_BASE_URL}/api/generate", json=payload, timeout=180 # 3 minutes timeout ) if response.status_code == 200: result = response.json() analysis_text = result.get('response', '').strip() # Try to parse JSON response try: # Extract JSON from response (in case there's extra text) start_idx = analysis_text.find('{') end_idx = analysis_text.rfind('}') + 1 if start_idx != -1 and end_idx > start_idx: json_str = analysis_text[start_idx:end_idx] analysis = json.loads(json_str) else: # Fallback if no JSON found analysis = { "topics": ["general"], "sentiment": {"positive": 0.5, "negative": 0.2, "neutral": 0.3}, "insights": analysis_text[:500], "recommendations": "Review the content for key insights", "context": "Analysis completed using Ollama" } logger.info("Ollama enhanced analysis completed successfully") return analysis except json.JSONDecodeError: logger.warning("Failed to parse JSON from Ollama response, using fallback") return { "topics": ["general"], "sentiment": {"positive": 0.5, "negative": 0.2, "neutral": 0.3}, "insights": analysis_text[:500], "recommendations": "Review the content for key insights", "context": "Analysis completed using Ollama (fallback format)" } else: logger.error(f"Ollama enhanced analysis failed: {response.status_code} - {response.text}") return None except Exception as e: logger.error(f"Ollama enhanced analysis error: {e}") return None async def process_pending_videos(): """Process all pending video uploads using Ollama""" global _recent_error, _error_count async with AsyncSessionLocal() as session: try: # Check Ollama health first if not await check_ollama_health(): logger.warning("Ollama service is not available, skipping this cycle") _recent_error = True _error_count += 1 return # Reset error count on successful health check _error_count = 0 _recent_error = False # Query for pending videos result = await session.execute( select(VideoUpload).where(VideoUpload.status == "pending") ) all_pending = result.scalars().all() pending_videos = all_pending[:MAX_VIDEOS_PER_CYCLE] if all_pending else [] if not pending_videos: logger.info("No pending videos found") return logger.info(f"Found {len(pending_videos)} pending videos to process with Ollama") for video in pending_videos: if SHUTDOWN_EVENT.is_set(): logger.info("Shutdown requested, stopping video processing") break logger.info(f"Processing video ID {video.id} for user {video.user_id} with Ollama") try: # Update status to processing video.status = "processing" video.updated_at = datetime.utcnow() await session.commit() # Step 1: Transcribe with Ollama transcription = await transcribe_with_ollama(video.video_url) if not transcription or "failed" in transcription.lower(): raise Exception(f"Transcription failed: {transcription}") # Step 2: Summarize with Ollama summary = await summarize_with_ollama(transcription) if not summary or "failed" in summary.lower(): logger.warning("Summarization failed, using transcription as summary") summary = transcription[:1000] + "..." if len(transcription) > 1000 else transcription # Step 3: Enhanced analysis with Ollama enhanced_analysis = await enhanced_analysis_with_ollama(transcription, summary) # Step 4: Generate comprehensive report if enhanced_analysis: report = f"""# 📹 Video Analysis Report (Ollama Enhanced) ## 🎵 Audio Transcription {transcription} ## 📝 Summary {summary} ## 🤖 Enhanced Analysis (Ollama {OLLAMA_MODEL}) **Topics**: {', '.join(enhanced_analysis.get('topics', ['General']))} **Sentiment**: {enhanced_analysis.get('sentiment', {})} **Insights**: {enhanced_analysis.get('insights', 'No additional insights')} **Recommendations**: {enhanced_analysis.get('recommendations', 'No specific recommendations')} **Context**: {enhanced_analysis.get('context', 'Analysis completed')} --- *Report generated using Ollama {OLLAMA_MODEL} running locally* """ else: report = f"""# 📹 Video Analysis Report (Ollama Basic) ## 🎵 Audio Transcription {transcription} ## 📝 Summary {summary} ## 📊 Analysis Details - **Processing Method**: Ollama Local Processing - **Model**: {OLLAMA_MODEL} - **Enhanced Features**: Basic analysis only --- *Report generated using Ollama {OLLAMA_MODEL} running locally* """ logger.info(f"Ollama analysis completed for video {video.id}") except Exception as e: logger.error(f"Ollama processing failed for video {video.id}: {e}") logger.debug(traceback.format_exc()) # Update status to failed video.status = "failed" video.updated_at = datetime.utcnow() await session.commit() _error_count += 1 continue try: # Generate PDF pdf_bytes = pdf.generate(transcription, summary) logger.info(f"PDF generation completed for video {video.id}") except Exception as e: logger.error(f"PDF generation failed for video {video.id}: {e}") logger.debug(traceback.format_exc()) video.status = "failed" video.updated_at = datetime.utcnow() await session.commit() _error_count += 1 continue try: # Upload to S3 pdf_key = f"pdfs/ollama_{video.id}.pdf" pdf_url = s3.upload_pdf_bytes(pdf_bytes, pdf_key) logger.info(f"S3 upload completed for video {video.id}") except Exception as e: logger.error(f"Upload to S3 failed for video {video.id}: {e}") logger.debug(traceback.format_exc()) video.status = "failed" video.updated_at = datetime.utcnow() await session.commit() _error_count += 1 continue try: # Mark as completed video.status = "completed" video.pdf_url = pdf_url video.updated_at = datetime.utcnow() await session.commit() logger.info(f"Successfully completed video {video.id} with Ollama") except SQLAlchemyError as e: logger.error(f"DB commit failed for video {video.id}: {e}") logger.debug(traceback.format_exc()) await session.rollback() _error_count += 1 except SQLAlchemyError as e: logger.error(f"Database error: {e}") logger.debug(traceback.format_exc()) _error_count += 1 except Exception as e: logger.error(f"Unexpected error in process_pending_videos: {e}") logger.debug(traceback.format_exc()) _error_count += 1 async def run_worker(): """Main worker loop""" logger.info("Ollama worker daemon started...") # Initialize database try: await init_db() logger.info("Database initialized successfully") except Exception as e: logger.error(f"Failed to initialize database: {e}") return cycle_count = 0 while not SHUTDOWN_EVENT.is_set(): cycle_count += 1 logger.info(f"Ollama worker cycle {cycle_count} - Checking for pending videos...") try: await process_pending_videos() except Exception as e: logger.error(f"Worker loop error: {e}") logger.debug(traceback.format_exc()) # Check if we need to back off due to errors global _recent_error, _error_count if _error_count >= MAX_ERRORS_BEFORE_BACKOFF: logger.warning(f"Too many errors ({_error_count}), backing off for {BACKOFF_SECONDS} seconds...") try: await asyncio.wait_for(SHUTDOWN_EVENT.wait(), timeout=BACKOFF_SECONDS) except asyncio.TimeoutError: pass _error_count = 0 # Reset error count after backoff # Wait for next cycle or shutdown try: await asyncio.wait_for(SHUTDOWN_EVENT.wait(), timeout=POLL_INTERVAL) except asyncio.TimeoutError: # Normal timeout, continue to next cycle pass except Exception as e: logger.error(f"Error in worker wait: {e}") break logger.info("Ollama worker loop stopped, cleaning up...") # Cleanup try: await close_db() logger.info("Database connections closed") except Exception as e: logger.error(f"Error during cleanup: {e}") async def main(): """Main entry point with signal handling""" # Setup signal handlers signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) try: await run_worker() except KeyboardInterrupt: logger.info("Keyboard interrupt received") except Exception as e: logger.error(f"Fatal error in main: {e}") logger.debug(traceback.format_exc()) finally: logger.info("Ollama worker daemon shutdown complete") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Ollama worker daemon interrupted by user") except Exception as e: logger.error(f"Fatal error: {e}") sys.exit(1)