"""FastAPI service: list, upload and pre-process training images in a Hugging Face repo."""

import os

# Point the Hugging Face cache at a writable location before the hub library is
# imported (presumably so the library picks the value up on import).
os.environ["HF_HOME"] = "/tmp/hf_cache"

import sys

sys.path.append(os.path.dirname(os.path.abspath(__file__)))
os.makedirs("/tmp/hf_cache", exist_ok=True)

from huggingface_hub import whoami

# NOTE(review): this is set after huggingface_hub has already been imported
# above; the library may read the flag at import time — confirm it takes effect.
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"

import spaces

import io
import json
import shutil
import tempfile
import uuid
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import List

import requests
import yaml
from fastapi import APIRouter, BackgroundTasks, FastAPI, File, Query, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from huggingface_hub import (
    hf_hub_download,
    list_repo_files,
    snapshot_download,
    upload_file,
)

sys.path.insert(0, os.getcwd())

import gradio as gr
from PIL import Image
import torch
from slugify import slugify

# The ai-toolkit job runner is currently not wired in:
# sys.path.insert(0, "ai-toolkit")
# from toolkit.job import get_job


# ========== CONSTANTS ==========

REPO_ID = "rahul7star/ohamlab"
FOLDER = "demo"
BASE_URL = f"https://huggingface.co/{REPO_ID}/resolve/main/"

# Repo used by the upload / filter endpoints that prepare the training data set.
T_REPO_ID = "rahul7star/ohamlab"

# Caption text written next to every filtered image.
DESCRIPTION_TEXT = (
    "Ra3hul is wearing a black jacket over a striped white t-shirt with blue jeans. "
    "He is standing near a lake with his arms spread wide open, with mountains and cloudy skies in the background."
)

# Settings for the training-sample endpoint.
FOLDER_IN_REPO = "filter-demo/upload_20250708_041329_9c5c81"
CONCEPT_SENTENCE = "ohamlab style"
LORA_NAME = "ohami_filter_autorun"

IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".webp")


# ========== APP ==========

app = FastAPI()

# CORS setup to allow requests from the frontend.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Replace "*" with your frontend domain in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ========== HELPERS ==========

def is_image_file(filename: str) -> bool:
    """Return True when *filename* ends with a supported image extension."""
    return filename.lower().endswith(IMAGE_EXTENSIONS)


def _images_directly_in(all_files: List[str], folder: str) -> List[str]:
    """Return the image paths that sit directly inside *folder* (no subfolders)."""
    prefix = folder.rstrip("/") + "/"
    return [
        path
        for path in all_files
        if path.startswith(prefix)
        and "/" not in path[len(prefix):]
        and is_image_file(path)
    ]


def _new_upload_folder_name() -> str:
    """Build a unique ``upload_<UTC timestamp>_<6 hex chars>`` folder name."""
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    return f"upload_{timestamp}_{uuid.uuid4().hex[:6]}"


# ========== ENDPOINTS ==========

@app.get("/")
def health_check():
    """Report that the service is up."""
    return {"status": "✅ FastAPI running on Hugging Face Spaces!"}


@app.get("/images")
def list_images():
    """List download URLs of the images stored directly in ``FOLDER`` of ``REPO_ID``.

    Returns ``{"images": [...]}`` on success or ``{"error": "..."}`` on failure.
    """
    try:
        all_files = list_repo_files(REPO_ID)
        urls = [BASE_URL + path for path in _images_directly_in(all_files, FOLDER)]
        return {"images": urls}
    except Exception as e:
        return {"error": str(e)}


@app.post("/upload-zip")
async def upload_zip(file: UploadFile = File(...)):
    """Extract an uploaded zip archive and push every file to a fresh repo folder.

    The files land under ``FOLDER/upload_<timestamp>_<id>/`` keeping the
    archive's internal layout. Returns the folder name and the uploaded repo
    paths, or ``{"error": "..."}`` when the upload is not a zip or fails.
    """
    if not (file.filename or "").lower().endswith(".zip"):
        return {"error": "Please upload a .zip file"}

    folder_name = _new_upload_folder_name()
    hf_folder_prefix = f"{FOLDER}/{folder_name}"

    try:
        # Read the archive from memory instead of writing it to a path built
        # from the client-supplied filename (unsafe, and it was never removed).
        archive_bytes = io.BytesIO(await file.read())

        with tempfile.TemporaryDirectory() as extract_dir:
            with zipfile.ZipFile(archive_bytes) as zip_ref:
                zip_ref.extractall(extract_dir)

            uploaded_files = []
            for root_dir, _, names in os.walk(extract_dir):
                for name in names:
                    file_path = os.path.join(root_dir, name)
                    relative_path = os.path.relpath(file_path, extract_dir)
                    repo_path = f"{hf_folder_prefix}/{relative_path}".replace("\\", "/")

                    upload_file(
                        path_or_fileobj=file_path,
                        path_in_repo=repo_path,
                        repo_id=REPO_ID,
                        repo_type="model",
                        commit_message=f"Upload {relative_path} to {folder_name}",
                        token=True,
                    )
                    uploaded_files.append(repo_path)

        return {
            "message": f"✅ Uploaded {len(uploaded_files)} files",
            "folder": folder_name,
            "files": uploaded_files,
        }
    except Exception as e:
        return {"error": f"❌ Failed to process zip: {str(e)}"}


@app.post("/upload")
async def upload_images(
    background_tasks: BackgroundTasks,
    files: List[UploadFile] = File(...)
):
    """Upload one or more files to a fresh repo folder, then filter them in the background.

    Each file is reported individually in ``results``; a failed upload does not
    stop the remaining ones. After the response is sent,
    ``filter_and_rename_images`` runs on the new folder as a background task.
    """
    # Step 1: Generate dynamic folder name
    folder_name = _new_upload_folder_name()
    hf_folder_prefix = f"{FOLDER}/{folder_name}"

    responses = []

    # Step 2: Upload each file straight from memory
    for file in files:
        # Keep only the final path component so a crafted filename cannot
        # address anything outside the upload folder.
        filename = os.path.basename(file.filename or "")
        contents = await file.read()
        repo_path = f"{hf_folder_prefix}/{filename}"

        try:
            upload_file(
                path_or_fileobj=contents,
                path_in_repo=repo_path,
                repo_id=T_REPO_ID,
                repo_type="model",
                commit_message=f"Upload {filename} to {hf_folder_prefix}",
                token=True,
            )
            responses.append({
                "filename": filename,
                "status": "✅ uploaded",
                "path": repo_path
            })
        except Exception as e:
            responses.append({
                "filename": filename,
                "status": f"❌ failed: {str(e)}"
            })

    # Step 3: Add filter job to background
    def run_filter():
        try:
            result = filter_and_rename_images(folder=hf_folder_prefix)
            print(f"🧼 Filter result: {result}")
        except Exception as e:
            print(f"❌ Filter failed: {str(e)}")

    background_tasks.add_task(run_filter)

    return {
        "message": f"{len(files)} file(s) uploaded",
        "upload_folder": hf_folder_prefix,
        "results": responses,
        "note": "Filtering started in background"
    }


@app.post("/filter-images")
def filter_and_rename_images(folder: str = Query("demo", description="Folder path in repo to scan")):
    """Copy the images of *folder* to ``filter-<folder>/`` with numbered names and captions.

    Every image found directly in *folder* is re-uploaded as ``image<N>.jpeg``
    together with an ``image<N>.txt`` file containing ``DESCRIPTION_TEXT``.
    Returns the uploaded repo paths, or ``{"error": "..."}`` when no image is
    found or any step fails.
    """
    try:
        all_files = list_repo_files(T_REPO_ID)
        filter_folder = f"filter-{folder.rstrip('/')}"
        filter_prefix = filter_folder + "/"

        image_files = _images_directly_in(all_files, folder)
        if not image_files:
            return {"error": f"No images found in folder '{folder}'"}

        uploaded_files = []

        for idx, orig_path in enumerate(image_files, start=1):
            # Download the image (uses the local cache) and re-upload it from disk.
            local_path = hf_hub_download(repo_id=T_REPO_ID, filename=orig_path)

            # Rename images as image1.jpeg, image2.jpeg, ...
            new_image_name = f"image{idx}.jpeg"
            upload_file(
                path_or_fileobj=local_path,
                path_in_repo=filter_prefix + new_image_name,
                repo_id=T_REPO_ID,
                repo_type="model",
                commit_message=f"Upload renamed image {new_image_name} to {filter_folder}",
                token=True,
            )
            uploaded_files.append(filter_prefix + new_image_name)

            # Create and upload the caption text file for this image.
            txt_filename = f"image{idx}.txt"
            upload_file(
                path_or_fileobj=io.BytesIO(DESCRIPTION_TEXT.encode("utf-8")),
                path_in_repo=filter_prefix + txt_filename,
                repo_id=T_REPO_ID,
                repo_type="model",
                commit_message=f"Upload text file {txt_filename} to {filter_folder}",
                token=True,
            )
            uploaded_files.append(filter_prefix + txt_filename)

        return {
            "message": f"Processed and uploaded {len(image_files)} images and text files.",
            "files": uploaded_files,
        }

    except Exception as e:
        return {"error": str(e)}


@app.post("/webhook-trigger")
def call_other_space():
    """POST a fixed trigger payload to another Space and relay its JSON reply.

    Returns the decoded JSON body, or an ``{"error": ...}`` dict when the
    request fails or the reply is not valid JSON.
    """
    try:
        payload = {"input": "Start training from external trigger"}
        res = requests.post(
            "https://rahul7star-ohamlab-ai-toolkit.hf.space/trigger",
            json=payload,
            timeout=30,
        )

        # The remote may answer with an empty or non-JSON body.
        try:
            return res.json()
        except ValueError:
            return {
                "error": f"Invalid JSON response. Status: {res.status_code}",
                "text": res.text
            }

    except Exception as e:
        return {"error": str(e)}


# ========== TRAIN CONFIGURATION ==========

@app.get("/train-sample")
def fetch_images_and_generate_captions():
    """Download ``FOLDER_IN_REPO`` from ``REPO_ID`` and return image paths with captions.

    The files are downloaded into a new ``/tmp/<LORA_NAME>-<uuid>`` directory.
    Responds with HTTP 400 when the folder contains no ``.jpg``/``.jpeg``/``.png``.
    """
    # Create a unique local directory
    local_dir = Path(f"/tmp/{LORA_NAME}-{uuid.uuid4()}")
    os.makedirs(local_dir, exist_ok=True)

    snapshot_path = snapshot_download(
        repo_id=REPO_ID,
        repo_type="model",
        local_dir=local_dir,
        local_dir_use_symlinks=False,
        allow_patterns=[f"{FOLDER_IN_REPO}/*"],  # only files inside the subfolder
    )

    # Resolve image path relative to downloaded snapshot
    image_dir = Path(snapshot_path) / FOLDER_IN_REPO
    image_paths = (
        list(image_dir.rglob("*.jpg"))
        + list(image_dir.rglob("*.jpeg"))
        + list(image_dir.rglob("*.png"))
    )

    if not image_paths:
        return JSONResponse(status_code=400, content={"error": "No images found in the HF repo folder."})

    captions = [
        f"Autogenerated caption for {img.stem} in the {CONCEPT_SENTENCE} [trigger]"
        for img in image_paths
    ]

    return {
        "local_dir": str(image_dir),
        "images": [str(p) for p in image_paths],
        "captions": captions
    }
# ========== HELPERS ==========

def create_dataset(images, *captions):
    """Copy *images* into a new ``/tmp/datasets_<uuid>`` folder with a ``metadata.jsonl``.

    Args:
        images: Paths of the image files to copy.
        *captions: One caption per image, in the same order as *images*.

    Returns:
        The path of the created dataset folder, as a string.

    Raises:
        ValueError: If the number of images and captions differ.
    """
    if len(images) != len(captions):
        raise ValueError("Number of images and captions must be the same.")

    destination_folder = Path(f"/tmp/datasets_{uuid.uuid4()}")
    destination_folder.mkdir(parents=True, exist_ok=True)

    jsonl_file_path = destination_folder / "metadata.jsonl"
    with jsonl_file_path.open("w", encoding="utf-8") as jsonl_file:
        for image_path, caption in zip(images, captions):
            new_image_path = shutil.copy(str(image_path), destination_folder)
            entry = {"file_name": Path(new_image_path).name, "prompt": caption}
            jsonl_file.write(json.dumps(entry, ensure_ascii=False) + "\n")

    return str(destination_folder)


def recursive_update(d, u):
    """Merge mapping *u* into *d* in place, descending into non-empty nested dicts.

    Values of *u* that are non-empty dicts are merged key by key; every other
    value (including an empty dict) replaces the entry in *d*. Returns *d*.
    """
    for k, v in u.items():
        if isinstance(v, dict) and v:
            d[k] = recursive_update(d.get(k, {}), v)
        else:
            d[k] = v
    return d


def start_training(
    lora_name,
    concept_sentence,
    steps,
    lr,
    rank,
    model_to_train,
    low_vram,
    dataset_folder,
    sample_1,
    sample_2,
    sample_3,
    use_more_advanced_options,
    more_advanced_options,
):
    """Build the training config, write it to ``/tmp/tmp_configs`` and report its path.

    The actual job launch is currently disabled (see the note near the end), so
    this only writes and prints the config. ``model_to_train``,
    ``use_more_advanced_options`` and ``more_advanced_options`` are accepted but
    not used at the moment.

    Returns:
        A status message containing the path of the written YAML config.
    """
    # Pushing to the hub is only enabled when a logged-in user can be resolved.
    try:
        user = whoami()
        username = user.get("name", "anonymous")
        push_to_hub = True
    except Exception:
        username = "anonymous"
        push_to_hub = False

    slugged_lora_name = lora_name.replace(" ", "_").lower()
    print(username)

    # Load base config
    config = {
        "config": {
            "name": slugged_lora_name,
            "process": [
                {
                    "model": {
                        "low_vram": low_vram,
                        "is_flux": True,
                        "quantize": True,
                        "name_or_path": "black-forest-labs/FLUX.1-dev"
                    },
                    "network": {
                        "linear": rank,
                        "linear_alpha": rank,
                        "type": "lora"
                    },
                    "train": {
                        "steps": steps,
                        "lr": lr,
                        "skip_first_sample": True,
                        "batch_size": 1,
                        "dtype": "bf16",
                        "gradient_accumulation_steps": 1,
                        "gradient_checkpointing": True,
                        "noise_scheduler": "flowmatch",
                        "optimizer": "adamw8bit",
                        "ema_config": {
                            "use_ema": True,
                            "ema_decay": 0.99
                        }
                    },
                    "datasets": [
                        {"folder_path": dataset_folder}
                    ],
                    "save": {
                        "dtype": "float16",
                        "save_every": 10000,
                        "push_to_hub": push_to_hub,
                        "hf_repo_id": f"{username}/{slugged_lora_name}",
                        "hf_private": True,
                        "max_step_saves_to_keep": 4
                    },
                    "sample": {
                        "guidance_scale": 3.5,
                        "sample_every": steps,
                        "sample_steps": 28,
                        "width": 1024,
                        "height": 1024,
                        "walk_seed": True,
                        "seed": 42,
                        "sampler": "flowmatch",
                        "prompts": [p for p in [sample_1, sample_2, sample_3] if p]
                    },
                    "trigger_word": concept_sentence
                }
            ]
        }
    }

    # Advanced YAML overrides are currently disabled:
    # if use_more_advanced_options and more_advanced_options:
    #     advanced_config = yaml.safe_load(more_advanced_options)
    #     config["config"]["process"][0] = recursive_update(config["config"]["process"][0], advanced_config)

    # Save YAML config
    os.makedirs("/tmp/tmp_configs", exist_ok=True)
    config_path = f"/tmp/tmp_configs/{uuid.uuid4()}_{slugged_lora_name}.yaml"
    with open(config_path, "w") as f:
        yaml.dump(config, f)
    print(config_path)

    # NOTE: the real training job is not launched yet:
    # job = get_job(config_path)
    # job.run()
    # job.cleanup()
    print(f"[INFO] Starting training with config: {config_path}")
    print(json.dumps(config, indent=2))

    return f"Training started successfully with config: {config_path}"


# ========== MAIN ENDPOINT ==========

@app.post("/train-from-hf")
def auto_run_lora_from_repo():
    """Download the training images from the repo, build a dataset and start training.

    Responds with HTTP 400 when the repo folder contains no images and with
    HTTP 500 (carrying the error text) for any other failure.
    """
    # Imported here because the file never imports HTTPException at module level.
    from fastapi import HTTPException

    try:
        # ✅ Static or dynamic config
        REPO_ID = "rahul7star/ohamlab"
        FOLDER_IN_REPO = "filter-demo/upload_20250708_041329_9c5c81"
        CONCEPT_SENTENCE = "ohamlab style"
        LORA_NAME = "ohami_filter_autorun"

        # ✅ Setup HF cache
        os.environ["HF_HOME"] = "/tmp/hf_cache"
        os.makedirs("/tmp/hf_cache", exist_ok=True)

        # ✅ Download dataset from HF
        local_dir = Path(f"/tmp/{LORA_NAME}-{uuid.uuid4()}")
        os.makedirs(local_dir, exist_ok=True)

        snapshot_download(
            repo_id=REPO_ID,
            repo_type="model",
            local_dir=local_dir,
            local_dir_use_symlinks=False,
            allow_patterns=[f"{FOLDER_IN_REPO}/*"],  # only files inside the subfolder
        )

        image_dir = local_dir / FOLDER_IN_REPO
        image_paths = (
            list(image_dir.rglob("*.jpg"))
            + list(image_dir.rglob("*.jpeg"))
            + list(image_dir.rglob("*.png"))
        )

        if not image_paths:
            raise HTTPException(status_code=400, detail="No images found in the Hugging Face folder.")

        # ✅ Auto-generate captions
        captions = [
            f"Autogenerated caption for {img.stem} in the {CONCEPT_SENTENCE} [trigger]"
            for img in image_paths
        ]

        # ✅ Create dataset folder with metadata.jsonl
        dataset_folder = create_dataset(image_paths, *captions)
        print('DATA SET iS CREATED =================================================')

        # ✅ Optional advanced config
        slugged_lora_name = LORA_NAME.replace(" ", "_")
        os.makedirs("/tmp/tmp_configs", exist_ok=True)
        config_path = f"/tmp/tmp_configs/{uuid.uuid4()}_{slugged_lora_name}.yaml"

        config = {
            "sample_1": "a stylish anime character with ohamlab style",
            "sample_2": "a cartoon car in ohamlab style",
            "sample_3": "portrait in ohamlab lighting"
        }
        with open(config_path, "w") as f:
            yaml.dump(config, f)

        # ✅ Final call to train
        print(f" slugged_lora{ slugged_lora_name}")
        print('Now Start Trainng Set called all data si rADYU =================================================')

        result = start_training(
            lora_name=LORA_NAME,
            concept_sentence=CONCEPT_SENTENCE,
            steps=45,
            lr=1e-4,
            rank=32,
            model_to_train="flux",
            low_vram=True,
            dataset_folder=dataset_folder,
            sample_1=config["sample_1"],
            sample_2=config["sample_2"],
            sample_3=config["sample_3"],
            use_more_advanced_options=True,
            more_advanced_options=config_path
        )

        return JSONResponse(content={"status": "success", "message": result})

    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 400 above) instead of turning them into 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e