|  | from fastapi import APIRouter, HTTPException, status, BackgroundTasks, UploadFile, Query | 
					
						
						|  | from .Schema import EditorRequest, TaskInfo | 
					
						
						|  | from App.Worker import celery_task, concatenate_videos | 
					
						
						|  | from celery.result import AsyncResult | 
					
						
						|  | import aiofiles, os, uuid, aiohttp | 
					
						
						|  | from App import SERVER_STATE, Task | 
					
						
						|  |  | 
					
						
						|  | videditor_router = APIRouter(tags=["vidEditor"]) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @videditor_router.post("/create-video") | 
					
						
						|  | async def create_video(videoRequest: EditorRequest, background_task: BackgroundTasks): | 
					
						
						|  | background_task.add_task(celery_task, videoRequest) | 
					
						
						|  | return {"task_id": "started"} | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @videditor_router.post("/create-chunks") | 
					
						
						|  | async def create_chunks(videoRequest: EditorRequest, background_task: BackgroundTasks): | 
					
						
						|  | video_duration = videoRequest.constants.duration | 
					
						
						|  | task_id = uuid.uuid4() | 
					
						
						|  | new_task = Task(TASK_ID=task_id) | 
					
						
						|  |  | 
					
						
						|  | active_nodes = [ | 
					
						
						|  | node | 
					
						
						|  | for node in SERVER_STATE.NODES | 
					
						
						|  | if await new_task._check_node_online(node.SPACE_HOST) | 
					
						
						|  | ] | 
					
						
						|  | number_of_nodes = len(active_nodes) | 
					
						
						|  | ranges = [ | 
					
						
						|  | [i, i + number_of_nodes] for i in range(0, video_duration, number_of_nodes) | 
					
						
						|  | ] | 
					
						
						|  | for i, node in enumerate(active_nodes): | 
					
						
						|  | await new_task.add_node(node, i) | 
					
						
						|  |  | 
					
						
						|  | SERVER_STATE.TASKS[task_id] = new_task | 
					
						
						|  |  | 
					
						
						|  | async with aiohttp.ClientSession() as session: | 
					
						
						|  | for i, node in enumerate(active_nodes): | 
					
						
						|  | videoRequest.constants.frames = ranges[i] | 
					
						
						|  | if node.SPACE_HOST == SERVER_STATE.SPACE_HOST: | 
					
						
						|  | background_task.add_task(celery_task, videoRequest) | 
					
						
						|  | async with session.post( | 
					
						
						|  | "node.SPACE_HOST/create-video", json=videoRequest | 
					
						
						|  | ) as response: | 
					
						
						|  | if response.status != 200: | 
					
						
						|  | raise HTTPException( | 
					
						
						|  | status_code=response.status, | 
					
						
						|  | detail="Failed to post request to node", | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | return {"task_id": "started"} | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @videditor_router.post("/uploadfile/") | 
					
						
						|  | async def create_file( | 
					
						
						|  | background_tasks: BackgroundTasks, | 
					
						
						|  | file: UploadFile, | 
					
						
						|  | node: str, | 
					
						
						|  | chunk: int, | 
					
						
						|  | task: str, | 
					
						
						|  | ): | 
					
						
						|  |  | 
					
						
						|  | chunk_directory = f"/tmp/Video/{task}" | 
					
						
						|  | file_name = f"{chunk_directory}/{chunk}.mp4" | 
					
						
						|  |  | 
					
						
						|  | os.makedirs(chunk_directory, exist_ok=True) | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | async with aiofiles.open(file_name, "wb") as f: | 
					
						
						|  | while contents := await file.read(1024 * 1): | 
					
						
						|  | await f.write(contents) | 
					
						
						|  |  | 
					
						
						|  | except Exception as e: | 
					
						
						|  | return { | 
					
						
						|  | "message": f"There was an error uploading the file, error message {str(e)}  " | 
					
						
						|  | } | 
					
						
						|  | finally: | 
					
						
						|  | await file.close() | 
					
						
						|  | running_task = SERVER_STATE.TASKS[task] | 
					
						
						|  | running_task.mark_node_completed(node) | 
					
						
						|  | if running_task.is_completed(): | 
					
						
						|  | background_tasks.add_task(concatenate_videos, chunk_directory) | 
					
						
						|  |  | 
					
						
						|  | return {"message": "File uploaded successfully"} | 
					
						
						|  |  |