# NOTE: removed scraped page chrome (hosting-site banner, file-size line, and a
# line-number gutter) that had been pasted above the imports and made this
# module syntactically invalid Python.
from fastapi import APIRouter, status, Response, Path
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
# from google.cloud import storage
from config import settings
import datetime
import firebase_admin
from firebase_admin import credentials
from firebase_admin import storage
from google.cloud import storage
from pathlib import Path
import numpy as np
import csv
import json
import os
# cred = credentials.Certificate(settings.POLICY_FILE_PATH)
# app = firebase_admin.initialize_app(cred, {'storageBucket': 'cnc-designs.appspot.com'}, name='storage')
# bucket = storage.bucket(app=app)
# Directory containing this source file; used to resolve the service-account
# key relative to the project root (two levels up).
CWD = Path(__file__).parent
# client = storage.Client()
# NOTE(review): `from pathlib import Path` above shadows fastapi.Path imported
# earlier — confirm fastapi.Path is not needed in this module.
# Authentication happens here at import time using the JSON key file named in
# settings.POLICY_FILE_PATH.
client = storage.Client.from_service_account_json(
os.path.join(CWD.parent.parent, settings.POLICY_FILE_PATH)
)
# All routes below read from this fixed bucket.
bucket = client.bucket("thangtd1")
router = APIRouter()
@router.get("/{video}/metadata")
async def get_length(video: str) -> JSONResponse:
    """Return the JSON metadata stored at ``metadata/<video>.json``.

    Args:
        video: Video identifier (used as the blob stem).

    Returns:
        200 with the parsed metadata object on success; 500 with a generic
        ``{"message": "err"}`` payload if the blob is missing, unreadable,
        or not valid JSON (contract kept from the original implementation).
    """
    path_file = f"metadata/{video}.json"
    try:
        blob = bucket.blob(path_file)
        # blob.open("r") streams the object contents; json.load parses them.
        with blob.open("r") as f:
            metadata = json.load(f)
    except Exception:
        # Any failure (missing blob, network error, bad JSON) maps to 500,
        # matching the original behavior callers may depend on.
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"message": "err"},
        )
    return JSONResponse(status_code=status.HTTP_200_OK, content=metadata)
@router.get("/{video}/keyframes/list")
async def get_list_keyframes(video: str) -> JSONResponse:
    """List keyframe blob names for *video*, joined with per-frame metadata.

    Reads ``map-keyframes/<video>.csv`` (header row dropped) and pairs each
    keyframe blob found under ``Keyframes/<video>`` with the corresponding
    metadata row, column-wise.

    Returns:
        200 with ``{"list_keyframes": [[blob_name, <csv columns...>], ...]}``;
        500 with ``{"message": "err"}`` if the CSV is missing or the number of
        blobs does not match the number of metadata rows (np.append axis=1
        requires equal row counts) — the original let these raise unhandled;
        this matches the error style of get_length above.
    """
    keyframes_prefix = f"Keyframes/{video}"
    meta_path = f"map-keyframes/{video}.csv"
    try:
        meta_blob = bucket.blob(meta_path)
        with meta_blob.open("r") as f:
            rows = list(csv.reader(f))
        # rows[0] is the CSV header; keep only data rows.
        meta_data = np.array(rows[1:])
        # One column of blob names, in listing order.
        names = np.array(
            [[str(b.name)] for b in client.list_blobs("thangtd1", prefix=keyframes_prefix)]
        )
        # Column-join: [name | csv columns]; raises if row counts differ.
        list_keyframes = np.append(names, meta_data, axis=1)
    except Exception:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"message": "err"},
        )
    return JSONResponse(
        status_code=status.HTTP_200_OK,
        content={"list_keyframes": list_keyframes.tolist()},
    )
@router.get("/stream_video/{bucket_name}/{video_blob_name}")
async def stream_video(
    bucket_name: str,
    video_blob_name: str,
    response: Response = None,
) -> JSONResponse:
    """Stub endpoint for streaming a video blob — NOT yet implemented.

    NOTE(review): every implementation line below is commented out, so this
    handler currently only computes ``path_file`` and implicitly returns
    ``None`` (FastAPI serializes that as a 200 response with a ``null`` body).
    The ``bucket_name`` and ``response`` parameters are unused. Two prior
    approaches are preserved in the comments: (1) returning a 15-minute
    signed URL, and (2) a chunked ``StreamingResponse`` of the blob bytes —
    pick one before exposing this route.
    """
    path_file = "Video/" + video_blob_name
    # blob = bucket.blob(path_file)
    # url = blob.generate_signed_url(
    # This URL is valid for 15 minutes
    # expiration=datetime.timedelta(minutes=15),
    # Allow GET requests using this URL.
    # method='GET'
    # )
    # print(url)
    # return JSONResponse(status_code=status.HTTP_200_OK, content = {"url" : url})
    # if not blob.exists():
    #     return StreamingResponse(
    #         content_generator([b"Video not found"]), media_type="text/plain"
    #     )
    # # Stream the video in chunks
    # def content_generator():
    #     for chunk in blob.download_as_bytes(start=0, end=blob.size, raw_download=True):
    #         yield chunk
    # return StreamingResponse(content_generator(), media_type="video/mp4")
|