Spaces:
Running
Running
import html
import logging
import os
import re
import shutil
import sys
import uuid
from pathlib import Path

import cv2
import gradio as gr
import requests
import yt_dlp
from google_img_source_search import ReverseImageSearcher
from PIL import Image
# Logging: emit INFO-and-above records on stdout, each stamped with the
# time and the level name.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Stylesheet handed to ``gr.Blocks(css=...)`` in create_ui().
# ``.result-item`` styles each search hit rendered by build_results_html();
# ``.error-message`` styles the red error paragraphs returned by the handlers.
CUSTOM_CSS = """
.result-item {
margin: 20px 0;
padding: 15px;
border: 1px solid #ddd;
border-radius: 5px;
}
.result-item img {
max-width: 100%;
height: auto;
margin: 10px 0;
}
.error-message {
color: red;
padding: 10px;
border: 1px solid red;
border-radius: 5px;
margin: 10px 0;
}
"""
def create_temp_dir():
    """Make a uniquely named scratch directory and return it as a ``Path``.

    The directory is created relative to the current working directory and
    is named ``temp_<uuid4>``. Removing it is the caller's responsibility.
    """
    scratch = Path("temp_" + str(uuid.uuid4()))
    scratch.mkdir(exist_ok=True)
    return scratch
def cleanup_temp_dir(temp_dir):
    """Delete ``temp_dir`` and everything beneath it, never raising.

    A directory that does not exist is silently ignored. Any exception
    raised while checking or deleting is logged and swallowed so that
    cleanup cannot mask the caller's own result or error.
    """
    try:
        if not temp_dir.exists():
            return
        shutil.rmtree(str(temp_dir))
    except Exception as e:
        logger.error(f"Error cleaning up temporary directory: {e}")
def dl(inp):
    """Download the video at ``inp`` with yt-dlp into a fresh temp directory.

    Args:
        inp: URL of the video to download (surrounding whitespace is ignored).

    Returns:
        A 4-tuple feeding the ``(video, html, status, frame)`` UI outputs:
        the path of the downloaded ``.mp4`` (``None`` on failure), a
        ``gr.HTML`` component (empty on success, an error paragraph
        otherwise), and two empty strings that reset the status and
        next-frame boxes.

    On success the temp directory is deliberately left in place because the
    returned path points into it; it is removed only when the download fails.
    """
    if not inp or not inp.strip():
        return None, gr.HTML("<p class='error-message'>Please provide a valid URL</p>"), "", ""
    url = inp.strip()
    temp_dir = create_temp_dir()
    try:
        # Derive a filesystem-safe file stem from the URL. Everything outside
        # [A-Za-z0-9_-] is replaced: in particular '%' is template syntax in
        # yt-dlp's ``outtmpl`` and must not leak through from URL-encoded
        # input. The stem is capped so long URLs cannot exceed filename limits.
        stem = re.sub(r"^https?://", "", url, flags=re.IGNORECASE)
        stem = re.sub(r"[^A-Za-z0-9_-]", "_", stem)[:100] or "video"
        output_path = str(temp_dir / f"{stem}.mp4")
        # Configure yt-dlp options
        ydl_opts = {
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4',
            'outtmpl': output_path,
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'merge_output_format': 'mp4'
        }
        # Handle Twitter URLs differently
        if "twitter" in url.lower():
            ydl_opts['extractor_args'] = {'twitter': {'api': ['syndication']}}
        # Download the video
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            logger.info(f"Downloading video from: {url}")
            ydl.download([url])
        if not os.path.exists(output_path):
            raise Exception("Download completed but file not found")
        logger.info(f"Successfully downloaded to: {output_path}")
        return output_path, gr.HTML(""), "", ""
    except Exception as e:
        logger.error(f"Error downloading video: {e}")
        cleanup_temp_dir(temp_dir)
        # The exception text can echo parts of the user-supplied URL, so it
        # is escaped before being embedded in HTML.
        return None, gr.HTML(f"<p class='error-message'>Error: {html.escape(str(e))}</p>"), "", ""
def process_vid(file, cur_frame, every_n):
    """Reverse-image-search sampled frames of a video until one has matches.

    Args:
        file: Path of the video file to scan.
        cur_frame: Frame index to start from, as text or a number; empty or
            ``None`` means start at frame 0.
        every_n: Sampling stride in frames; values below 1 are treated as 1.

    Returns:
        A 3-tuple feeding the ``(html, status, next frame)`` UI outputs.
        On the first frame with results: the rendered results, a
        ``"Found frame: <i>"`` status and ``i + stride`` as the next start
        index. Otherwise an error or "no matches" ``gr.HTML`` followed by
        two empty strings.
    """
    if not file:
        return gr.HTML("<p class='error-message'>No video file provided</p>"), "", ""
    temp_dir = create_temp_dir()
    capture = None
    try:
        capture = cv2.VideoCapture(str(file))
        if not capture.isOpened():
            raise Exception("Failed to open video file")
        frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        # A stride of 0 or less would make range() raise or never advance.
        step = max(1, int(every_n))
        start_text = "" if cur_frame is None else str(cur_frame).strip()
        start_frame = max(0, int(start_text)) if start_text else 0
        rev_img_searcher = ReverseImageSearcher()
        # Stepping the range directly samples exactly every ``step`` frames,
        # matching the ``i + step`` resume index returned below, and includes
        # the final frame index.
        for i in range(start_frame, frame_count, step):
            logger.info(f"Processing frame {i}")
            capture.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = capture.read()
            if not ret:
                # The reported frame count can overshoot; skip unreadable frames.
                continue
            temp_image = temp_dir / f"frame_{i}.png"
            cv2.imwrite(str(temp_image), frame)
            # Generate the URL for the temporary image
            image_path = os.path.abspath(str(temp_image))
            image_url = f'https://nymbo-reverse-image.hf.space/file={image_path}'
            try:
                results = rev_img_searcher.search(image_url)
                if results:
                    rendered = build_results_html(results, len(results))
                    return gr.HTML(rendered), f"Found frame: {i}", i + step
            except Exception as search_error:
                # A failed lookup for one frame should not abort the scan.
                logger.error(f"Search error on frame {i}: {search_error}")
    except Exception as e:
        logger.error(f"Error processing video: {e}")
        return gr.HTML(f'<p class="error-message">Error: {html.escape(str(e))}</p>'), "", ""
    finally:
        # Runs on every exit path, including the early return on a match.
        if capture is not None:
            capture.release()
        cleanup_temp_dir(temp_dir)
    return gr.HTML('No frame matches found.'), "", ""
def process_im(file, url):
    """Return a URL for the image, re-hosting ``file`` when ``url`` is local.

    Args:
        file: Path (or file object) of the image, opened with PIL when the
            URL has to be regenerated.
        url: Current image URL. Anything that does not start with
            ``https://nymbo`` (including an empty or missing URL) is
            returned unchanged.

    Returns:
        ``url`` unchanged, or a new ``https://nymbo-reverse-image.hf.space``
        file URL for a PNG copy of ``file``.
    """
    if not url or not url.startswith("https://nymbo"):
        return url
    temp_dir = create_temp_dir()
    try:
        temp_image = temp_dir / "temp.png"
        # Context manager closes the source file handle once the copy is saved.
        with Image.open(file) as read_file:
            read_file.save(str(temp_image))
        out_url = f'https://nymbo-reverse-image.hf.space/file={os.path.abspath(str(temp_image))}'
        return out_url
    finally:
        # NOTE(review): this removes the PNG before the caller can fetch the
        # returned URL, so the URL refers to a file that no longer exists.
        # Behaviour kept as-is; confirm the intended lifetime with callers.
        cleanup_temp_dir(temp_dir)
def rev_im(image):
    """Run a reverse image search for one image file and render the results.

    Args:
        image: Filesystem path of the image to search for.

    Returns:
        A ``gr.HTML`` component holding the rendered results (with a
        "Total Found" header), or an error paragraph when no image was
        given or the search failed.
    """
    if not image:
        return gr.HTML("<p class='error-message'>No image provided</p>")
    temp_dir = create_temp_dir()
    try:
        temp_image = temp_dir / "temp.png"
        image_cv = cv2.imread(image)
        if image_cv is None:
            # cv2.imread signals an unreadable file by returning None;
            # fail with a clear message instead of an opaque imwrite error.
            raise ValueError("Could not read the image file")
        cv2.imwrite(str(temp_image), image_cv)
        out_url = f'https://nymbo-reverse-image.hf.space/file={os.path.abspath(str(temp_image))}'
        rev_img_searcher = ReverseImageSearcher()
        # Treat a missing result set the same as an empty one.
        results = rev_img_searcher.search(out_url) or []
        return gr.HTML(build_results_html(results, len(results)))
    except Exception as e:
        logger.error(f"Error in reverse image search: {e}")
        return gr.HTML(f'<p class="error-message">Error: {html.escape(str(e))}</p>')
    finally:
        cleanup_temp_dir(temp_dir)
def build_results_html(results, count=None):
    """Render search results as an HTML fragment.

    Args:
        results: Iterable of result objects exposing ``page_title``,
            ``page_url`` and ``image_url`` attributes.
        count: When not ``None``, a ``Total Found`` heading with this value
            is placed before the results.

    Returns:
        The HTML string; empty when there are no results and no count.
    """
    def esc(value):
        # Titles and URLs come from third-party pages, so they are escaped
        # (quotes included) before being placed in text or attribute context.
        return html.escape(str(value), quote=True)

    parts = [f"<h1>Total Found: {count}</h1><br>"] if count is not None else []
    for item in results:
        title = esc(item.page_title)
        page_url = esc(item.page_url)
        image_url = esc(item.image_url)
        parts.append(f"""
<div class="result-item">
<h3>{title}</h3>
<p>Source: <a href='{page_url}' target='_blank' rel='noopener noreferrer'>{page_url}</a></p>
<p>Image: <a href='{image_url}' target='_blank' rel='noopener noreferrer'>{image_url}</a></p>
<img class='my_im' src='{image_url}' alt='{title}'>
</div>
""")
    return "".join(parts)
def create_ui():
    """Build the Gradio Blocks interface and wire up its event handlers.

    The layout is a centred column (flanked by two empty spacer columns)
    with a radio toggle between an image-search panel and a video-search
    panel, a results area underneath, and a hidden textbox that carries the
    next frame index between video searches.

    Returns:
        The constructed ``gr.Blocks`` app (not yet queued or launched).
    """
    with gr.Blocks(css=CUSTOM_CSS) as app:
        with gr.Row():
            gr.Column()  # left spacer
            with gr.Column():
                source_tog = gr.Radio(
                    choices=["Image", "Video"],
                    value="Image",
                    label="Search Type"
                )
                # Image search interface
                with gr.Box(visible=True) as im_box:
                    inp_url = gr.Textbox(label="Image URL")
                    load_im_btn = gr.Button("Load Image", variant="primary")
                    inp_im = gr.Image(label="Search Image", type='filepath')
                    go_btn_im = gr.Button("Search", variant="primary")
                # Video search interface
                with gr.Box(visible=False) as vid_box:
                    vid_url = gr.Textbox(label="Video URL")
                    vid_url_btn = gr.Button("Load URL", variant="primary")
                    inp_vid = gr.Video(label="Search Video")
                    with gr.Row():
                        every_n = gr.Number(
                            label="Every nth frame",
                            value=10,
                            minimum=1,
                            maximum=100,
                            step=1
                        )
                        stat_box = gr.Textbox(label="Status", interactive=False)
                    with gr.Row():
                        go_btn_vid = gr.Button("Start", variant="primary")
                        next_btn = gr.Button("Next Frame", variant="secondary")
            gr.Column()  # right spacer
        with gr.Row():
            html_out = gr.HTML()
        with gr.Row(visible=False):
            # Hidden state: frame index at which the next video search resumes.
            hid_box = gr.Textbox()

        # Event handlers
        def toggle_interface(tog):
            """Toggle between image and video interfaces."""
            return (
                gr.update(visible=tog == "Image"),
                gr.update(visible=tog == "Video")
            )

        # Register the click listeners first and keep the event handles they
        # return: ``cancels=`` expects those handles, not the bound ``.click``
        # methods themselves.
        load_im_event = load_im_btn.click(lambda x: x, inp_url, inp_im)
        next_btn.click(process_vid, [inp_vid, hid_box, every_n], [html_out, stat_box, hid_box])
        vid_url_event = vid_url_btn.click(dl, vid_url, [inp_vid, html_out, stat_box, hid_box])
        go_vid_event = go_btn_vid.click(process_vid, [inp_vid, hid_box, every_n], [html_out, stat_box, hid_box])
        go_im_event = go_btn_im.click(rev_im, inp_im, html_out)
        # Switching search type swaps the visible panel and cancels any
        # in-flight load/search started from either panel.
        source_tog.change(
            toggle_interface,
            [source_tog],
            [im_box, vid_box],
            cancels=[go_vid_event, go_im_event, load_im_event, vid_url_event]
        )
    return app
if __name__ == "__main__":
    # Script entry point: build the UI, turn on request queuing with
    # concurrency_count=20, then start serving.
    app = create_ui()
    app.queue(concurrency_count=20).launch()