# -*- coding: UTF-8 -*- #!/usr/bin/env python import os import json import shutil from datetime import datetime from dotenv import load_dotenv import numpy as np import cv2 from PIL import Image import gradio as gr from huggingface_hub import HfApi, login from roop.globals import ( start, decode_execution_providers, suggest_max_memory, suggest_execution_threads, ) from roop.core import normalize_output_path from roop.processors.frame.core import get_frame_processors_modules from insightface.app import FaceAnalysis # Load environment variables load_dotenv() # Cosine similarity function def cosine_similarity(a, b): return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-6) # Dataset handler class class FaceIntegrDataset: def __init__(self, repo_id="Arrcttacsrks/face_integrData"): self.token = os.getenv("hf_token") if not self.token: raise ValueError("HF_TOKEN environment variable is not set") self.repo_id = repo_id self.api = HfApi() login(self.token) self.temp_dir = "temp_dataset" os.makedirs(self.temp_dir, exist_ok=True) def create_date_folder(self): current_date = datetime.now().strftime("%Y-%m-%d") folder_path = os.path.join(self.temp_dir, current_date) os.makedirs(folder_path, exist_ok=True) return folder_path, current_date def save_metadata(self, source_path, target_path, output_path, timestamp): metadata = { "timestamp": timestamp, "source_image": source_path, "target_image": target_path, "output_image": output_path, "date_created": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } return metadata def upload_to_hf(self, local_folder, date_folder): try: self.api.upload_folder( folder_path=local_folder, repo_id=self.repo_id, repo_type="dataset", path_in_repo=date_folder, ) return True except Exception as e: print(f"Error uploading to Hugging Face: {str(e)}") return False # Image face swap function def swap_face(source_file, target_file, doFaceEnhancer): dataset_handler = FaceIntegrDataset() folder_path, date_folder = dataset_handler.create_date_folder() timestamp = datetime.now().strftime("%S-%M-%H-%d-%m-%Y") try: # Save source and target images source_path = os.path.join(folder_path, f"source_{timestamp}.jpg") target_path = os.path.join(folder_path, f"target_{timestamp}.jpg") output_path = os.path.join(folder_path, f"OutputImage_{timestamp}.jpg") if source_file is None or target_file is None: raise ValueError("Source and target images are required") Image.fromarray(source_file).save(source_path) Image.fromarray(target_file).save(target_path) # Configure Roop globals roop.globals.source_path = source_path roop.globals.target_path = target_path roop.globals.output_path = normalize_output_path( roop.globals.source_path, roop.globals.target_path, output_path ) roop.globals.frame_processors = ( ["face_swapper", "face_enhancer"] if doFaceEnhancer else ["face_swapper"] ) roop.globals.headless = True roop.globals.keep_fps = True roop.globals.keep_audio = True roop.globals.keep_frames = False roop.globals.many_faces = False roop.globals.video_encoder = "libx264" roop.globals.video_quality = 18 roop.globals.max_memory = suggest_max_memory() roop.globals.execution_providers = decode_execution_providers(["cuda"]) roop.globals.execution_threads = suggest_execution_threads() # Pre-check frame processors for frame_processor in get_frame_processors_modules(roop.globals.frame_processors): if not frame_processor.pre_check(): raise RuntimeError("Frame processor pre-check failed") # Start face swap process start() # Save metadata metadata = dataset_handler.save_metadata( f"source_{timestamp}.jpg", f"target_{timestamp}.jpg", f"OutputImage_{timestamp}.jpg", timestamp, ) metadata_path = os.path.join(folder_path, f"metadata_{timestamp}.json") with open(metadata_path, "w") as f: json.dump(metadata, f, indent=4) # Upload to Hugging Face upload_success = dataset_handler.upload_to_hf(folder_path, date_folder) if not upload_success: print("Failed to upload files to Hugging Face dataset") # Return output image if os.path.exists(output_path): output_image = Image.open(output_path) return np.array(output_image) else: raise FileNotFoundError("Output image not found") except Exception as e: print(f"Error in face swap process: {str(e)}") raise gr.Error(f"Face swap failed: {str(e)}") finally: if folder_path and os.path.exists(folder_path): shutil.rmtree(folder_path) # Video face swap helper function def swap_face_frame(frame_bgr, replacement_face_rgb, doFaceEnhancer): frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB) temp_dir = "temp_faceswap_frame" os.makedirs(temp_dir, exist_ok=True) timestamp = datetime.now().strftime("%S-%M-%H-%d-%m-%Y") try: source_path = os.path.join(temp_dir, f"source_{timestamp}.jpg") target_path = os.path.join(temp_dir, f"target_{timestamp}.jpg") output_path = os.path.join(temp_dir, f"OutputImage_{timestamp}.jpg") Image.fromarray(frame_rgb).save(source_path) Image.fromarray(replacement_face_rgb).save(target_path) # Configure Roop globals roop.globals.source_path = source_path roop.globals.target_path = target_path roop.globals.output_path = normalize_output_path( source_path, target_path, output_path ) roop.globals.frame_processors = ( ["face_swapper", "face_enhancer"] if doFaceEnhancer else ["face_swapper"] ) roop.globals.headless = True roop.globals.keep_fps = True roop.globals.keep_audio = True roop.globals.keep_frames = False roop.globals.many_faces = False roop.globals.video_encoder = "libx264" roop.globals.video_quality = 18 roop.globals.max_memory = suggest_max_memory() roop.globals.execution_providers = decode_execution_providers(["cuda"]) roop.globals.execution_threads = suggest_execution_threads() start() # Return swapped frame if os.path.exists(output_path): return np.array(Image.open(output_path)) else: return frame_rgb finally: shutil.rmtree(temp_dir) # Video face swap function def swap_face_video(reference_face, replacement_face, video_input, similarity_threshold, doFaceEnhancer): fa = FaceAnalysis() fa.prepare(ctx_id=0) ref_detections = fa.get(reference_face) if not ref_detections: raise gr.Error("No face detected in the reference image!") ref_embedding = ref_detections[0].embedding cap = cv2.VideoCapture(video_input) if not cap.isOpened(): raise gr.Error("Failed to open input video!") fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) output_video_path = "temp_faceswap_video.mp4" fourcc = cv2.VideoWriter_fourcc(*"mp4v") out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) frame_index = 0 while True: ret, frame = cap.read() if not ret: break frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) detections = fa.get(frame_rgb) swap_this_frame = any( cosine_similarity(det.embedding, ref_embedding) >= similarity_threshold for det in detections ) if swap_this_frame: swapped_frame_rgb = swap_face_frame(frame, replacement_face, doFaceEnhancer) swapped_frame = cv2.cvtColor(swapped_frame_rgb, cv2.COLOR_RGB2BGR) else: swapped_frame = frame out.write(swapped_frame) frame_index += 1 print(f"Processed frame {frame_index}") cap.release() out.release() return output_video_path # Gradio interface def create_interface(): custom_css = """ .container { max-width: 1200px; margin: auto; padding: 20px; } .output-image { min-height: 400px; border: 1px solid #ccc; border-radius: 8px; padding: 10px; } """ title = "Face - Integrator" description = "Upload source and target images to perform face swap." article = """
This tool performs face swapping with optional enhancement.