Spaces:
Sleeping
Sleeping
import os | |
from fastapi import FastAPI, Request, HTTPException | |
from concurrent.futures import ProcessPoolExecutor | |
from typing import List, Optional | |
from commonmeta import Metadata | |
from pydantic import BaseModel | |
app = FastAPI() | |
executor = ProcessPoolExecutor(max_workers=os.cpu_count()) | |
def parse(string: str, via: str = None, to: str = None, **kwargs): | |
try: | |
metadata = Metadata(string=string, via=via, **kwargs) | |
if not metadata.is_valid: | |
raise ValueError(f"Error in parsing metadata, please verify the format.") | |
except Exception as e: | |
raise ValueError(f"Metadata fetch error: {str(e)}") | |
try: | |
parsed_metadata = metadata.write(to=to) | |
return parsed_metadata | |
except Exception as e: | |
raise ValueError(f"Metadata write error: {str(e)}") | |
def process_single(args): | |
string, via, to, kwargs = args | |
try: | |
return parse(string, via=via, to=to, **kwargs) | |
except ValueError as e: | |
return {"error": str(e), "string": string} | |
async def parse_one(request: Request, string: str, to: str = None, via: str = None): | |
kwargs = dict(request.query_params) | |
kwargs.pop('string', None) | |
kwargs.pop('to', None) | |
kwargs.pop('via', None) | |
try: | |
parsed_metadata = parse(string, via=via, to=to, **kwargs) | |
return {'data': parsed_metadata} | |
except ValueError as e: | |
raise HTTPException(status_code=400, detail=str(e)) | |
class BatchParseRequest(BaseModel): | |
strings: str = "" | |
to: str = None | |
via: str = None | |
async def batch_parse(payload: BatchParseRequest): | |
kwargs = payload.dict() | |
strings = kwargs.pop("strings").split(",") | |
to = kwargs.pop("to", None) | |
via = kwargs.pop("via", None) | |
task_args = [(s, via, to, kwargs) for s in strings] | |
results = list(executor.map(process_single, task_args)) | |
return {"batch": results} | |