Spaces:
Runtime error
Runtime error
| from operator import itemgetter | |
| import os | |
| from typing import Any, Optional, Tuple, Dict, TypedDict | |
| from urllib import parse | |
| from uuid import uuid4 | |
| import colorlog | |
| import io | |
| import logging | |
| from pprint import pformat | |
| import socketio | |
| import sys | |
| import time | |
| import random | |
| import string | |
| from starlette.applications import Starlette | |
| from starlette.routing import Mount, Route | |
| from starlette.staticfiles import StaticFiles | |
| from src.room import Room, Member | |
| from src.simuleval_agent_directory import NoAvailableAgentException | |
| from src.simuleval_agent_directory import SimulevalAgentDirectory | |
| from src.simuleval_transcoder import SimulevalTranscoder | |
| from src.transcoder_helpers import get_transcoder_output_events | |
| ############################################### | |
| # Constants | |
| ############################################### | |
| DEBUG = True | |
| ALL_ROOM_ID = "ALL" | |
| ROOM_ID_USABLE_CHARACTERS = string.ascii_uppercase | |
| ROOM_ID_LENGTH = 4 | |
| ROOM_LISTENERS_SUFFIX = "_listeners" | |
| ROOM_SPEAKERS_SUFFIX = "_speakers" | |
| ESCAPE_HATCH_SERVER_LOCK_RELEASE_NAME = "remove_server_lock" | |
| ############################################### | |
| # Configure logger | |
| ############################################### | |
| logger = logging.getLogger("socketio_server_pubsub") | |
| logger.propagate = False | |
| handler = colorlog.StreamHandler(stream=sys.stdout) | |
| formatter = colorlog.ColoredFormatter( | |
| "%(log_color)s[%(asctime)s][%(levelname)s][%(module)s]:%(reset)s %(message)s", | |
| reset=True, | |
| log_colors={ | |
| "DEBUG": "cyan", | |
| "INFO": "green", | |
| "WARNING": "yellow", | |
| "ERROR": "red", | |
| "CRITICAL": "red,bg_white", | |
| }, | |
| ) | |
| handler.setFormatter(formatter) | |
| logger.addHandler(handler) | |
| logger.setLevel(logging.WARNING) | |
| print("") | |
| print("") | |
| print("=" * 20 + " ⭐️ Starting Server... ⭐️ " + "=" * 20) | |
| ############################################### | |
| # Configure socketio server | |
| ############################################### | |
| CLIENT_BUILD_PATH = "../streaming-react-app/dist/" | |
| static_files = { | |
| "/": CLIENT_BUILD_PATH, | |
| "/assets/seamless-db6a2555.svg": { | |
| "filename": CLIENT_BUILD_PATH + "assets/seamless-db6a2555.svg", | |
| "content_type": "image/svg+xml", | |
| }, | |
| } | |
| # sio is the main socket.io entrypoint | |
| sio = socketio.AsyncServer( | |
| async_mode="asgi", | |
| cors_allowed_origins="*", | |
| logger=logger, | |
| # engineio_logger=logger, | |
| ) | |
| # sio.logger.setLevel(logging.DEBUG) | |
| socketio_app = socketio.ASGIApp(sio) | |
| app_routes = [ | |
| Mount("/ws", app=socketio_app), # Mount Socket.IO server under /app | |
| Mount( | |
| "/", app=StaticFiles(directory=CLIENT_BUILD_PATH, html=True) | |
| ), # Serve static files from root | |
| ] | |
| app = Starlette(debug=True, routes=app_routes) | |
| # rooms is indexed by room_id | |
| rooms: Dict[str, Room] = {} | |
| class MemberDirectoryObject(TypedDict): | |
| room: Room | |
| member_object: Member | |
| # member_directory is indexed by client_id | |
| # NOTE: client_id is really "client session id", meaning that it is unique to a single browser session. | |
| # If a user opens a new tab, they will have a different client_id and can join another room, join | |
| # the same room with different roles, etc. | |
| # NOTE: For a long-running production server we would want to clean up members after a certain timeout | |
| # but for this limited application we can just keep them around | |
| member_directory: Dict[str, MemberDirectoryObject] = {} | |
| class ServerLock(TypedDict): | |
| name: str | |
| client_id: str | |
| member_object: Member | |
| server_lock: Optional[ServerLock] = None | |
| server_id = str(uuid4()) | |
| # Specify specific models to load (some environments have issues loading multiple models) | |
| # See AgentWithInfo with JSON format details. | |
| models_override = os.environ.get("MODELS_OVERRIDE") | |
| available_agents = SimulevalAgentDirectory() | |
| logger.info("Building and adding agents...") | |
| if models_override is not None: | |
| logger.info(f"MODELS_OVERRIDE supplied from env vars: {models_override}") | |
| available_agents.build_and_add_agents(models_override) | |
| agents_capabilities_for_json = available_agents.get_agents_capabilities_list_for_json() | |
| ############################################### | |
| # Helpers | |
| ############################################### | |
| def catch_and_log_exceptions_for_sio_event_handlers(func): | |
| # wrapper should have the same signature as the original function | |
| async def catch_exception_wrapper(*args, **kwargs): | |
| try: | |
| return await func(*args, **kwargs) | |
| except Exception as e: | |
| message = f"[app_pubsub] Caught exception in '{func.__name__}' event handler:\n\n{e}" | |
| logger.exception(message, stack_info=True) | |
| try: | |
| exception_data = { | |
| "message": message, | |
| "timeEpochMs": int(time.time() * 1000), | |
| } | |
| try: | |
| # Let's try to add as much useful metadata as possible to the server_exception event | |
| sid = args[0] | |
| if isinstance(sid, str) and len(sid) > 0: | |
| session_data = await get_session_data(sid) | |
| if session_data: | |
| client_id = session_data.get("client_id") | |
| member = session_data.get("member_object") | |
| room = session_data.get("room_object") | |
| exception_data["room"] = str(room) | |
| exception_data["member"] = str(member) | |
| exception_data["clientID"] = str(client_id) | |
| except Exception as inner_e: | |
| # We expect there will be times when clientID or other values aren't present, so just log this as a warning | |
| logger.warn( | |
| f"[app_pubsub] Caught exception while trying add additional_data to server_exception:\n\n{inner_e}" | |
| ) | |
| # For now let's emit this to all clients. We ultimatley may want to emit it just to the room it's happening in. | |
| await sio.emit("server_exception", exception_data) | |
| except Exception as inner_e: | |
| logger.exception( | |
| f"[app_pubsub] Caught exception while trying to emit server_exception event:\n{inner_e}" | |
| ) | |
| # Re-raise the exception so it's handled normally by the server | |
| raise e | |
| # Set the name of the wrapper to the name of the original function so that the socketio server can associate it with the right event | |
| catch_exception_wrapper.__name__ = func.__name__ | |
| return catch_exception_wrapper | |
| async def emit_room_state_update(room): | |
| await sio.emit( | |
| "room_state_update", | |
| room.to_json(), | |
| room=room.room_id, | |
| ) | |
| async def emit_server_state_update(): | |
| room_statuses = { | |
| room_id: room.get_room_status_dict() for room_id, room in rooms.items() | |
| } | |
| total_active_connections = sum( | |
| [room_status["activeConnections"] for room_status in room_statuses.values()] | |
| ) | |
| total_active_transcoders = sum( | |
| [room_status["activeTranscoders"] for room_status in room_statuses.values()] | |
| ) | |
| logger.info( | |
| f"[Server Status]: {total_active_connections} active connections (in rooms); {total_active_transcoders} active transcoders" | |
| ) | |
| logger.info(f"[Server Status]: server_lock={server_lock}") | |
| server_lock_object_for_js = ( | |
| { | |
| "name": server_lock.get("name"), | |
| "clientID": server_lock.get("client_id"), | |
| "isActive": server_lock.get("member_object") | |
| and server_lock.get("member_object").transcoder is not None, | |
| } | |
| if server_lock | |
| else None | |
| ) | |
| await sio.emit( | |
| "server_state_update", | |
| { | |
| "statusByRoom": room_statuses, | |
| "totalActiveConnections": total_active_connections, | |
| "totalActiveTranscoders": total_active_transcoders, | |
| "agentsCapabilities": agents_capabilities_for_json, | |
| "serverLock": server_lock_object_for_js, | |
| }, | |
| room=ALL_ROOM_ID, | |
| ) | |
| async def get_session_data(sid): | |
| session = await sio.get_session(sid) | |
| # It seems like if the session has not been set that get_session may return None, so let's provide a fallback empty dictionary here | |
| return session or {} | |
| async def set_session_data(sid, client_id, room_id, room_object, member_object): | |
| await sio.save_session( | |
| sid, | |
| { | |
| "client_id": client_id, | |
| "room_id": room_id, | |
| "room_object": room_object, | |
| "member_object": member_object, | |
| }, | |
| ) | |
| def get_random_room_id(): | |
| return "".join(random.choices(ROOM_ID_USABLE_CHARACTERS, k=ROOM_ID_LENGTH)) | |
| def get_random_unused_room_id(): | |
| room_id = get_random_room_id() | |
| while room_id in rooms: | |
| room_id = get_random_room_id() | |
| return room_id | |
| ############################################### | |
| # Socket.io Basic Event Handlers | |
| ############################################### | |
| async def connect(sid, environ): | |
| logger.info(f"📥 [event: connected] sid={sid}") | |
| # TODO: Sanitize/validate query param input | |
| query_params = dict(parse.parse_qsl(environ["QUERY_STRING"])) | |
| client_id = query_params.get("clientID") | |
| logger.debug(f"query_params:\n{pformat(query_params)}") | |
| if client_id is None: | |
| logger.info("No clientID provided. Disconnecting...") | |
| await sio.disconnect(sid) | |
| return | |
| # On reconnect we need to rejoin rooms and reset session data | |
| if member_directory.get(client_id): | |
| room = member_directory[client_id].get("room") | |
| room_id = room.room_id | |
| # Note: We could also get this from room.members[client_id] | |
| member = member_directory[client_id].get("member_object") | |
| member.connection_status = "connected" | |
| member.session_id = sid | |
| logger.info( | |
| f"[event: connect] {member} reconnected. Attempting to re-add them to socketio rooms and reset session data." | |
| ) | |
| if room is None or member is None: | |
| logger.error( | |
| f"[event: connect] {client_id} is reconnecting, but room or member is None. This should not happen." | |
| ) | |
| await sio.disconnect(sid) | |
| return | |
| sio.enter_room(sid, room_id) | |
| sio.enter_room(sid, ALL_ROOM_ID) | |
| if client_id in room.listeners: | |
| sio.enter_room(sid, f"{room_id}{ROOM_LISTENERS_SUFFIX}") | |
| if client_id in room.speakers: | |
| sio.enter_room(sid, f"{room_id}{ROOM_SPEAKERS_SUFFIX}") | |
| # Save the room_id to the socketio client session | |
| await set_session_data( | |
| sid, | |
| client_id=client_id, | |
| room_id=room.room_id, | |
| room_object=room, | |
| member_object=member, | |
| ) | |
| await emit_room_state_update(room) | |
| else: | |
| # Save the client id to the socketio client session | |
| await set_session_data( | |
| sid, client_id=client_id, room_id=None, room_object=None, member_object=None | |
| ) | |
| await sio.emit("server_id", server_id, to=sid) | |
| await emit_server_state_update() | |
| async def disconnect(sid): | |
| global server_lock | |
| session_data = await get_session_data(sid) | |
| # logger.info("session_data", session_data) | |
| client_id = None | |
| member = None | |
| room = None | |
| if session_data: | |
| client_id = session_data.get("client_id") | |
| member = session_data.get("member_object") | |
| room = session_data.get("room_object") | |
| logger.info( | |
| f"[event: disconnect][{room or 'NOT_IN_ROOM'}] member: {member or 'NO_MEMBER_OBJECT'} disconnected" | |
| ) | |
| # Release the lock if this is the client that holds the current server lock | |
| if server_lock and server_lock.get("client_id") == client_id: | |
| server_lock = None | |
| if member: | |
| member.connection_status = "disconnected" | |
| if member.transcoder: | |
| member.transcoder.close = True | |
| member.transcoder = None | |
| member.requested_output_type = None | |
| if room: | |
| logger.info( | |
| f"[event: disconnect] {member} disconnected from room {room.room_id}" | |
| ) | |
| await emit_room_state_update(room) | |
| else: | |
| logger.info( | |
| f"[event: disconnect] {member} disconnected, but no room object present. This should not happen." | |
| ) | |
| else: | |
| logger.info( | |
| f"[event: disconnect] client_id {client_id or 'NO_CLIENT_ID'} with sid {sid} in rooms {str(sio.rooms(sid))} disconnected" | |
| ) | |
| await emit_server_state_update() | |
| async def catch_all(event, sid, data): | |
| logger.info(f"[unhandled event: {event}] sid={sid} data={data}") | |
| ############################################### | |
| # Socket.io Streaming Event handlers | |
| ############################################### | |
| async def join_room(sid, client_id, room_id_from_client, config_dict): | |
| global server_lock | |
| args = { | |
| "sid": sid, | |
| "client_id": client_id, | |
| "room_id": room_id_from_client, | |
| "config_dict": config_dict, | |
| } | |
| logger.info(f"[event: join_room] {args}") | |
| session_data = await get_session_data(sid) | |
| logger.info(f"session_data: {session_data}") | |
| room_id = room_id_from_client | |
| if room_id is None: | |
| room_id = get_random_unused_room_id() | |
| logger.info( | |
| f"No room_id provided. Generating a random, unused room_id: {room_id}" | |
| ) | |
| # Create the room if it doesn't already exist | |
| if room_id not in rooms: | |
| rooms[room_id] = Room(room_id) | |
| room = rooms[room_id] | |
| member = None | |
| name = "[NO_NAME]" | |
| # If the client is reconnecting use their existing member object. Otherwise create a new one. | |
| if client_id in room.members: | |
| member = room.members[client_id] | |
| logger.info(f"{member} is rejoining room {room_id}.") | |
| else: | |
| member_number = len(room.members) + 1 | |
| name = f"Member {member_number}" | |
| member = Member( | |
| client_id=client_id, | |
| session_id=sid, | |
| name=name, | |
| ) | |
| logger.info(f"Created a new Member object: {member}") | |
| logger.info(f"Adding {member} to room {room_id}") | |
| room.members[client_id] = member | |
| # Also add them to the member directory | |
| member_directory[client_id] = {"room": room, "member_object": member} | |
| # Join the socketio room, which enables broadcasting to all members of the room | |
| sio.enter_room(sid, room_id) | |
| # Join the room for all clients | |
| sio.enter_room(sid, ALL_ROOM_ID) | |
| if "listener" in config_dict["roles"]: | |
| sio.enter_room(sid, f"{room_id}{ROOM_LISTENERS_SUFFIX}") | |
| if client_id not in room.listeners: | |
| room.listeners.append(client_id) | |
| else: | |
| sio.leave_room(sid, f"{room_id}{ROOM_LISTENERS_SUFFIX}") | |
| room.listeners = [ | |
| listener_id for listener_id in room.listeners if listener_id != client_id | |
| ] | |
| if "speaker" in config_dict["roles"]: | |
| sio.enter_room(sid, f"{room_id}{ROOM_SPEAKERS_SUFFIX}") | |
| if client_id not in room.speakers: | |
| room.speakers.append(client_id) | |
| else: | |
| sio.leave_room(sid, f"{room_id}{ROOM_SPEAKERS_SUFFIX}") | |
| # If the person is no longer a speaker they should no longer be able to lock the server | |
| if server_lock and server_lock.get("client_id") == client_id: | |
| logger.info( | |
| f"🔓 Server is now unlocked from client {server_lock.get('client_id')} with name/info: {server_lock.get('name')}" | |
| ) | |
| server_lock = None | |
| if member.transcoder: | |
| member.transcoder.close = True | |
| member.transcoder = None | |
| room.speakers = [ | |
| speaker_id for speaker_id in room.speakers if speaker_id != client_id | |
| ] | |
| # If we currently own the server lock and are updating roles and we no longer have server lock specified, release it | |
| if ( | |
| server_lock is not None | |
| and server_lock["client_id"] == client_id | |
| and config_dict.get("lockServerName") is None | |
| ): | |
| logger.info(f"[join_room] Releasing server lock: {pformat(server_lock)}") | |
| server_lock = None | |
| # Only speakers should be able to lock the server | |
| if config_dict.get("lockServerName") is not None and "speaker" in config_dict.get( | |
| "roles", {} | |
| ): | |
| # If something goes wrong and the server gets stuck in a locked state the client can | |
| # force the server to remove the lock by passing the special name ESCAPE_HATCH_SERVER_LOCK_RELEASE_NAME | |
| # TEMP: remove escape hatch for demo | |
| # if ( | |
| # server_lock is not None | |
| # and config_dict.get("lockServerName") | |
| # == ESCAPE_HATCH_SERVER_LOCK_RELEASE_NAME | |
| # ): | |
| # server_lock = None | |
| # logger.info( | |
| # f"🔓 Server lock has been reset by {client_id} using the escape hatch name {ESCAPE_HATCH_SERVER_LOCK_RELEASE_NAME}" | |
| # ) | |
| # If the server is not locked, set a lock. If it's already locked to this client, update the lock object | |
| if server_lock is None or server_lock.get("client_id") == client_id: | |
| # TODO: Add some sort of timeout as a backstop in case someone leaves the browser tab open after locking the server | |
| server_lock = { | |
| "name": config_dict.get("lockServerName"), | |
| "client_id": client_id, | |
| "member_object": member, | |
| } | |
| logger.info( | |
| f"🔒 Server is now locked to client {server_lock.get('client_id')} with name/info: {server_lock.get('name')}\nThis client will have priority over all others until they disconnect." | |
| ) | |
| # If the server is already locked to someone else, don't allow this client to lock it | |
| elif server_lock is not None and server_lock.get("client_id") != client_id: | |
| logger.warn( | |
| f"⚠️ Server is already locked to client {server_lock.get('client_id')}. Ignoring request to lock to client {client_id}." | |
| ) | |
| # TODO: Maybe throw an error here? | |
| # Save the room_id to the socketio client session | |
| await set_session_data( | |
| sid, | |
| client_id=client_id, | |
| room_id=room_id, | |
| room_object=room, | |
| member_object=member, | |
| ) | |
| await emit_room_state_update(room) | |
| await emit_server_state_update() | |
| return {"roomsJoined": sio.rooms(sid), "roomID": room_id} | |
| # TODO: Add code to prevent more than one speaker from connecting/streaming at a time | |
| async def configure_stream(sid, config): | |
| session_data = await get_session_data(sid) | |
| client_id, member, room = itemgetter("client_id", "member_object", "room_object")( | |
| session_data | |
| ) | |
| logger.debug( | |
| f"[event: configure_stream][{room}] Received stream config from {member}\n{pformat(config)}" | |
| ) | |
| if member is None or room is None: | |
| logger.error( | |
| f"Received stream config from {member}, but member or room is None. This should not happen." | |
| ) | |
| return {"status": "error", "message": "member_or_room_is_none"} | |
| # If there is a server lock WITH an active transcoder session, prevent other users from configuring and starting a stream | |
| # If the server lock client does NOT have an active transcoder session allow this to proceed, knowing that | |
| # this stream will be interrupted if the server lock client starts streaming | |
| if ( | |
| server_lock is not None | |
| and server_lock.get("client_id") != client_id | |
| and server_lock.get("member_object") | |
| and server_lock.get("member_object").transcoder is not None | |
| ): | |
| logger.warn( | |
| f"Server is locked to client {server_lock.get('client_id')}. Ignoring request to configure stream from client {client_id}." | |
| ) | |
| return {"status": "error", "message": "server_locked"} | |
| debug = config.get("debug") | |
| async_processing = config.get("async_processing") | |
| # Currently s2s, s2t or s2s&t | |
| model_type = config.get("model_type") | |
| member.requested_output_type = model_type | |
| model_name = config.get("model_name") | |
| try: | |
| agent = available_agents.get_agent_or_throw(model_name) | |
| except NoAvailableAgentException as e: | |
| logger.warn(f"Error while getting agent: {e}") | |
| # await sio.emit("error", str(e), to=sid) | |
| await sio.disconnect(sid) | |
| return {"status": "error", "message": str(e)} | |
| if member.transcoder: | |
| logger.warn( | |
| "Member already has a transcoder configured. Closing it, and overwriting with a new transcoder..." | |
| ) | |
| member.transcoder.close = True | |
| t0 = time.time() | |
| try: | |
| member.transcoder = SimulevalTranscoder( | |
| agent, | |
| config["rate"], | |
| debug=debug, | |
| buffer_limit=int(config["buffer_limit"]), | |
| ) | |
| except Exception as e: | |
| logger.warn(f"Got exception while initializing agents: {e}") | |
| # await sio.emit("error", str(e), to=sid) | |
| await sio.disconnect(sid) | |
| return {"status": "error", "message": str(e)} | |
| t1 = time.time() | |
| logger.debug(f"Booting up VAD and transcoder took {t1-t0} sec") | |
| # TODO: if async_processing is false, then we need to run transcoder.process_pipeline_once() whenever we receive audio, or at some other sensible interval | |
| if async_processing: | |
| member.transcoder.start() | |
| # We need to emit a room state update here since room state now includes # of active transcoders | |
| await emit_room_state_update(room) | |
| await emit_server_state_update() | |
| return {"status": "ok", "message": "server_ready"} | |
| # The config here is a partial config, meaning it may not contain all the config values -- only the ones the user | |
| # wants to change | |
| async def set_dynamic_config( | |
| sid, | |
| # partial_config's type is defined in StreamingTypes.ts | |
| partial_config, | |
| ): | |
| session_data = await get_session_data(sid) | |
| member = None | |
| if session_data: | |
| member = session_data.get("member_object") | |
| if member: | |
| new_dynamic_config = { | |
| **(member.transcoder_dynamic_config or {}), | |
| **partial_config, | |
| } | |
| logger.info( | |
| f"[set_dynamic_config] Setting new dynamic config:\n\n{pformat(new_dynamic_config)}\n" | |
| ) | |
| member.transcoder_dynamic_config = new_dynamic_config | |
| return {"status": "ok", "message": "dynamic_config_set"} | |
| async def incoming_audio(sid, blob): | |
| session_data = await get_session_data(sid) | |
| client_id = None | |
| member = None | |
| room = None | |
| if session_data: | |
| client_id = session_data.get("client_id") | |
| member = session_data.get("member_object") | |
| room = session_data.get("room_object") | |
| logger.debug(f"[event: incoming_audio] from member {member}") | |
| # If the server is locked by someone else, kill our transcoder and ignore incoming audio | |
| # If the server lock client does NOT have an active transcoder session allow this incoming audio pipeline to proceed, | |
| # knowing that this stream will be interrupted if the server lock client starts streaming | |
| if ( | |
| server_lock is not None | |
| and server_lock.get("client_id") != client_id | |
| and server_lock.get("member_object") | |
| and server_lock.get("member_object").transcoder is not None | |
| ): | |
| # TODO: Send an event to the client to let them know their streaming session has been killed | |
| if member.transcoder: | |
| member.transcoder.close = True | |
| member.transcoder = None | |
| # Update both room state and server state given that the number of active transcoders has changed | |
| if room: | |
| await emit_room_state_update(room) | |
| await emit_server_state_update() | |
| logger.warn( | |
| f"[incoming_audio] Server is locked to client {server_lock.get('client_id')}. Ignoring incoming audio from client {client_id}." | |
| ) | |
| return | |
| if member is None or room is None: | |
| logger.error( | |
| f"[incoming_audio] Received incoming_audio from {member}, but member or room is None. This should not happen." | |
| ) | |
| return | |
| # NOTE: bytes and bytearray are very similar, but bytes is immutable, and is what is returned by socketio | |
| if not isinstance(blob, bytes): | |
| logger.error( | |
| f"[incoming_audio] Received audio from {member}, but it was not of type `bytes`. type(blob) = {type(blob)}" | |
| ) | |
| return | |
| if member.transcoder is None: | |
| logger.error( | |
| f"[incoming_audio] Received audio from {member}, but no transcoder configured to process it (member.transcoder is None). This should not happen." | |
| ) | |
| return | |
| member.transcoder.process_incoming_bytes( | |
| blob, dynamic_config=member.transcoder_dynamic_config | |
| ) | |
| # Send back any available model output | |
| # NOTE: In theory it would make sense remove this from the incoming_audio handler and | |
| # handle this in a dedicated thread that checks for output and sends it right away, | |
| # but in practice for our limited demo use cases this approach didn't add noticeable | |
| # latency, so we're keeping it simple for now. | |
| events = get_transcoder_output_events(member.transcoder) | |
| logger.debug(f"[incoming_audio] transcoder output events: {len(events)}") | |
| if len(events) == 0: | |
| logger.debug("[incoming_audio] No transcoder output to send") | |
| else: | |
| for e in events: | |
| if e["event"] == "translation_speech" and member.requested_output_type in [ | |
| "s2s", | |
| "s2s&t", | |
| ]: | |
| logger.debug("[incoming_audio] Sending translation_speech event") | |
| await sio.emit( | |
| "translation_speech", e, room=f"{room.room_id}_listeners" | |
| ) | |
| elif e["event"] == "translation_text" and member.requested_output_type in [ | |
| "s2t", | |
| "s2s&t", | |
| ]: | |
| logger.debug("[incoming_audio] Sending translation_text event") | |
| await sio.emit("translation_text", e, room=f"{room.room_id}_listeners") | |
| else: | |
| logger.error(f"[incoming_audio] Unexpected event type: {e['event']}") | |
| return | |
| async def stop_stream(sid): | |
| session_data = await get_session_data(sid) | |
| client_id, member, room = itemgetter("client_id", "member_object", "room_object")( | |
| session_data | |
| ) | |
| logger.debug(f"[event: stop_stream][{room}] Attempting to stop stream for {member}") | |
| if member is None or room is None: | |
| message = f"Received stop_stream from {member}, but member or room is None. This should not happen." | |
| logger.error(message) | |
| return {"status": "error", "message": message} | |
| # In order to stop the stream and end the transcoder thread, set close to True and unset it for the member | |
| if member.transcoder: | |
| member.transcoder.close = True | |
| member.transcoder = None | |
| else: | |
| message = f"Received stop_stream from {member}, but member.transcoder is None. This should not happen." | |
| logger.warn(message) | |
| # We need to emit a room state update here since room state now includes # of active transcoders | |
| await emit_room_state_update(room) | |
| # Emit a server state update now that we've changed the number of active transcoders | |
| await emit_server_state_update() | |
| return {"status": "ok", "message": "Stream stopped"} | |
| async def clear_transcript_for_all(sid): | |
| session_data = await get_session_data(sid) | |
| room = session_data.get("room_object") | |
| if room: | |
| await sio.emit("clear_transcript", room=f"{room.room_id}") | |
| else: | |
| logger.error("[clear_transcript] room is None. This should not happen.") | |
| async def set_name(sid, name): | |
| logger.info(f"[Event: set_name] name={name}") | |
| await sio.save_session(sid, {"name": name}) | |