import asyncio
import json
import logging
import shlex
import subprocess
import threading
import time
from datetime import datetime
from queue import Queue
from typing import Dict, Any, Set

import websockets
logger = logging.getLogger(__name__)


class TerminalStreamManager:
    """Manages real-time terminal streaming with WebSocket connections.

    Clients send JSON messages of type ``command`` (run a program) or
    ``interrupt`` (terminate the running program). Every event is broadcast
    as a JSON object to all connected clients.

    SECURITY: any client that can connect can run arbitrary programs with the
    privileges of this process. Commands are split with ``shlex`` and started
    without a shell, but nothing here authenticates clients or restricts what
    they may run, so only expose the server to trusted peers.
    """

    def __init__(self):
        """Create a manager with no clients and no running command."""
        # Connected client connections; each only needs an async ``send()``.
        self.clients: Set[Any] = set()
        # Kept for backward compatibility; the code in this class neither
        # reads nor updates these two attributes.
        self.command_queue = Queue()
        self.is_running = False
        # The ``subprocess.Popen`` of the command being executed, else None.
        self.current_process = None
        # Created lazily inside a coroutine so it belongs to the serving
        # event loop rather than to whichever thread built the manager.
        self._command_lock = None
        # Strong references to in-flight command tasks (asyncio itself only
        # keeps weak references to tasks).
        self._command_tasks = set()

    async def register_client(self, websocket):
        """Register a new WebSocket client and send it a welcome message."""
        self.clients.add(websocket)
        await websocket.send(json.dumps({
            'type': 'connected',
            'message': 'π Terminal connected successfully',
            'timestamp': datetime.now().isoformat()
        }))
        logger.info("Terminal client connected. Total clients: %d", len(self.clients))

    async def unregister_client(self, websocket):
        """Unregister a WebSocket client (a no-op if it is not registered)."""
        self.clients.discard(websocket)
        logger.info("Terminal client disconnected. Total clients: %d", len(self.clients))

    async def broadcast(self, message: Dict[str, Any]):
        """Broadcast message to all connected clients.

        A ``timestamp`` key is added to the payload that is sent; the
        caller's dict is left untouched. Clients whose ``send()`` fails are
        dropped from ``self.clients``.

        Args:
            message: JSON-serializable mapping to send.

        Raises:
            TypeError: if ``message`` cannot be serialized to JSON.
        """
        if not self.clients:
            return

        # Serialize once up front: a payload that cannot be serialized is the
        # caller's error and must not be blamed on (and evict) every client.
        payload = json.dumps({**message, 'timestamp': datetime.now().isoformat()})

        disconnected = set()
        # Iterate over a snapshot: clients may connect or disconnect while we
        # are suspended in ``send()``, and mutating a set during iteration
        # raises RuntimeError.
        for client in list(self.clients):
            try:
                await client.send(payload)
            except websockets.exceptions.ConnectionClosed:
                disconnected.add(client)
            except Exception as e:
                logger.error("Error broadcasting to client: %s", e)
                disconnected.add(client)

        self.clients.difference_update(disconnected)

    async def execute_command(self, command: str):
        """Execute a command and stream output in real-time.

        Broadcasts ``command_start``, one ``output`` message per line of
        stdout/stderr, then ``command_complete`` with the exit code. Failures
        (bad quoting, empty command, program not found, ...) are broadcast as
        an ``error`` message instead of being raised. Commands run one at a
        time; concurrent calls wait their turn.

        Args:
            command: Command line; split with ``shlex.split`` and run
                without a shell.
        """
        if self._command_lock is None:
            self._command_lock = asyncio.Lock()
        # Serialize commands so ``current_process`` always refers to the one
        # process whose output is being streamed.
        async with self._command_lock:
            await self._run_command(command)

    async def _run_command(self, command):
        """Run one command to completion, broadcasting its lifecycle events."""
        await self.broadcast({
            'type': 'command_start',
            'command': command,
            'message': f'$ {command}'
        })

        process = None
        try:
            argv = shlex.split(command)
            if not argv:
                raise ValueError('Empty command')

            process = subprocess.Popen(
                argv,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                # Undecodable bytes become U+FFFD instead of aborting the stream.
                errors='replace',
                bufsize=1,
            )
            self.current_process = process

            # Both pipes are drained concurrently until EOF so neither can
            # fill up and stall the child.
            await asyncio.gather(
                self._stream_output(process.stdout, 'stdout'),
                self._stream_output(process.stderr, 'stderr'),
            )

            loop = asyncio.get_running_loop()
            exit_code = await loop.run_in_executor(None, process.wait)

            await self.broadcast({
                'type': 'command_complete',
                'exit_code': exit_code,
                'message': f'Process exited with code {exit_code}'
            })
        except asyncio.CancelledError:
            # Never report cancellation as a command error (CancelledError is
            # an Exception subclass on Python 3.7).
            raise
        except Exception as e:
            await self.broadcast({
                'type': 'error',
                'data': str(e),
                'stream': 'system'
            })
        finally:
            if process is not None:
                if process.poll() is None:
                    # Only reached on cancellation or an unexpected error:
                    # do not leave an orphaned child behind.
                    process.kill()
                    process.wait()
                process.stdout.close()
                process.stderr.close()
            self.current_process = None

    async def _stream_output(self, pipe, stream_name):
        """Broadcast each line read from ``pipe`` until EOF."""
        loop = asyncio.get_running_loop()
        while True:
            # ``readline`` blocks, so it runs in a worker thread; the event
            # loop stays free to serve other clients (e.g. an interrupt).
            line = await loop.run_in_executor(None, pipe.readline)
            if not line:
                return
            await self.broadcast({
                'type': 'output',
                'data': line,
                'stream': stream_name
            })

    async def handle_client(self, websocket, path=None):
        """Handle WebSocket client connections.

        Args:
            websocket: Connection object; must support ``send()`` and async
                iteration over incoming messages.
            path: Request path. Unused; optional so the handler works whether
                the websockets library passes one or two arguments.
        """
        try:
            # Registration is inside the ``try`` so a failed welcome send
            # still unregisters the client in ``finally``.
            await self.register_client(websocket)
            async for message in websocket:
                await self._dispatch_message(websocket, message)
        except websockets.exceptions.ConnectionClosed:
            pass
        finally:
            await self.unregister_client(websocket)

    async def _dispatch_message(self, websocket, message):
        """Decode one incoming message and act on its ``type``."""
        try:
            data = json.loads(message)
        except ValueError:
            # Covers json.JSONDecodeError and undecodable binary frames.
            await websocket.send(json.dumps({
                'type': 'error',
                'message': 'Invalid JSON message'
            }))
            return

        if not isinstance(data, dict):
            await websocket.send(json.dumps({
                'type': 'error',
                'message': 'Message must be a JSON object'
            }))
            return

        message_type = data.get('type')
        if message_type == 'command':
            command = data.get('command', '')
            if isinstance(command, str) and command.strip():
                # Run in the background so this connection keeps reading
                # messages (notably ``interrupt``) while the command runs.
                task = asyncio.ensure_future(self.execute_command(command.strip()))
                self._command_tasks.add(task)
                task.add_done_callback(self._command_tasks.discard)
        elif message_type == 'interrupt':
            await self._interrupt_current_process()

    async def _interrupt_current_process(self):
        """Terminate the running command, if any, and tell all clients."""
        process = self.current_process
        if process is None:
            return
        try:
            process.terminate()
        except ProcessLookupError:
            # The process exited between the check and the signal.
            pass
        await self.broadcast({
            'type': 'interrupted',
            'message': 'Process interrupted by user'
        })
# Module-level manager instance; the WebSocket connection handler in this
# module routes every connection through it.
terminal_manager = TerminalStreamManager()
async def start_websocket_server(host='localhost', port=8765):
    """Start the WebSocket server for terminal streaming.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.

    Returns:
        The server object produced by ``websockets.serve``.
    """
    logger.info("Starting terminal WebSocket server on %s:%s", host, port)

    # ``path`` is optional: older websockets releases call the handler with
    # (connection, path), newer ones with the connection only.
    async def handler(websocket, path=None):
        await terminal_manager.handle_client(websocket, path)

    return await websockets.serve(handler, host, port)
def run_websocket_server():
    """Run WebSocket server in a separate thread.

    The thread owns a private event loop, starts the server on it and then
    runs the loop until it is stopped. Startup errors are logged, not raised.

    Returns:
        The started daemon ``threading.Thread``.
    """
    def start_server():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        try:
            server = loop.run_until_complete(start_websocket_server())
            logger.info("Terminal WebSocket server started successfully")
            try:
                loop.run_forever()
            finally:
                # Reached only if the loop is stopped: shut the listener down
                # before the loop it lives on is closed.
                server.close()
                loop.run_until_complete(server.wait_closed())
        except Exception as e:
            # ``exception`` keeps the original message and adds the traceback.
            logger.exception("Error starting WebSocket server: %s", e)
        finally:
            # Release the loop's resources even when startup failed.
            loop.close()

    thread = threading.Thread(target=start_server, daemon=True)
    thread.start()
    return thread