import asyncio import json import websockets from typing import Dict, Any, Set import subprocess import shlex from queue import Queue import threading import time import logging from datetime import datetime logger = logging.getLogger(__name__) class TerminalStreamManager: """Manages real-time terminal streaming with WebSocket connections.""" def __init__(self): self.clients: Set[websockets.WebSocketServerProtocol] = set() self.command_queue = Queue() self.is_running = False self.current_process = None self.server = None self.server_thread = None self.loop = None async def register_client(self, websocket): """Register a new WebSocket client.""" self.clients.add(websocket) await websocket.send(json.dumps({ 'type': 'connected', 'message': '🚀 Terminal connected successfully', 'timestamp': datetime.now().isoformat() })) logger.info(f"Terminal client connected. Total clients: {len(self.clients)}") async def unregister_client(self, websocket): """Unregister a WebSocket client.""" self.clients.discard(websocket) logger.info(f"Terminal client disconnected. Total clients: {len(self.clients)}") async def broadcast(self, message: Dict[str, Any]): """Broadcast message to all connected clients.""" if self.clients: disconnected = set() message['timestamp'] = datetime.now().isoformat() for client in self.clients: try: await client.send(json.dumps(message)) except websockets.exceptions.ConnectionClosed: disconnected.add(client) except Exception as e: logger.error(f"Error broadcasting to client: {e}") disconnected.add(client) # Clean up disconnected clients for client in disconnected: self.clients.discard(client) async def execute_command(self, command: str): """Execute a command and stream output in real-time.""" await self.broadcast({ 'type': 'command_start', 'command': command, 'message': f'$ {command}' }) try: # Security: Use shell=False and sanitize input safe_command = shlex.split(command) self.current_process = subprocess.Popen( safe_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, universal_newlines=True ) # Stream output in real-time while True: # Check if process is still running if self.current_process.poll() is not None: # Process finished, read remaining output remaining_stdout = self.current_process.stdout.read() remaining_stderr = self.current_process.stderr.read() if remaining_stdout: await self.broadcast({ 'type': 'output', 'data': remaining_stdout, 'stream': 'stdout' }) if remaining_stderr: await self.broadcast({ 'type': 'output', 'data': remaining_stderr, 'stream': 'stderr' }) break # Read available output try: # Use select or polling for non-blocking read import select ready, _, _ = select.select([self.current_process.stdout, self.current_process.stderr], [], [], 0.1) for stream in ready: if stream == self.current_process.stdout: line = stream.readline() if line: await self.broadcast({ 'type': 'output', 'data': line, 'stream': 'stdout' }) elif stream == self.current_process.stderr: line = stream.readline() if line: await self.broadcast({ 'type': 'output', 'data': line, 'stream': 'stderr' }) except: # Fallback for systems without select await asyncio.sleep(0.1) # Send completion message await self.broadcast({ 'type': 'command_complete', 'exit_code': self.current_process.returncode, 'message': f'Process exited with code {self.current_process.returncode}' }) except Exception as e: await self.broadcast({ 'type': 'error', 'data': str(e), 'stream': 'system' }) finally: self.current_process = None async def handle_client(self, websocket, path): """Handle WebSocket client connections.""" await self.register_client(websocket) try: async for message in websocket: try: data = json.loads(message) if data.get('type') == 'command': command = data.get('command', '').strip() if command: await self.execute_command(command) elif data.get('type') == 'interrupt': if self.current_process: self.current_process.terminate() await self.broadcast({ 'type': 'interrupted', 'message': 'Process interrupted by user' }) except json.JSONDecodeError: await websocket.send(json.dumps({ 'type': 'error', 'message': 'Invalid JSON message' })) except websockets.exceptions.ConnectionClosed: pass finally: await self.unregister_client(websocket) def stop_server(self): """Stop the WebSocket server gracefully.""" if self.server: logger.info("Stopping terminal WebSocket server...") self.is_running = False # Close all client connections if self.clients: import asyncio try: loop = asyncio.get_event_loop() for client in self.clients.copy(): try: loop.create_task(client.close()) except Exception as e: logger.warning(f"Error closing client connection: {e}") self.clients.clear() except Exception as e: logger.warning(f"Error closing client connections: {e}") # Terminate current process if running if self.current_process: try: self.current_process.terminate() self.current_process = None except Exception as e: logger.warning(f"Error terminating process: {e}") # Close the server try: if hasattr(self.server, 'close'): self.server.close() # Stop the event loop if it exists if self.loop and self.loop.is_running(): self.loop.call_soon_threadsafe(self.loop.stop) logger.info("Terminal WebSocket server stopped") except Exception as e: logger.error(f"Error stopping WebSocket server: {e}") else: logger.info("Terminal WebSocket server was not running") # Global terminal manager instance terminal_manager = TerminalStreamManager() async def start_websocket_server(host='localhost', port=8765): """Start the WebSocket server for terminal streaming.""" logger.info(f"Starting terminal WebSocket server on {host}:{port}") async def handler(websocket, path): await terminal_manager.handle_client(websocket, path) server = await websockets.serve(handler, host, port) terminal_manager.server = server terminal_manager.is_running = True return server def run_websocket_server(): """Run WebSocket server in a separate thread.""" def start_server(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) terminal_manager.loop = loop try: server = loop.run_until_complete(start_websocket_server()) logger.info("Terminal WebSocket server started successfully") loop.run_forever() except Exception as e: logger.error(f"Error starting WebSocket server: {e}") finally: logger.info("WebSocket server loop ended") thread = threading.Thread(target=start_server, daemon=True) terminal_manager.server_thread = thread thread.start() return thread