Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| # simulation.py | |
| # 仿真与视频相关 | |
| import os | |
| import time | |
| import uuid | |
| import cv2 | |
| import numpy as np | |
| from typing import List | |
| import gradio as gr | |
| from backend_api import get_task_status | |
| def stream_simulation_results(result_folder: str, task_id: str, fps: int = 6): | |
| result_folder = os.path.join(result_folder, "images") | |
| os.makedirs(result_folder, exist_ok=True) | |
| frame_buffer: List[np.ndarray] = [] | |
| frames_per_segment = fps * 2 | |
| processed_files = set() | |
| width, height = 0, 0 | |
| last_status_check = 0 | |
| status_check_interval = 5 | |
| max_time = 240 | |
| while max_time > 0: | |
| max_time -= 1 | |
| current_time = time.time() | |
| if current_time - last_status_check > status_check_interval: | |
| status = get_task_status(task_id) | |
| if status.get("status") == "completed": | |
| process_remaining_images(result_folder, processed_files, frame_buffer) | |
| if frame_buffer: | |
| yield create_video_segment(frame_buffer, fps, width, height) | |
| break | |
| elif status.get("status") == "failed": | |
| raise gr.Error(f"任务执行失败: {status.get('result', '未知错误')}") | |
| elif status.get("status") == "terminated": | |
| break | |
| last_status_check = current_time | |
| current_files = sorted( | |
| [f for f in os.listdir(result_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg'))], | |
| key=lambda x: os.path.splitext(x)[0] | |
| ) | |
| new_files = [f for f in current_files if f not in processed_files] | |
| has_new_frames = False | |
| for filename in new_files: | |
| try: | |
| img_path = os.path.join(result_folder, filename) | |
| frame = cv2.imread(img_path) | |
| if frame is not None: | |
| if width == 0: | |
| height, width = frame.shape[:2] | |
| frame_buffer.append(frame) | |
| processed_files.add(filename) | |
| has_new_frames = True | |
| except Exception: | |
| pass | |
| if has_new_frames and len(frame_buffer) >= frames_per_segment: | |
| segment_frames = frame_buffer[:frames_per_segment] | |
| frame_buffer = frame_buffer[frames_per_segment:] | |
| yield create_video_segment(segment_frames, fps, width, height) | |
| time.sleep(1) | |
| if max_time <= 0: | |
| raise gr.Error("timeout 240s") | |
| def create_video_segment(frames: List[np.ndarray], fps: int, width: int, height: int) -> str: | |
| os.makedirs("/opt/gradio_demo/tasks/video_chunk", exist_ok=True) | |
| segment_name = f"/opt/gradio_demo/tasks/video_chunk/output_{uuid.uuid4()}.mp4" | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(segment_name, fourcc, fps, (width, height)) | |
| for frame in frames: | |
| out.write(frame) | |
| out.release() | |
| return segment_name | |
| def process_remaining_images(result_folder: str, processed_files: set, frame_buffer: List[np.ndarray]): | |
| current_files = sorted( | |
| [f for f in os.listdir(result_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg'))], | |
| key=lambda x: os.path.splitext(x)[0] | |
| ) | |
| new_files = [f for f in current_files if f not in processed_files] | |
| for filename in new_files: | |
| try: | |
| img_path = os.path.join(result_folder, filename) | |
| frame = cv2.imread(img_path) | |
| if frame is not None: | |
| frame_buffer.append(frame) | |
| processed_files.add(filename) | |
| except Exception: | |
| pass | |
| def convert_to_h264(video_path): | |
| import shutil | |
| base, ext = os.path.splitext(video_path) | |
| video_path_h264 = f"{base}_h264.mp4" | |
| ffmpeg_bin = "/root/anaconda3/envs/gradio/bin/ffmpeg" | |
| if not os.path.exists(ffmpeg_bin): | |
| ffmpeg_bin = shutil.which("ffmpeg") | |
| if ffmpeg_bin is None: | |
| raise RuntimeError("❌ 找不到 ffmpeg,请确保其已安装并在 PATH 中") | |
| ffmpeg_cmd = [ | |
| ffmpeg_bin, | |
| "-i", video_path, | |
| "-c:v", "libx264", | |
| "-preset", "slow", | |
| "-crf", "23", | |
| "-c:a", "aac", | |
| "-movflags", "+faststart", | |
| video_path_h264 | |
| ] | |
| import subprocess | |
| try: | |
| result = subprocess.run(ffmpeg_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| if not os.path.exists(video_path_h264): | |
| raise FileNotFoundError(f"⚠️ H.264 文件未生成: {video_path_h264}") | |
| return video_path_h264 | |
| except Exception as e: | |
| raise | |