#!/usr/bin/env python3
"""
Test script for the agentic video analysis system with Groq integration.

Runs four smoke checks in order -- dependencies, Groq connectivity, enhanced
analysis, agentic integration -- and stops at the first failure.  When run as
a script the process exits with status 0 if every check passed, 1 otherwise.
"""

import asyncio
import importlib
import os
import sys
from pathlib import Path

# Add project root to Python path so the ``app`` package resolves when this
# script is launched directly.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# Horizontal rule printed under every section heading.
_SEPARATOR = "=" * 60

# Model used for the connectivity probe.
# NOTE(review): hosted model IDs get retired over time -- confirm this one is
# still served by Groq before trusting a failure of the probe.
_GROQ_TEST_MODEL = "llama3-8b-8192"


async def test_groq_integration():
    """Test Groq integration and basic functionality.

    Requires the ``GROQ_API_KEY`` environment variable.  When it is set, a
    single short prompt is sent through ``langchain_groq.ChatGroq``.

    Returns:
        bool: True if the key is present and the round trip succeeded,
        False otherwise (the reason is printed).
    """
    print("🧪 Testing Groq Integration for Agentic Video Analysis")
    print(_SEPARATOR)

    # Check for Groq API key (an empty value counts as missing)
    groq_api_key = os.getenv("GROQ_API_KEY")
    if not groq_api_key:
        print("❌ GROQ_API_KEY environment variable not found!")
        print("Please set your Groq API key:")
        print("1. Get API key from: https://console.groq.com/")
        print("2. Set environment variable: GROQ_API_KEY=your_key_here")
        return False

    print("✅ GROQ_API_KEY found")

    try:
        # Imported lazily so a missing package is reported as a failed check
        # instead of crashing the whole script at import time.
        from langchain_groq import ChatGroq
        print("✅ langchain-groq imported successfully")

        # Test Groq connection
        llm = ChatGroq(
            groq_api_key=groq_api_key,
            model_name=_GROQ_TEST_MODEL,
            temperature=0.1,
            max_tokens=100,
        )

        # Simple test
        response = await llm.ainvoke("Say 'Hello from Groq!'")
        print(f"✅ Groq test successful: {response.content}")

    except ImportError as e:
        print(f"❌ Failed to import langchain-groq: {e}")
        print("Please install: pip install langchain-groq")
        return False
    except Exception as e:
        # Top-level boundary of a diagnostic script: report any failure
        # (auth, network, unknown model, ...) and mark the check as failed.
        print(f"❌ Groq test failed: {e}")
        return False

    return True


async def test_enhanced_analysis():
    """Test enhanced analysis components.

    Imports ``MultiModalAnalyzer`` from ``app.utils.enhanced_analysis``,
    constructs it with the Groq key from the environment and checks that its
    ``agent`` attribute is truthy.

    Returns:
        bool: True if the analyzer was built and exposes an agent,
        False otherwise (the reason is printed).
    """
    print("\n🔍 Testing Enhanced Analysis Components")
    print(_SEPARATOR)

    try:
        # Test imports
        from app.utils.enhanced_analysis import MultiModalAnalyzer
        print("✅ Enhanced analysis imports successful")

        # Test analyzer initialization
        groq_api_key = os.getenv("GROQ_API_KEY")
        analyzer = MultiModalAnalyzer(groq_api_key=groq_api_key)
        print("✅ MultiModalAnalyzer initialized successfully")

        # Test agent creation
        if not analyzer.agent:
            print("❌ Agent creation failed")
            return False
        print("✅ Agent created successfully")

    except Exception as e:
        print(f"❌ Enhanced analysis test failed: {e}")
        return False

    return True


async def test_agentic_integration():
    """Test agentic integration.

    Imports ``AgenticVideoProcessor`` and ``MCPToolManager`` from
    ``app.utils.agentic_integration``, constructs both with the Groq key from
    the environment and checks that the tool manager's ``tools`` is non-empty.

    Returns:
        bool: True if both objects were built and at least one tool is
        registered, False otherwise (the reason is printed).
    """
    print("\n🤖 Testing Agentic Integration")
    print(_SEPARATOR)

    try:
        from app.utils.agentic_integration import AgenticVideoProcessor, MCPToolManager
        print("✅ Agentic integration imports successful")

        # Test processor initialization (only construction is being checked,
        # so the instance itself is not kept)
        groq_api_key = os.getenv("GROQ_API_KEY")
        AgenticVideoProcessor(enable_enhanced_analysis=True, groq_api_key=groq_api_key)
        print("✅ AgenticVideoProcessor initialized successfully")

        # Test MCP tool manager
        tool_manager = MCPToolManager(groq_api_key=groq_api_key)
        print("✅ MCPToolManager initialized successfully")

        # Test tool registration
        if not tool_manager.tools:
            print("❌ No tools registered")
            return False
        print(f"✅ {len(tool_manager.tools)} tools registered")

    except Exception as e:
        print(f"❌ Agentic integration test failed: {e}")
        return False

    return True


async def test_dependencies():
    """Test all required dependencies.

    Tries to import every module in the table below and prints one status
    line per package.  All packages are checked even after a failure so the
    full list of missing ones is shown in a single run.

    Returns:
        bool: True if every module imported, False if any raised ImportError.
    """
    print("\n📦 Testing Dependencies")
    print(_SEPARATOR)

    # (name shown to the user, module name passed to the import machinery)
    dependencies = [
        ("opencv-python", "cv2"),
        ("pillow", "PIL"),
        ("torch", "torch"),
        ("transformers", "transformers"),
        ("faster_whisper", "faster_whisper"),
        ("langchain", "langchain"),
        ("langchain_groq", "langchain_groq"),
        ("duckduckgo-search", "duckduckgo_search"),
        # NOTE(review): the module ``wikipedia`` is presumably provided by the
        # PyPI package "wikipedia", not "wikipedia-api" -- confirm the label
        # against requirements.txt.
        ("wikipedia-api", "wikipedia"),
    ]

    all_good = True
    for package_name, import_name in dependencies:
        try:
            importlib.import_module(import_name)
        except ImportError:
            print(f"❌ {package_name} - missing")
            all_good = False
        else:
            print(f"✅ {package_name}")

    return all_good


async def main():
    """Main test function.

    Runs the dependency check first (with an install hint on failure), then
    the Groq, enhanced-analysis and agentic checks, stopping at the first one
    that fails.

    Returns:
        bool: True if every check passed, False otherwise.
    """
    print("🚀 Dubsway Video AI - Agentic System Test")
    print(_SEPARATOR)

    # Test dependencies first
    if not await test_dependencies():
        print("\n❌ Some dependencies are missing. Please install them:")
        print("pip install -r requirements.txt")
        return False

    # Remaining stages in order; each prints its own diagnostics, so a
    # failure just short-circuits the run.
    stages = (
        test_groq_integration,
        test_enhanced_analysis,
        test_agentic_integration,
    )
    for stage in stages:
        if not await stage():
            return False

    print("\n🎉 All tests passed! Your agentic system is ready to use.")
    print("\n📋 Next steps:")
    print("1. Update your worker/daemon.py to use agentic analysis")
    print("2. Set GROQ_API_KEY environment variable")
    print("3. Run your daemon with enhanced capabilities")

    return True


if __name__ == "__main__":
    success = asyncio.run(main())
    sys.exit(0 if success else 1)