uses sentinel object when end of transcription, to properly terminate tasks
Browse files- whisperlivekit/audio_processor.py +154 -51
whisperlivekit/audio_processor.py
CHANGED
|
@@ -15,6 +15,8 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(
|
|
| 15 |
logger = logging.getLogger(__name__)
|
| 16 |
logger.setLevel(logging.DEBUG)
|
| 17 |
|
|
|
|
|
|
|
| 18 |
def format_time(seconds: float) -> str:
|
| 19 |
"""Format seconds as HH:MM:SS."""
|
| 20 |
return str(timedelta(seconds=int(seconds)))
|
|
@@ -41,8 +43,9 @@ class AudioProcessor:
|
|
| 41 |
self.last_ffmpeg_activity = time()
|
| 42 |
self.ffmpeg_health_check_interval = 5
|
| 43 |
self.ffmpeg_max_idle_time = 10
|
| 44 |
-
|
| 45 |
# State management
|
|
|
|
| 46 |
self.tokens = []
|
| 47 |
self.buffer_transcription = ""
|
| 48 |
self.buffer_diarization = ""
|
|
@@ -62,6 +65,13 @@ class AudioProcessor:
|
|
| 62 |
self.transcription_queue = asyncio.Queue() if self.args.transcription else None
|
| 63 |
self.diarization_queue = asyncio.Queue() if self.args.diarization else None
|
| 64 |
self.pcm_buffer = bytearray()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 65 |
|
| 66 |
# Initialize transcription engine if enabled
|
| 67 |
if self.args.transcription:
|
|
@@ -210,7 +220,7 @@ class AudioProcessor:
|
|
| 210 |
self.last_ffmpeg_activity = time()
|
| 211 |
|
| 212 |
if not chunk:
|
| 213 |
-
logger.info("FFmpeg stdout closed.")
|
| 214 |
break
|
| 215 |
|
| 216 |
self.pcm_buffer.extend(chunk)
|
|
@@ -245,6 +255,15 @@ class AudioProcessor:
|
|
| 245 |
logger.warning(f"Exception in ffmpeg_stdout_reader: {e}")
|
| 246 |
logger.warning(f"Traceback: {traceback.format_exc()}")
|
| 247 |
break
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 248 |
|
| 249 |
async def transcription_processor(self):
|
| 250 |
"""Process audio chunks for transcription."""
|
|
@@ -254,7 +273,16 @@ class AudioProcessor:
|
|
| 254 |
while True:
|
| 255 |
try:
|
| 256 |
pcm_array = await self.transcription_queue.get()
|
|
|
|
|
|
|
|
|
|
|
|
|
| 257 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 258 |
logger.info(f"{len(self.online.audio_buffer) / self.online.SAMPLING_RATE} seconds of audio to process.")
|
| 259 |
|
| 260 |
# Process transcription
|
|
@@ -278,12 +306,15 @@ class AudioProcessor:
|
|
| 278 |
await self.update_transcription(
|
| 279 |
new_tokens, buffer, end_buffer, self.full_transcription, self.sep
|
| 280 |
)
|
|
|
|
| 281 |
|
| 282 |
except Exception as e:
|
| 283 |
logger.warning(f"Exception in transcription_processor: {e}")
|
| 284 |
logger.warning(f"Traceback: {traceback.format_exc()}")
|
| 285 |
-
|
| 286 |
-
|
|
|
|
|
|
|
| 287 |
|
| 288 |
async def diarization_processor(self, diarization_obj):
|
| 289 |
"""Process audio chunks for speaker diarization."""
|
|
@@ -292,6 +323,10 @@ class AudioProcessor:
|
|
| 292 |
while True:
|
| 293 |
try:
|
| 294 |
pcm_array = await self.diarization_queue.get()
|
|
|
|
|
|
|
|
|
|
|
|
|
| 295 |
|
| 296 |
# Process diarization
|
| 297 |
await diarization_obj.diarize(pcm_array)
|
|
@@ -303,12 +338,15 @@ class AudioProcessor:
|
|
| 303 |
)
|
| 304 |
|
| 305 |
await self.update_diarization(new_end, buffer_diarization)
|
|
|
|
| 306 |
|
| 307 |
except Exception as e:
|
| 308 |
logger.warning(f"Exception in diarization_processor: {e}")
|
| 309 |
logger.warning(f"Traceback: {traceback.format_exc()}")
|
| 310 |
-
|
| 311 |
-
|
|
|
|
|
|
|
| 312 |
|
| 313 |
async def results_formatter(self):
|
| 314 |
"""Format processing results for output."""
|
|
@@ -398,6 +436,19 @@ class AudioProcessor:
|
|
| 398 |
yield response
|
| 399 |
self.last_response_content = response_content
|
| 400 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 401 |
await asyncio.sleep(0.1) # Avoid overwhelming the client
|
| 402 |
|
| 403 |
except Exception as e:
|
|
@@ -407,65 +458,117 @@ class AudioProcessor:
|
|
| 407 |
|
| 408 |
async def create_tasks(self):
|
| 409 |
"""Create and start processing tasks."""
|
| 410 |
-
|
| 411 |
-
|
|
|
|
| 412 |
if self.args.transcription and self.online:
|
| 413 |
-
|
|
|
|
|
|
|
| 414 |
|
| 415 |
if self.args.diarization and self.diarization:
|
| 416 |
-
|
| 417 |
-
|
| 418 |
-
|
| 419 |
|
| 420 |
-
|
| 421 |
-
|
| 422 |
-
|
| 423 |
-
try:
|
| 424 |
-
await asyncio.sleep(10) # Check every 10 seconds instead of 60
|
| 425 |
-
|
| 426 |
-
current_time = time()
|
| 427 |
-
# Check for stalled tasks
|
| 428 |
-
for i, task in enumerate(tasks):
|
| 429 |
-
if task.done():
|
| 430 |
-
exc = task.exception() if task.done() else None
|
| 431 |
-
task_name = task.get_name() if hasattr(task, 'get_name') else f"Task {i}"
|
| 432 |
-
logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
|
| 433 |
-
|
| 434 |
-
# Check for FFmpeg process health with shorter thresholds
|
| 435 |
-
ffmpeg_idle_time = current_time - self.last_ffmpeg_activity
|
| 436 |
-
if ffmpeg_idle_time > 15: # 15 seconds instead of 180
|
| 437 |
-
logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention")
|
| 438 |
-
|
| 439 |
-
# Force restart after 30 seconds of inactivity (instead of 600)
|
| 440 |
-
if ffmpeg_idle_time > 30:
|
| 441 |
-
logger.error("FFmpeg idle for too long, forcing restart")
|
| 442 |
-
await self.restart_ffmpeg()
|
| 443 |
-
|
| 444 |
-
except Exception as e:
|
| 445 |
-
logger.error(f"Error in watchdog task: {e}")
|
| 446 |
|
| 447 |
-
|
| 448 |
-
self.
|
|
|
|
| 449 |
|
| 450 |
return self.results_formatter()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 451 |
|
| 452 |
async def cleanup(self):
|
| 453 |
"""Clean up resources when processing is complete."""
|
| 454 |
-
|
| 455 |
-
|
| 456 |
-
|
| 457 |
-
|
| 458 |
-
|
| 459 |
-
|
| 460 |
-
|
| 461 |
-
|
| 462 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 463 |
|
| 464 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 465 |
self.diarization.close()
|
|
|
|
|
|
|
| 466 |
|
| 467 |
async def process_audio(self, message):
|
| 468 |
"""Process incoming audio data."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 469 |
retry_count = 0
|
| 470 |
max_retries = 3
|
| 471 |
|
|
@@ -517,4 +620,4 @@ class AudioProcessor:
|
|
| 517 |
else:
|
| 518 |
logger.error("Maximum retries reached for FFmpeg process")
|
| 519 |
await self.restart_ffmpeg()
|
| 520 |
-
return
|
|
|
|
| 15 |
logger = logging.getLogger(__name__)
|
| 16 |
logger.setLevel(logging.DEBUG)
|
| 17 |
|
| 18 |
+
SENTINEL = object() # unique sentinel object for end of stream marker
|
| 19 |
+
|
| 20 |
def format_time(seconds: float) -> str:
|
| 21 |
"""Format seconds as HH:MM:SS."""
|
| 22 |
return str(timedelta(seconds=int(seconds)))
|
|
|
|
| 43 |
self.last_ffmpeg_activity = time()
|
| 44 |
self.ffmpeg_health_check_interval = 5
|
| 45 |
self.ffmpeg_max_idle_time = 10
|
| 46 |
+
|
| 47 |
# State management
|
| 48 |
+
self.is_stopping = False
|
| 49 |
self.tokens = []
|
| 50 |
self.buffer_transcription = ""
|
| 51 |
self.buffer_diarization = ""
|
|
|
|
| 65 |
self.transcription_queue = asyncio.Queue() if self.args.transcription else None
|
| 66 |
self.diarization_queue = asyncio.Queue() if self.args.diarization else None
|
| 67 |
self.pcm_buffer = bytearray()
|
| 68 |
+
|
| 69 |
+
# Task references
|
| 70 |
+
self.transcription_task = None
|
| 71 |
+
self.diarization_task = None
|
| 72 |
+
self.ffmpeg_reader_task = None
|
| 73 |
+
self.watchdog_task = None
|
| 74 |
+
self.all_tasks_for_cleanup = []
|
| 75 |
|
| 76 |
# Initialize transcription engine if enabled
|
| 77 |
if self.args.transcription:
|
|
|
|
| 220 |
self.last_ffmpeg_activity = time()
|
| 221 |
|
| 222 |
if not chunk:
|
| 223 |
+
logger.info("FFmpeg stdout closed, no more data to read.")
|
| 224 |
break
|
| 225 |
|
| 226 |
self.pcm_buffer.extend(chunk)
|
|
|
|
| 255 |
logger.warning(f"Exception in ffmpeg_stdout_reader: {e}")
|
| 256 |
logger.warning(f"Traceback: {traceback.format_exc()}")
|
| 257 |
break
|
| 258 |
+
|
| 259 |
+
logger.info("FFmpeg stdout processing finished. Signaling downstream processors.")
|
| 260 |
+
if self.args.transcription and self.transcription_queue:
|
| 261 |
+
await self.transcription_queue.put(SENTINEL)
|
| 262 |
+
logger.debug("Sentinel put into transcription_queue.")
|
| 263 |
+
if self.args.diarization and self.diarization_queue:
|
| 264 |
+
await self.diarization_queue.put(SENTINEL)
|
| 265 |
+
logger.debug("Sentinel put into diarization_queue.")
|
| 266 |
+
|
| 267 |
|
| 268 |
async def transcription_processor(self):
|
| 269 |
"""Process audio chunks for transcription."""
|
|
|
|
| 273 |
while True:
|
| 274 |
try:
|
| 275 |
pcm_array = await self.transcription_queue.get()
|
| 276 |
+
if pcm_array is SENTINEL:
|
| 277 |
+
logger.debug("Transcription processor received sentinel. Finishing.")
|
| 278 |
+
self.transcription_queue.task_done()
|
| 279 |
+
break
|
| 280 |
|
| 281 |
+
if not self.online: # Should not happen if queue is used
|
| 282 |
+
logger.warning("Transcription processor: self.online not initialized.")
|
| 283 |
+
self.transcription_queue.task_done()
|
| 284 |
+
continue
|
| 285 |
+
|
| 286 |
logger.info(f"{len(self.online.audio_buffer) / self.online.SAMPLING_RATE} seconds of audio to process.")
|
| 287 |
|
| 288 |
# Process transcription
|
|
|
|
| 306 |
await self.update_transcription(
|
| 307 |
new_tokens, buffer, end_buffer, self.full_transcription, self.sep
|
| 308 |
)
|
| 309 |
+
self.transcription_queue.task_done()
|
| 310 |
|
| 311 |
except Exception as e:
|
| 312 |
logger.warning(f"Exception in transcription_processor: {e}")
|
| 313 |
logger.warning(f"Traceback: {traceback.format_exc()}")
|
| 314 |
+
if 'pcm_array' in locals() and pcm_array is not SENTINEL : # Check if pcm_array was assigned from queue
|
| 315 |
+
self.transcription_queue.task_done()
|
| 316 |
+
logger.info("Transcription processor task finished.")
|
| 317 |
+
|
| 318 |
|
| 319 |
async def diarization_processor(self, diarization_obj):
|
| 320 |
"""Process audio chunks for speaker diarization."""
|
|
|
|
| 323 |
while True:
|
| 324 |
try:
|
| 325 |
pcm_array = await self.diarization_queue.get()
|
| 326 |
+
if pcm_array is SENTINEL:
|
| 327 |
+
logger.debug("Diarization processor received sentinel. Finishing.")
|
| 328 |
+
self.diarization_queue.task_done()
|
| 329 |
+
break
|
| 330 |
|
| 331 |
# Process diarization
|
| 332 |
await diarization_obj.diarize(pcm_array)
|
|
|
|
| 338 |
)
|
| 339 |
|
| 340 |
await self.update_diarization(new_end, buffer_diarization)
|
| 341 |
+
self.diarization_queue.task_done()
|
| 342 |
|
| 343 |
except Exception as e:
|
| 344 |
logger.warning(f"Exception in diarization_processor: {e}")
|
| 345 |
logger.warning(f"Traceback: {traceback.format_exc()}")
|
| 346 |
+
if 'pcm_array' in locals() and pcm_array is not SENTINEL:
|
| 347 |
+
self.diarization_queue.task_done()
|
| 348 |
+
logger.info("Diarization processor task finished.")
|
| 349 |
+
|
| 350 |
|
| 351 |
async def results_formatter(self):
|
| 352 |
"""Format processing results for output."""
|
|
|
|
| 436 |
yield response
|
| 437 |
self.last_response_content = response_content
|
| 438 |
|
| 439 |
+
# Check for termination condition
|
| 440 |
+
if self.is_stopping:
|
| 441 |
+
all_processors_done = True
|
| 442 |
+
if self.args.transcription and self.transcription_task and not self.transcription_task.done():
|
| 443 |
+
all_processors_done = False
|
| 444 |
+
if self.args.diarization and self.diarization_task and not self.diarization_task.done():
|
| 445 |
+
all_processors_done = False
|
| 446 |
+
|
| 447 |
+
if all_processors_done:
|
| 448 |
+
logger.info("Results formatter: All upstream processors are done and in stopping state. Terminating.")
|
| 449 |
+
final_state = await self.get_current_state()
|
| 450 |
+
return
|
| 451 |
+
|
| 452 |
await asyncio.sleep(0.1) # Avoid overwhelming the client
|
| 453 |
|
| 454 |
except Exception as e:
|
|
|
|
| 458 |
|
| 459 |
async def create_tasks(self):
|
| 460 |
"""Create and start processing tasks."""
|
| 461 |
+
self.all_tasks_for_cleanup = []
|
| 462 |
+
processing_tasks_for_watchdog = []
|
| 463 |
+
|
| 464 |
if self.args.transcription and self.online:
|
| 465 |
+
self.transcription_task = asyncio.create_task(self.transcription_processor())
|
| 466 |
+
self.all_tasks_for_cleanup.append(self.transcription_task)
|
| 467 |
+
processing_tasks_for_watchdog.append(self.transcription_task)
|
| 468 |
|
| 469 |
if self.args.diarization and self.diarization:
|
| 470 |
+
self.diarization_task = asyncio.create_task(self.diarization_processor(self.diarization))
|
| 471 |
+
self.all_tasks_for_cleanup.append(self.diarization_task)
|
| 472 |
+
processing_tasks_for_watchdog.append(self.diarization_task)
|
| 473 |
|
| 474 |
+
self.ffmpeg_reader_task = asyncio.create_task(self.ffmpeg_stdout_reader())
|
| 475 |
+
self.all_tasks_for_cleanup.append(self.ffmpeg_reader_task)
|
| 476 |
+
processing_tasks_for_watchdog.append(self.ffmpeg_reader_task)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 477 |
|
| 478 |
+
# Monitor overall system health
|
| 479 |
+
self.watchdog_task = asyncio.create_task(self.watchdog(processing_tasks_for_watchdog))
|
| 480 |
+
self.all_tasks_for_cleanup.append(self.watchdog_task)
|
| 481 |
|
| 482 |
return self.results_formatter()
|
| 483 |
+
|
| 484 |
+
async def watchdog(self, tasks_to_monitor):
|
| 485 |
+
"""Monitors the health of critical processing tasks."""
|
| 486 |
+
while True:
|
| 487 |
+
try:
|
| 488 |
+
await asyncio.sleep(10)
|
| 489 |
+
current_time = time()
|
| 490 |
+
|
| 491 |
+
for i, task in enumerate(tasks_to_monitor):
|
| 492 |
+
if task.done():
|
| 493 |
+
exc = task.exception()
|
| 494 |
+
task_name = task.get_name() if hasattr(task, 'get_name') else f"Monitored Task {i}"
|
| 495 |
+
if exc:
|
| 496 |
+
logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
|
| 497 |
+
else:
|
| 498 |
+
logger.info(f"{task_name} completed normally.")
|
| 499 |
+
|
| 500 |
+
ffmpeg_idle_time = current_time - self.last_ffmpeg_activity
|
| 501 |
+
if ffmpeg_idle_time > 15:
|
| 502 |
+
logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention.")
|
| 503 |
+
if ffmpeg_idle_time > 30 and not self.is_stopping:
|
| 504 |
+
logger.error("FFmpeg idle for too long and not in stopping phase, forcing restart.")
|
| 505 |
+
await self.restart_ffmpeg()
|
| 506 |
+
except asyncio.CancelledError:
|
| 507 |
+
logger.info("Watchdog task cancelled.")
|
| 508 |
+
break
|
| 509 |
+
except Exception as e:
|
| 510 |
+
logger.error(f"Error in watchdog task: {e}", exc_info=True)
|
| 511 |
|
| 512 |
async def cleanup(self):
|
| 513 |
"""Clean up resources when processing is complete."""
|
| 514 |
+
logger.info("Starting cleanup of AudioProcessor resources.")
|
| 515 |
+
for task in self.all_tasks_for_cleanup:
|
| 516 |
+
if task and not task.done():
|
| 517 |
+
task.cancel()
|
| 518 |
+
|
| 519 |
+
created_tasks = [t for t in self.all_tasks_for_cleanup if t]
|
| 520 |
+
if created_tasks:
|
| 521 |
+
await asyncio.gather(*created_tasks, return_exceptions=True)
|
| 522 |
+
logger.info("All processing tasks cancelled or finished.")
|
| 523 |
+
|
| 524 |
+
if self.ffmpeg_process:
|
| 525 |
+
if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
|
| 526 |
+
try:
|
| 527 |
+
self.ffmpeg_process.stdin.close()
|
| 528 |
+
except Exception as e:
|
| 529 |
+
logger.warning(f"Error closing ffmpeg stdin during cleanup: {e}")
|
| 530 |
|
| 531 |
+
# Wait for ffmpeg process to terminate
|
| 532 |
+
if self.ffmpeg_process.poll() is None: # Check if process is still running
|
| 533 |
+
logger.info("Waiting for FFmpeg process to terminate...")
|
| 534 |
+
try:
|
| 535 |
+
# Run wait in executor to avoid blocking async loop
|
| 536 |
+
await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait, 5.0) # 5s timeout
|
| 537 |
+
except Exception as e: # subprocess.TimeoutExpired is not directly caught by asyncio.wait_for with run_in_executor
|
| 538 |
+
logger.warning(f"FFmpeg did not terminate gracefully, killing. Error: {e}")
|
| 539 |
+
self.ffmpeg_process.kill()
|
| 540 |
+
await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait) # Wait for kill
|
| 541 |
+
logger.info("FFmpeg process terminated.")
|
| 542 |
+
|
| 543 |
+
if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'):
|
| 544 |
self.diarization.close()
|
| 545 |
+
logger.info("AudioProcessor cleanup complete.")
|
| 546 |
+
|
| 547 |
|
| 548 |
async def process_audio(self, message):
|
| 549 |
"""Process incoming audio data."""
|
| 550 |
+
# If already stopping or stdin is closed, ignore further audio, especially residual chunks.
|
| 551 |
+
if self.is_stopping or (self.ffmpeg_process and self.ffmpeg_process.stdin and self.ffmpeg_process.stdin.closed):
|
| 552 |
+
logger.warning(f"AudioProcessor is stopping or stdin is closed. Ignoring incoming audio message (length: {len(message)}).")
|
| 553 |
+
if not message and self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
|
| 554 |
+
logger.info("Received empty message while already in stopping state; ensuring stdin is closed.")
|
| 555 |
+
try:
|
| 556 |
+
self.ffmpeg_process.stdin.close()
|
| 557 |
+
except Exception as e:
|
| 558 |
+
logger.warning(f"Error closing ffmpeg stdin on redundant stop signal during stopping state: {e}")
|
| 559 |
+
return
|
| 560 |
+
|
| 561 |
+
if not message: # primary signal to start stopping
|
| 562 |
+
logger.info("Empty audio message received, initiating stop sequence.")
|
| 563 |
+
self.is_stopping = True
|
| 564 |
+
if self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
|
| 565 |
+
try:
|
| 566 |
+
self.ffmpeg_process.stdin.close()
|
| 567 |
+
logger.info("FFmpeg stdin closed due to primary stop signal.")
|
| 568 |
+
except Exception as e:
|
| 569 |
+
logger.warning(f"Error closing ffmpeg stdin on stop: {e}")
|
| 570 |
+
return
|
| 571 |
+
|
| 572 |
retry_count = 0
|
| 573 |
max_retries = 3
|
| 574 |
|
|
|
|
| 620 |
else:
|
| 621 |
logger.error("Maximum retries reached for FFmpeg process")
|
| 622 |
await self.restart_ffmpeg()
|
| 623 |
+
return
|