Spaces:
Running
Running
| import type { | |
| ConsumerSensorDriver, | |
| ConnectionStatus, | |
| SensorFrame, | |
| SensorStream, | |
| FrameCallback, | |
| StreamUpdateCallback, | |
| StatusChangeCallback, | |
| UnsubscribeFn | |
| } from "../types/index.js"; | |
| export interface WebRTCConsumerConfig { | |
| type: "webrtc-consumer"; | |
| signalingUrl: string; // ws://host:port/signaling | |
| streamId?: string; | |
| } | |
| export class WebRTCConsumer implements ConsumerSensorDriver { | |
| readonly type = "consumer" as const; | |
| readonly id: string; | |
| readonly name = "WebRTC Consumer"; | |
| private config: WebRTCConsumerConfig; | |
| private _status: ConnectionStatus = { isConnected: false }; | |
| private pc: RTCPeerConnection | null = null; | |
| private dc: RTCDataChannel | null = null; | |
| private signaling?: WebSocket; | |
| private frameSentCallbacks: FrameCallback[] = []; | |
| private streamUpdateCallbacks: StreamUpdateCallback[] = []; | |
| private statusCallbacks: StatusChangeCallback[] = []; | |
| private activeStreams = new Map<string, SensorStream>(); | |
| private sendQueue: SensorFrame[] = []; | |
| private isSending = false; | |
| private readonly BUFFER_WATERMARK = 4 * 1024 * 1024; // 4 MB | |
| constructor(config: WebRTCConsumerConfig) { | |
| this.config = config; | |
| this.id = `webrtc-consumer-${Date.now()}`; | |
| } | |
| get status(): ConnectionStatus { | |
| return this._status; | |
| } | |
| async connect(): Promise<void> { | |
| // open signaling | |
| this.signaling = new WebSocket(this.config.signalingUrl); | |
| await new Promise<void>((res, rej) => { | |
| this.signaling!.onopen = () => res(); | |
| this.signaling!.onerror = rej; | |
| }); | |
| // create pc | |
| this.pc = new RTCPeerConnection({ | |
| iceServers: [{ urls: "stun:stun.l.google.com:19302" }] | |
| }); | |
| // datachannel | |
| this.dc = this.pc.createDataChannel("video", { | |
| ordered: false, | |
| maxRetransmits: 0 | |
| }); | |
| this.dc.binaryType = "arraybuffer"; | |
| this.dc.onopen = () => { | |
| this._status = { isConnected: true, lastConnected: new Date() }; | |
| this.notifyStatus(); | |
| this.flushQueue(); | |
| }; | |
| this.dc.onclose = () => { | |
| this._status = { isConnected: false, error: "DC closed" }; | |
| this.notifyStatus(); | |
| }; | |
| // ICE - Trickle-ICE for faster startup | |
| this.pc.onicecandidate = (ev) => { | |
| if (ev.candidate) { | |
| this.signaling!.send(JSON.stringify({ | |
| type: "ice", | |
| candidate: ev.candidate.toJSON() | |
| })); | |
| } else { | |
| // Send end-of-candidates marker | |
| this.signaling!.send(JSON.stringify({ | |
| type: "ice", | |
| candidate: { end: true } | |
| })); | |
| } | |
| }; | |
| // signaling messages | |
| this.signaling.onmessage = async (ev) => { | |
| const msg = JSON.parse(ev.data); | |
| if (msg.type === "answer") { | |
| await this.pc!.setRemoteDescription({ type: "answer", sdp: msg.sdp }); | |
| } else if (msg.type === "ice") { | |
| // Handle end-of-candidates marker | |
| if (msg.candidate?.end) { | |
| // ICE gathering complete on remote side | |
| return; | |
| } | |
| await this.pc!.addIceCandidate(msg.candidate); | |
| } | |
| }; | |
| // create offer immediately (trickle-ICE) | |
| const offer = await this.pc.createOffer(); | |
| await this.pc.setLocalDescription(offer); | |
| this.signaling.send(JSON.stringify({ type: "offer", sdp: offer.sdp })); | |
| } | |
| async disconnect(): Promise<void> { | |
| if (this.dc) this.dc.close(); | |
| if (this.pc) this.pc.close(); | |
| if (this.signaling) this.signaling.close(); | |
| this._status = { isConnected: false }; | |
| this.notifyStatus(); | |
| } | |
| // ConsumerSensorDriver impl | |
| async sendFrame(frame: SensorFrame): Promise<void> { | |
| if (!this.dc || this.dc.readyState !== "open") { | |
| throw new Error("DataChannel not open"); | |
| } | |
| this.sendQueue.push(frame); | |
| this.flushQueue(); | |
| } | |
| async sendFrames(frames: SensorFrame[]): Promise<void> { | |
| this.sendQueue.push(...frames); | |
| this.flushQueue(); | |
| } | |
| async startOutputStream(stream: SensorStream): Promise<void> { | |
| this.activeStreams.set(stream.id, stream); | |
| this.notifyStream(stream); | |
| } | |
| async stopOutputStream(streamId: string): Promise<void> { | |
| const s = this.activeStreams.get(streamId); | |
| if (s) { | |
| s.active = false; this.activeStreams.delete(streamId); this.notifyStream(s); | |
| } | |
| } | |
| getActiveOutputStreams(): SensorStream[] { return Array.from(this.activeStreams.values()); } | |
| // no-op for onFrameSent etc. | |
| onFrameSent(cb: FrameCallback): UnsubscribeFn { this.frameSentCallbacks.push(cb); return () => this.pull(this.frameSentCallbacks, cb); } | |
| onStreamUpdate(cb: StreamUpdateCallback): UnsubscribeFn { this.streamUpdateCallbacks.push(cb); return () => this.pull(this.streamUpdateCallbacks, cb); } | |
| onStatusChange(cb: StatusChangeCallback): UnsubscribeFn { this.statusCallbacks.push(cb); return () => this.pull(this.statusCallbacks, cb); } | |
| // helpers | |
| private flushQueue() { | |
| if (!this.dc || this.dc.readyState !== "open") return; | |
| while (this.sendQueue.length && this.dc.bufferedAmount < this.BUFFER_WATERMARK) { | |
| const frame = this.sendQueue.shift()!; | |
| const packet = this.frameToPacket(frame); | |
| this.dc.send(packet); | |
| this.frameSentCallbacks.forEach((c) => c(frame)); | |
| } | |
| } | |
| private frameToPacket(frame: SensorFrame): ArrayBuffer { | |
| const headerObj = { | |
| type: "video_frame", | |
| timestamp: frame.timestamp, | |
| frameType: frame.type, | |
| metadata: frame.metadata, | |
| streamId: this.config.streamId | |
| }; | |
| const headerJson = JSON.stringify(headerObj); | |
| const headerBuf = new TextEncoder().encode(headerJson); | |
| const lenBuf = new Uint32Array([headerBuf.length]).buffer; | |
| let dataBuf: ArrayBuffer; | |
| if (frame.data instanceof Blob) { | |
| // this is sync because MediaRecorder gives Blob slices pre-gathered | |
| // but we need async arrayBuffer — already spec returns Promise | |
| return frame.data.arrayBuffer().then((buf) => { | |
| const out = new Uint8Array(lenBuf.byteLength + headerBuf.length + buf.byteLength); | |
| out.set(new Uint8Array(lenBuf), 0); | |
| out.set(headerBuf, lenBuf.byteLength); | |
| out.set(new Uint8Array(buf), lenBuf.byteLength + headerBuf.length); | |
| return out.buffer; | |
| }) as unknown as ArrayBuffer; // caller handles async in flushQueue loop by awaiting? For now assume ArrayBuffer path (MediaRecorder provides ArrayBuffer in config) | |
| } else { | |
| dataBuf = frame.data as ArrayBuffer; | |
| } | |
| const out = new Uint8Array(lenBuf.byteLength + headerBuf.length + dataBuf.byteLength); | |
| out.set(new Uint8Array(lenBuf), 0); | |
| out.set(headerBuf, lenBuf.byteLength); | |
| out.set(new Uint8Array(dataBuf), lenBuf.byteLength + headerBuf.length); | |
| return out.buffer; | |
| } | |
| private pull<T>(arr: T[], item: T) { const i = arr.indexOf(item); if (i >= 0) arr.splice(i, 1); } | |
| private notifyStream(s: SensorStream) { this.streamUpdateCallbacks.forEach((c) => c(s)); } | |
| private notifyStatus() { this.statusCallbacks.forEach((c) => c(this._status)); } | |
| } |