import os import uuid import yaml import json import shutil import torch from pathlib import Path from PIL import Image from fastapi import FastAPI from fastapi.responses import JSONResponse from huggingface_hub import hf_hub_download, whoami import os os.environ["HF_HOME"] = "/tmp/hf_cache" os.makedirs("/tmp/hf_cache", exist_ok=True) from fastapi import FastAPI, Query from huggingface_hub import list_repo_files, hf_hub_download, upload_file import io import requests from fastapi import BackgroundTasks from fastapi import FastAPI, UploadFile, File from fastapi.middleware.cors import CORSMiddleware import os import os import zipfile import tempfile # ✅ Add this! app = FastAPI() # CORS setup to allow requests from your frontend app.add_middleware( CORSMiddleware, allow_origins=["*"], # Replace "*" with your frontend domain in production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") def health_check(): return {"status": "✅ FastAPI running on Hugging Face Spaces!"} @app.get("/healthz") def healthz(): return {"ok": True} @app.get("/docs", include_in_schema=False) def custom_docs(): return JSONResponse(get_openapi(title="LoRA Autorun API", version="1.0.0", routes=app.routes)) REPO_ID = "rahul7star/ohamlab" FOLDER = "demo" BASE_URL = f"https://huggingface.co/{REPO_ID}/resolve/main/" #show all images in a DIR at UI FE @app.get("/images") def list_images(): try: all_files = list_repo_files(REPO_ID) folder_prefix = FOLDER.rstrip("/") + "/" files_in_folder = [ f for f in all_files if f.startswith(folder_prefix) and "/" not in f[len(folder_prefix):] # no subfolder files and f.lower().endswith((".png", ".jpg", ".jpeg", ".webp")) ] urls = [BASE_URL + f for f in files_in_folder] return {"images": urls} except Exception as e: return {"error": str(e)} from datetime import datetime import tempfile import uuid # upload zip from UI @app.post("/upload-zip") async def upload_zip(file: UploadFile = File(...)): if not file.filename.endswith(".zip"): return {"error": "Please upload a .zip file"} # Save the ZIP to /tmp temp_zip_path = f"/tmp/{file.filename}" with open(temp_zip_path, "wb") as f: f.write(await file.read()) # Create a unique subfolder name inside 'demo/' timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") unique_id = uuid.uuid4().hex[:6] folder_name = f"upload_{timestamp}_{unique_id}" hf_folder_prefix = f"demo/{folder_name}" try: with tempfile.TemporaryDirectory() as extract_dir: # Extract zip with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref: zip_ref.extractall(extract_dir) uploaded_files = [] # Upload all extracted files for root_dir, _, files in os.walk(extract_dir): for name in files: file_path = os.path.join(root_dir, name) relative_path = os.path.relpath(file_path, extract_dir) repo_path = f"{hf_folder_prefix}/{relative_path}".replace("\\", "/") upload_file( path_or_fileobj=file_path, path_in_repo=repo_path, repo_id="rahul7star/ohamlab", repo_type="model", commit_message=f"Upload {relative_path} to {folder_name}", token=True, ) uploaded_files.append(repo_path) return { "message": f"✅ Uploaded {len(uploaded_files)} files", "folder": folder_name, "files": uploaded_files, } except Exception as e: return {"error": f"❌ Failed to process zip: {str(e)}"} # upload a single file from UI from typing import List from fastapi import UploadFile, File, APIRouter import os from fastapi import UploadFile, File, APIRouter from typing import List from datetime import datetime import uuid, os @app.post("/upload") async def upload_images( background_tasks: BackgroundTasks, files: List[UploadFile] = File(...) ): # Step 1: Generate dynamic folder name timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") unique_id = uuid.uuid4().hex[:6] folder_name = f"upload_{timestamp}_{unique_id}" hf_folder_prefix = f"demo/{folder_name}" responses = [] # Step 2: Save and upload each image for file in files: filename = file.filename contents = await file.read() temp_path = f"/tmp/{filename}" with open(temp_path, "wb") as f: f.write(contents) try: upload_file( path_or_fileobj=temp_path, path_in_repo=f"{hf_folder_prefix}/{filename}", repo_id=T_REPO_ID, repo_type="model", commit_message=f"Upload {filename} to {hf_folder_prefix}", token=True, ) responses.append({ "filename": filename, "status": "✅ uploaded", "path": f"{hf_folder_prefix}/{filename}" }) except Exception as e: responses.append({ "filename": filename, "status": f"❌ failed: {str(e)}" }) os.remove(temp_path) # Step 3: Add filter job to background def run_filter(): try: result = filter_and_rename_images(folder=hf_folder_prefix) print(f"🧼 Filter result: {result}") except Exception as e: print(f"❌ Filter failed: {str(e)}") background_tasks.add_task(run_filter) return { "message": f"{len(files)} file(s) uploaded", "upload_folder": hf_folder_prefix, "results": responses, "note": "Filtering started in background" } #Tranining Data set start fitering data for traninig T_REPO_ID = "rahul7star/ohamlab" DESCRIPTION_TEXT = ( "Ra3hul is wearing a black jacket over a striped white t-shirt with blue jeans. " "He is standing near a lake with his arms spread wide open, with mountains and cloudy skies in the background." ) def is_image_file(filename: str) -> bool: return filename.lower().endswith((".png", ".jpg", ".jpeg", ".webp")) @app.post("/filter-images") def filter_and_rename_images(folder: str = Query("demo", description="Folder path in repo to scan")): try: all_files = list_repo_files(T_REPO_ID) folder_prefix = folder.rstrip("/") + "/" filter_folder = f"filter-{folder.rstrip('/')}" filter_prefix = filter_folder + "/" # Filter images only directly in the folder (no subfolders) image_files = [ f for f in all_files if f.startswith(folder_prefix) and "/" not in f[len(folder_prefix):] # no deeper path and is_image_file(f) ] if not image_files: return {"error": f"No images found in folder '{folder}'"} uploaded_files = [] for idx, orig_path in enumerate(image_files, start=1): # Download image content bytes (uses local cache) local_path = hf_hub_download(repo_id=T_REPO_ID, filename=orig_path) with open(local_path, "rb") as f: file_bytes = f.read() # Rename images as image1.jpeg, image2.jpeg, ... new_image_name = f"image{idx}.jpeg" # Upload renamed image from memory upload_file( path_or_fileobj=io.BytesIO(file_bytes), path_in_repo=filter_prefix + new_image_name, repo_id=T_REPO_ID, repo_type="model", commit_message=f"Upload renamed image {new_image_name} to {filter_folder}", token=True, ) uploaded_files.append(filter_prefix + new_image_name) # Create and upload text file for each image txt_filename = f"image{idx}.txt" upload_file( path_or_fileobj=io.BytesIO(DESCRIPTION_TEXT.encode("utf-8")), path_in_repo=filter_prefix + txt_filename, repo_id=T_REPO_ID, repo_type="model", commit_message=f"Upload text file {txt_filename} to {filter_folder}", token=True, ) uploaded_files.append(filter_prefix + txt_filename) return { "message": f"Processed and uploaded {len(image_files)} images and text files.", "files": uploaded_files, } except Exception as e: return {"error": str(e)} # ========== CONFIGURATION ========== REPO_ID = "rahul7star/ohamlab" FOLDER_IN_REPO = "filter-demo/upload_20250708_041329_9c5c81" CONCEPT_SENTENCE = "ohamlab style" LORA_NAME = "ohami_filter_autorun" # ========== FASTAPI APP ========== app = FastAPI() # ========== HELPERS ========== def create_dataset(images, *captions): destination_folder = f"datasets_{uuid.uuid4()}" os.makedirs(destination_folder, exist_ok=True) jsonl_file_path = os.path.join(destination_folder, "metadata.jsonl") with open(jsonl_file_path, "a") as jsonl_file: for index, image in enumerate(images): new_image_path = shutil.copy(str(image), destination_folder) caption = captions[index] file_name = os.path.basename(new_image_path) data = {"file_name": file_name, "prompt": caption} jsonl_file.write(json.dumps(data) + "\n") return destination_folder def recursive_update(d, u): for k, v in u.items(): if isinstance(v, dict) and v: d[k] = recursive_update(d.get(k, {}), v) else: d[k] = v return d def start_training( lora_name, concept_sentence, steps, lr, rank, model_to_train, low_vram, dataset_folder, sample_1, sample_2, sample_3, use_more_advanced_options, more_advanced_options, ): try: user = whoami() username = user.get("name", "anonymous") push_to_hub = True except: username = "anonymous" push_to_hub = False slugged_lora_name = lora_name.replace(" ", "_").lower() # Load base config config = { "config": { "name": slugged_lora_name, "process": [ { "model": { "low_vram": low_vram, "is_flux": True, "quantize": True, "name_or_path": "black-forest-labs/FLUX.1-dev" }, "network": { "linear": rank, "linear_alpha": rank, "type": "lora" }, "train": { "steps": steps, "lr": lr, "skip_first_sample": True, "batch_size": 1, "dtype": "bf16", "gradient_accumulation_steps": 1, "gradient_checkpointing": True, "noise_scheduler": "flowmatch", "optimizer": "adamw8bit", "ema_config": { "use_ema": True, "ema_decay": 0.99 } }, "datasets": [ {"folder_path": dataset_folder} ], "save": { "dtype": "float16", "save_every": 10000, "push_to_hub": push_to_hub, "hf_repo_id": f"{username}/{slugged_lora_name}", "hf_private": True, "max_step_saves_to_keep": 4 }, "sample": { "guidance_scale": 3.5, "sample_every": steps, "sample_steps": 28, "width": 1024, "height": 1024, "walk_seed": True, "seed": 42, "sampler": "flowmatch", "prompts": [p for p in [sample_1, sample_2, sample_3] if p] }, "trigger_word": concept_sentence } ] } } # Apply advanced YAML overrides if any if use_more_advanced_options and more_advanced_options: advanced_config = yaml.safe_load(more_advanced_options) config["config"]["process"][0] = recursive_update(config["config"]["process"][0], advanced_config) # Save YAML config os.makedirs("tmp_configs", exist_ok=True) config_path = f"tmp_configs/{uuid.uuid4()}_{slugged_lora_name}.yaml" with open(config_path, "w") as f: yaml.dump(config, f) # Simulate training print(f"[INFO] Starting training with config: {config_path}") print(json.dumps(config, indent=2)) return f"Training started successfully with config: {config_path}" # ========== MAIN ENDPOINT ========== @app.post("/train-from-hf") def auto_run_lora_from_repo(): try: local_dir = Path(f"/tmp/{LORA_NAME}-{uuid.uuid4()}") os.makedirs(local_dir, exist_ok=True) hf_hub_download( repo_id=REPO_ID, repo_type="dataset", subfolder=FOLDER_IN_REPO, local_dir=local_dir, local_dir_use_symlinks=False, force_download=False, etag_timeout=10, allow_patterns=["*.jpg", "*.png", "*.jpeg"], ) image_dir = local_dir / FOLDER_IN_REPO image_paths = list(image_dir.rglob("*.jpg")) + list(image_dir.rglob("*.jpeg")) + list(image_dir.rglob("*.png")) if not image_paths: return JSONResponse(status_code=400, content={"error": "No images found in the HF repo folder."}) captions = [ f"Autogenerated caption for {img.stem} in the {CONCEPT_SENTENCE} [trigger]" for img in image_paths ] dataset_path = create_dataset(image_paths, *captions) result = start_training( lora_name=LORA_NAME, concept_sentence=CONCEPT_SENTENCE, steps=1000, lr=4e-4, rank=16, model_to_train="dev", low_vram=True, dataset_folder=dataset_path, sample_1=f"A stylized portrait using {CONCEPT_SENTENCE}", sample_2=f"A cat in the {CONCEPT_SENTENCE}", sample_3=f"A selfie processed in {CONCEPT_SENTENCE}", use_more_advanced_options=True, more_advanced_options=""" training: seed: 42 precision: bf16 batch_size: 2 augmentation: flip: true color_jitter: true """ ) return {"message": result} except Exception as e: return JSONResponse(status_code=500, content={"error": str(e)})