|
import modal |
|
|
|
app = modal.App("deepface-agent") |
|
|
|
|
|
# Container image shared by all remote functions: Debian slim plus the
# native libraries OpenCV needs at import time, and the Python ML stack
# (DeepFace/TensorFlow for faces, SpeechBrain/torchaudio for voices).
image = (

    modal.Image.debian_slim()

    # libgl1 / libglib2.0-0 are required by opencv-python's compiled extension.
    .apt_install("libgl1", "libglib2.0-0")

    .pip_install(

        "deepface",

        "opencv-python",

        "numpy",

        "Pillow",

        # TF pinned; tf-keras must track the same minor version for DeepFace.
        "tensorflow==2.19.0",

        "tf-keras>=2.19.0",

        "librosa",

        "scipy",

        # Speaker-verification stack (ECAPA model loaded at container start).
        "speechbrain",

        "torchaudio",

    )

)
|
|
|
|
|
|
|
with image.imports():
    # SpeechBrain 1.0 moved `speechbrain.pretrained` to
    # `speechbrain.inference` (the old path emits a DeprecationWarning and
    # may be removed); prefer the new location, fall back for old installs.
    try:
        from speechbrain.inference.speaker import SpeakerRecognition
    except ImportError:
        from speechbrain.pretrained import SpeakerRecognition

    # Load the ECAPA-TDNN speaker-verification model once per container
    # start so every verify_voices_remote call reuses the same instance.
    verification = SpeakerRecognition.from_hparams(
        source="speechbrain/spkrec-ecapa-voxceleb",
        savedir="pretrained_models/spkrec-ecapa-voxceleb",
    )
|
|
|
|
|
@app.function(image=image, gpu="any")
def verify_faces_remote(img1_bytes, img2_bytes):
    """Compare two face images and report whether they match.

    Args:
        img1_bytes: Encoded image bytes (e.g. JPEG/PNG) of the first face.
        img2_bytes: Encoded image bytes of the second face.

    Returns:
        The DeepFace.verify result dict (includes "verified" and "distance").
    """
    from deepface import DeepFace
    from PIL import Image
    from io import BytesIO
    import numpy as np

    # Normalize to 3-channel RGB so RGBA/palette/grayscale uploads don't
    # produce arrays DeepFace can't handle (it expects H x W x 3).
    # NOTE(review): DeepFace treats ndarray input in OpenCV's BGR channel
    # order; if accuracy matters, confirm whether a channel swap is needed.
    img1 = np.array(Image.open(BytesIO(img1_bytes)).convert("RGB"))
    img2 = np.array(Image.open(BytesIO(img2_bytes)).convert("RGB"))

    result = DeepFace.verify(img1, img2)
    return result
|
|
|
|
|
@app.function(image=image, gpu="any")
def verify_voices_remote(audio1_bytes, audio2_bytes):
    """Compare two voice recordings and report whether the speakers match.

    Args:
        audio1_bytes: Raw WAV bytes for the first recording.
        audio2_bytes: Raw WAV bytes for the second recording.

    Returns:
        On success: {"match": bool, "similarity": float, "threshold": float}.
        On failure: {"error": str}.
    """
    import tempfile
    import pathlib

    # SpeechBrain's verify_files works on file paths, so spill the uploaded
    # bytes to temporary .wav files first (delete=False: paths outlive the
    # `with`, cleanup happens in the `finally` below).
    with (
        tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f1,
        tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f2,
    ):
        f1.write(audio1_bytes)
        f2.write(audio2_bytes)
        audio1_path = f1.name
        audio2_path = f2.name

    try:
        # `verification` is the module-level SpeakerRecognition model
        # created inside image.imports() at container start.
        score, prediction = verification.verify_files(audio1_path, audio2_path)

        # score/prediction come back as 1-element torch tensors; cast to
        # plain Python types so the dict serializes cleanly over Modal.
        # NOTE(review): 0.75 here is only a reported value — the model's
        # actual decision threshold comes from its hparams; confirm they agree.
        return {
            "match": bool(prediction),
            "similarity": float(score),
            "threshold": 0.75,
        }
    except Exception as e:
        return {"error": str(e)}
    finally:
        # missing_ok: don't let cleanup raise (and mask the result) if a
        # file was never fully created.
        pathlib.Path(audio1_path).unlink(missing_ok=True)
        pathlib.Path(audio2_path).unlink(missing_ok=True)
|
|
|
|
|
@app.function(image=image, gpu="any", timeout=600)
def verify_faces_in_video_remote(video_bytes: bytes, ref_img_bytes: bytes, interval: int = 30):
    """Sample frames from a video and verify each against a reference face.

    Args:
        video_bytes: Raw MP4 video bytes.
        ref_img_bytes: Raw JPEG bytes of the reference face image.
        interval: Verify every `interval`-th frame (default: every 30th).

    Returns:
        A list of per-frame dicts — {"frame", "distance", "verified"} on
        success or {"frame", "error"} per failed frame — or a single-element
        [{"error": ...}] list if the whole run fails.
    """
    import cv2
    import os
    import shutil
    import tempfile
    from deepface import DeepFace

    results = []
    frame_paths = []

    # Spill inputs to disk: OpenCV and DeepFace both want file paths.
    with (
        tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as video_temp,
        tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as ref_img_temp,
    ):
        video_temp.write(video_bytes)
        ref_img_temp.write(ref_img_bytes)
        video_path = video_temp.name
        ref_img_path = ref_img_temp.name

    # Per-call directory for extracted frames so concurrent invocations
    # can't clobber each other's frame_<n>.jpg files in the shared tempdir.
    frames_dir = tempfile.mkdtemp(prefix="frames_")

    try:
        # Pass 1: decode the video and dump every `interval`-th frame to disk.
        cap = cv2.VideoCapture(video_path)
        frame_count = 0
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % interval == 0:
                    frame_path = os.path.join(frames_dir, f"frame_{frame_count}.jpg")
                    # Skip (don't fail) frames that can't be encoded.
                    if cv2.imwrite(frame_path, frame):
                        frame_paths.append((frame_count, frame_path))
                frame_count += 1
        finally:
            # Release the decoder even if reading raised.
            cap.release()

        # Pass 2: verify each sampled frame; per-frame failures are recorded
        # rather than aborting the whole run.
        for frame_id, frame_path in frame_paths:
            try:
                result = DeepFace.verify(
                    img1_path=ref_img_path,
                    img2_path=frame_path,
                    # Frames without a detectable face still produce a result
                    # instead of raising.
                    enforce_detection=False,
                )
                results.append({
                    "frame": frame_id,
                    "distance": round(result["distance"], 4),
                    "verified": result["verified"],
                })
            except Exception as e:
                results.append({
                    "frame": frame_id,
                    "error": str(e),
                })

        return results

    except Exception as e:
        return [{"error": str(e)}]

    finally:
        # Best-effort cleanup; never let it raise and mask the return value.
        for path in (video_path, ref_img_path):
            try:
                os.remove(path)
            except OSError:
                pass
        shutil.rmtree(frames_dir, ignore_errors=True)
|
|