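"""Ollama worker daemon.

Polls the database for pending video uploads and processes each one locally:
transcription (via the existing Whisper setup), summarization and enhanced
analysis with an Ollama model, PDF generation, and upload of the PDF to S3,
updating each upload's status along the way.
"""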
import asyncio
import os
import signal
import sys
from datetime import datetime
import traceback
import logging
import requests
import json
from typing import Optional, Dict, Any

from sqlalchemy.future import select
from sqlalchemy.exc import SQLAlchemyError

from app.database import AsyncSessionLocal, init_db, close_db
from app.models import VideoUpload
from app.utils import pdf, s3

# Setup logging with UTF-8 encoding for Windows compatibility
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s - %(name)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('ollama_worker.log', encoding='utf-8')
    ]
)
logger = logging.getLogger("worker.ollama_daemon")

# Configuration
POLL_INTERVAL = int(os.getenv("OLLAMA_POLL_INTERVAL_SECONDS", "120"))  # 2 minutes default
MAX_VIDEOS_PER_CYCLE = int(os.getenv("OLLAMA_MAX_VIDEOS_PER_CYCLE", "1"))
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3.2:latest")
OLLAMA_WHISPER_MODEL = os.getenv("OLLAMA_WHISPER_MODEL", "whisper:latest")

SHUTDOWN_EVENT = asyncio.Event()

# Global backoff state
_recent_error = False
_error_count = 0
MAX_ERRORS_BEFORE_BACKOFF = 3
BACKOFF_SECONDS = int(os.getenv("OLLAMA_BACKOFF_SECONDS", "300"))  # 5 minutes
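# A minimal example configuration (shell syntax; the values shown are
# illustrative, not required defaults):
#
#   export OLLAMA_POLL_INTERVAL_SECONDS=60
#   export OLLAMA_MAX_VIDEOS_PER_CYCLE=2
#   export OLLAMA_BASE_URL=http://localhost:11434
#   export OLLAMA_MODEL=llama3.2:latest
#   export OLLAMA_BACKOFF_SECONDS=300
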
def signal_handler(signum, frame):
    """Handle shutdown signals gracefully"""
    logger.info(f"Received signal {signum}, initiating graceful shutdown...")
    SHUTDOWN_EVENT.set()

async def check_ollama_health() -> bool:
    """Check if the Ollama service is running and healthy"""
    try:
        response = requests.get(f"{OLLAMA_BASE_URL}/api/tags", timeout=10)
        if response.status_code == 200:
            models = response.json().get("models", [])
            logger.info(f"Ollama is healthy. Available models: {[m['name'] for m in models]}")
            return True
        else:
            logger.warning(f"Ollama health check failed: {response.status_code}")
            return False
    except Exception as e:
        logger.warning(f"Ollama health check failed: {e}")
        return False

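# The same health check can be run by hand against a local Ollama instance:
#   curl http://localhost:11434/api/tags
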
async def transcribe_with_ollama(video_url: str) -> str:
    """Transcribe a video, using the existing Whisper setup as a fallback"""
    try:
        logger.info(f"Starting transcription for video: {video_url}")
        # For now, fall back to the existing Whisper setup, since an
        # Ollama whisper model might not be available
        from app.utils.whisper_llm import analyze as basic_analyze

        # Open a database session for the analyze function
        async with AsyncSessionLocal() as session:
            # Reuse the existing Whisper analysis, keeping only the transcription
            transcription, _ = await basic_analyze(video_url, 0, session)
            logger.info(f"Transcription completed. Length: {len(transcription)} characters")
            return transcription
    except Exception as e:
        logger.error(f"Transcription error: {e}")
        return f"Transcription failed: {str(e)}"

async def summarize_with_ollama(text: str) -> str:
    """Summarize text using Ollama's LLM model"""
    try:
        logger.info(f"Starting Ollama summarization. Text length: {len(text)}")

        # Truncate very long text to avoid exceeding the model's context window
        max_chars = 8000  # Adjust based on your model's context length
        if len(text) > max_chars:
            text = text[:max_chars] + "..."
            logger.info(f"Text truncated to {max_chars} characters for summarization")

        prompt = f"""Please provide a comprehensive summary of the following text.
Focus on key points, main ideas, and important details.
Make it clear and well-structured.

Text to summarize:
{text}

Summary:"""

        payload = {
            "model": OLLAMA_MODEL,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": 0.3,
                "top_p": 0.9,
                "num_predict": 1000  # Ollama's option name for the generation token limit
            }
        }

        response = requests.post(
            f"{OLLAMA_BASE_URL}/api/generate",
            json=payload,
            timeout=120  # 2 minute timeout
        )

        if response.status_code == 200:
            result = response.json()
            summary = result.get('response', '').strip()
            logger.info(f"Ollama summarization completed. Summary length: {len(summary)}")
            return summary
        else:
            logger.error(f"Ollama summarization failed: {response.status_code} - {response.text}")
            return "Summarization failed - Ollama service error"
    except Exception as e:
        logger.error(f"Ollama summarization error: {e}")
        return f"Summarization failed: {str(e)}"

async def enhanced_analysis_with_ollama(transcription: str, summary: str) -> Optional[Dict[str, Any]]:
    """Perform enhanced analysis using Ollama's LLM; returns None on failure"""
    try:
        logger.info("Starting Ollama enhanced analysis")

        prompt = f"""Analyze this video content and provide detailed insights:

TRANSCRIPTION:
{transcription}

SUMMARY:
{summary}

Please provide:
1. Key topics and themes (as a list)
2. Sentiment analysis (positive/negative/neutral percentages)
3. Important insights and takeaways
4. Recommendations for the user
5. Context and implications

Format your response as a JSON object with these keys:
- topics: array of strings
- sentiment: object with positive, negative, neutral percentages
- insights: string
- recommendations: string
- context: string

Response:"""

        payload = {
            "model": OLLAMA_MODEL,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": 0.2,
                "top_p": 0.8,
                "num_predict": 1500  # Ollama's option name for the generation token limit
            }
        }

        response = requests.post(
            f"{OLLAMA_BASE_URL}/api/generate",
            json=payload,
            timeout=180  # 3 minute timeout
        )

        if response.status_code == 200:
            result = response.json()
            analysis_text = result.get('response', '').strip()

            # Try to parse the JSON the model was asked to produce
            try:
                # Extract the JSON object from the response (in case there's extra text)
                start_idx = analysis_text.find('{')
                end_idx = analysis_text.rfind('}') + 1
                if start_idx != -1 and end_idx > start_idx:
                    json_str = analysis_text[start_idx:end_idx]
                    analysis = json.loads(json_str)
                else:
                    # Fallback if no JSON object was found
                    analysis = {
                        "topics": ["general"],
                        "sentiment": {"positive": 0.5, "negative": 0.2, "neutral": 0.3},
                        "insights": analysis_text[:500],
                        "recommendations": "Review the content for key insights",
                        "context": "Analysis completed using Ollama"
                    }
                logger.info("Ollama enhanced analysis completed successfully")
                return analysis
            except json.JSONDecodeError:
                logger.warning("Failed to parse JSON from Ollama response, using fallback")
                return {
                    "topics": ["general"],
                    "sentiment": {"positive": 0.5, "negative": 0.2, "neutral": 0.3},
                    "insights": analysis_text[:500],
                    "recommendations": "Review the content for key insights",
                    "context": "Analysis completed using Ollama (fallback format)"
                }
        else:
            logger.error(f"Ollama enhanced analysis failed: {response.status_code} - {response.text}")
            return None
    except Exception as e:
        logger.error(f"Ollama enhanced analysis error: {e}")
        return None

async def process_pending_videos():
    """Process all pending video uploads using Ollama"""
    global _recent_error, _error_count

    async with AsyncSessionLocal() as session:
        try:
            # Check Ollama health first
            if not await check_ollama_health():
                logger.warning("Ollama service is not available, skipping this cycle")
                _recent_error = True
                _error_count += 1
                return

            # Reset error state on a successful health check
            _error_count = 0
            _recent_error = False

            # Query for pending videos
            result = await session.execute(
                select(VideoUpload).where(VideoUpload.status == "pending")
            )
            all_pending = result.scalars().all()
            pending_videos = all_pending[:MAX_VIDEOS_PER_CYCLE] if all_pending else []

            if not pending_videos:
                logger.info("No pending videos found")
                return

            logger.info(f"Found {len(pending_videos)} pending videos to process with Ollama")

            for video in pending_videos:
                if SHUTDOWN_EVENT.is_set():
                    logger.info("Shutdown requested, stopping video processing")
                    break

                logger.info(f"Processing video ID {video.id} for user {video.user_id} with Ollama")

                try:
                    # Update status to processing
                    video.status = "processing"
                    video.updated_at = datetime.utcnow()
                    await session.commit()

                    # Step 1: Transcribe
                    transcription = await transcribe_with_ollama(video.video_url)
                    if not transcription or "failed" in transcription.lower():
                        raise Exception(f"Transcription failed: {transcription}")

                    # Step 2: Summarize with Ollama
                    summary = await summarize_with_ollama(transcription)
                    if not summary or "failed" in summary.lower():
                        logger.warning("Summarization failed, using transcription as summary")
                        summary = transcription[:1000] + "..." if len(transcription) > 1000 else transcription

                    # Step 3: Enhanced analysis with Ollama
                    enhanced_analysis = await enhanced_analysis_with_ollama(transcription, summary)

                    # Step 4: Generate comprehensive report
                    if enhanced_analysis:
                        report = f"""# 📹 Video Analysis Report (Ollama Enhanced)

## 🎵 Audio Transcription
{transcription}

## 📝 Summary
{summary}

## 🤖 Enhanced Analysis (Ollama {OLLAMA_MODEL})

**Topics**: {', '.join(enhanced_analysis.get('topics', ['General']))}

**Sentiment**: {enhanced_analysis.get('sentiment', {})}

**Insights**: {enhanced_analysis.get('insights', 'No additional insights')}

**Recommendations**: {enhanced_analysis.get('recommendations', 'No specific recommendations')}

**Context**: {enhanced_analysis.get('context', 'Analysis completed')}

---
*Report generated using Ollama {OLLAMA_MODEL} running locally*
"""
                    else:
                        report = f"""# 📹 Video Analysis Report (Ollama Basic)

## 🎵 Audio Transcription
{transcription}

## 📝 Summary
{summary}

## 📊 Analysis Details
- **Processing Method**: Ollama Local Processing
- **Model**: {OLLAMA_MODEL}
- **Enhanced Features**: Basic analysis only

---
*Report generated using Ollama {OLLAMA_MODEL} running locally*
"""
                    logger.info(f"Ollama analysis completed for video {video.id}")
                except Exception as e:
                    logger.error(f"Ollama processing failed for video {video.id}: {e}")
                    logger.debug(traceback.format_exc())
                    # Update status to failed
                    video.status = "failed"
                    video.updated_at = datetime.utcnow()
                    await session.commit()
                    _error_count += 1
                    continue

                try:
                    # Generate PDF
                    pdf_bytes = pdf.generate(transcription, summary)
                    logger.info(f"PDF generation completed for video {video.id}")
                except Exception as e:
                    logger.error(f"PDF generation failed for video {video.id}: {e}")
                    logger.debug(traceback.format_exc())
                    video.status = "failed"
                    video.updated_at = datetime.utcnow()
                    await session.commit()
                    _error_count += 1
                    continue

                try:
                    # Upload to S3
                    pdf_key = f"pdfs/ollama_{video.id}.pdf"
                    pdf_url = s3.upload_pdf_bytes(pdf_bytes, pdf_key)
                    logger.info(f"S3 upload completed for video {video.id}")
                except Exception as e:
                    logger.error(f"Upload to S3 failed for video {video.id}: {e}")
                    logger.debug(traceback.format_exc())
                    video.status = "failed"
                    video.updated_at = datetime.utcnow()
                    await session.commit()
                    _error_count += 1
                    continue

                try:
                    # Mark as completed
                    video.status = "completed"
                    video.pdf_url = pdf_url
                    video.updated_at = datetime.utcnow()
                    await session.commit()
                    logger.info(f"Successfully completed video {video.id} with Ollama")
                except SQLAlchemyError as e:
                    logger.error(f"DB commit failed for video {video.id}: {e}")
                    logger.debug(traceback.format_exc())
                    await session.rollback()
                    _error_count += 1
        except SQLAlchemyError as e:
            logger.error(f"Database error: {e}")
            logger.debug(traceback.format_exc())
            _error_count += 1
        except Exception as e:
            logger.error(f"Unexpected error in process_pending_videos: {e}")
            logger.debug(traceback.format_exc())
            _error_count += 1

async def run_worker():
    """Main worker loop"""
    global _error_count

    logger.info("Ollama worker daemon started...")

    # Initialize database
    try:
        await init_db()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize database: {e}")
        return

    cycle_count = 0
    while not SHUTDOWN_EVENT.is_set():
        cycle_count += 1
        logger.info(f"Ollama worker cycle {cycle_count} - Checking for pending videos...")

        try:
            await process_pending_videos()
        except Exception as e:
            logger.error(f"Worker loop error: {e}")
            logger.debug(traceback.format_exc())

        # Check if we need to back off due to errors
        if _error_count >= MAX_ERRORS_BEFORE_BACKOFF:
            logger.warning(f"Too many errors ({_error_count}), backing off for {BACKOFF_SECONDS} seconds...")
            try:
                await asyncio.wait_for(SHUTDOWN_EVENT.wait(), timeout=BACKOFF_SECONDS)
            except asyncio.TimeoutError:
                pass
            _error_count = 0  # Reset error count after backoff

        # Wait for the next cycle or shutdown
        try:
            await asyncio.wait_for(SHUTDOWN_EVENT.wait(), timeout=POLL_INTERVAL)
        except asyncio.TimeoutError:
            # Normal timeout, continue to the next cycle
            pass
        except Exception as e:
            logger.error(f"Error in worker wait: {e}")
            break

    logger.info("Ollama worker loop stopped, cleaning up...")

    # Cleanup
    try:
        await close_db()
        logger.info("Database connections closed")
    except Exception as e:
        logger.error(f"Error during cleanup: {e}")

async def main():
    """Main entry point with signal handling"""
    # Setup signal handlers
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        await run_worker()
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
    except Exception as e:
        logger.error(f"Fatal error in main: {e}")
        logger.debug(traceback.format_exc())
    finally:
        logger.info("Ollama worker daemon shutdown complete")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Ollama worker daemon interrupted by user")
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)
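
# Example invocation (assuming this file is saved as ollama_daemon.py and an
# Ollama server is already listening on localhost:11434):
#
#   OLLAMA_MODEL=llama3.2:latest python ollama_daemon.py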