import html
import logging
import os
import shutil
import sys
import uuid
from pathlib import Path

import cv2
import gradio as gr
import requests
import yt_dlp
from google_img_source_search import ReverseImageSearcher
from PIL import Image
# Logging: INFO-level records, timestamped and level-tagged, written to stdout.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Stylesheet handed to gr.Blocks(css=...) in create_ui. It defines two rule
# sets: `.result-item` (a bordered, padded, rounded card whose images scale
# down to the card width) and `.error-message` (red text inside a
# red-bordered, rounded box).
CUSTOM_CSS = """
.result-item {
margin: 20px 0;
padding: 15px;
border: 1px solid #ddd;
border-radius: 5px;
}
.result-item img {
max-width: 100%;
height: auto;
margin: 10px 0;
}
.error-message {
color: red;
padding: 10px;
border: 1px solid red;
border-radius: 5px;
margin: 10px 0;
}
"""
def create_temp_dir():
    """Make a uniquely named scratch directory and return its path.

    The directory is named ``temp_<uuid4>`` and is created relative to the
    current working directory.

    Returns:
        A ``pathlib.Path`` pointing at the newly created directory.
    """
    scratch = Path(f"temp_{uuid.uuid4()}")
    scratch.mkdir(exist_ok=True)
    return scratch
def cleanup_temp_dir(temp_dir):
    """Remove *temp_dir* and everything beneath it, never raising.

    A directory that does not exist is silently ignored. Any failure while
    checking for or deleting the directory is logged as an error and
    swallowed, so this is safe to call from ``finally`` blocks.

    Args:
        temp_dir: ``pathlib.Path`` of the directory to delete.
    """
    try:
        if not temp_dir.exists():
            return
        shutil.rmtree(str(temp_dir))
    except Exception as err:
        logger.error(f"Error cleaning up temporary directory: {err}")
def dl(inp):
    """Download the video at *inp* with yt-dlp into a fresh temp directory.

    Args:
        inp: URL of the video to fetch. Surrounding whitespace is ignored.

    Returns:
        A 4-tuple ``(video_path, message, status, cursor)``:

        * ``video_path`` - path of the downloaded MP4, or ``None`` on failure.
        * ``message`` - a ``gr.HTML`` component; empty on success, otherwise
          an error box styled with the ``error-message`` CSS class.
        * ``status`` and ``cursor`` - always empty strings.

    Note:
        On success the temporary directory is deliberately left in place
        because the returned path points into it; it is removed only when
        the download fails.
    """
    url = inp.strip() if inp else ""
    if not url:
        return None, gr.HTML('<div class="error-message">Please provide a valid URL</div>'), "", ""
    temp_dir = create_temp_dir()
    try:
        # Derive a filesystem-safe file name from the URL. Everything except
        # letters, digits, '-' and '_' becomes '_': this also neutralises
        # '%', which yt-dlp would otherwise read as output-template syntax.
        # The length is capped so long query strings cannot exceed
        # file-name limits.
        stem = url.replace("https://", "")
        stem = "".join(ch if ch.isalnum() or ch in "-_" else "_" for ch in stem)[:100]
        output_path = str(temp_dir / f"{stem}.mp4")
        # Configure yt-dlp options
        ydl_opts = {
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4',
            'outtmpl': output_path,
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'merge_output_format': 'mp4',
        }
        # Twitter URLs: tell yt-dlp's twitter extractor to use the
        # 'syndication' API.
        if "twitter" in url.lower():
            ydl_opts['extractor_args'] = {'twitter': {'api': ['syndication']}}
        # Download the video
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            logger.info(f"Downloading video from: {url}")
            ydl.download([url])
        if not os.path.exists(output_path):
            raise FileNotFoundError("Download completed but file not found")
        logger.info(f"Successfully downloaded to: {output_path}")
        return output_path, gr.HTML(""), "", ""
    except Exception as e:
        logger.error(f"Error downloading video: {e}")
        cleanup_temp_dir(temp_dir)
        # The exception text can echo the user-supplied URL, so escape it
        # before embedding it in HTML.
        return None, gr.HTML(f'<div class="error-message">Error: {html.escape(str(e))}</div>'), "", ""
def process_vid(file, cur_frame, every_n):
    """Reverse-image-search sampled frames of a video until one has matches.

    Starting at *cur_frame*, every *every_n*-th frame is written to a
    temporary PNG and submitted to ``ReverseImageSearcher``. The scan stops
    at the first frame that yields results.

    Args:
        file: Path of the video file to scan.
        cur_frame: Frame index to start from. May be an int, a numeric
            string, or empty/``None`` (meaning frame 0).
        every_n: Sampling stride in frames; values below 1 (or ``None``)
            are treated as 1.

    Returns:
        A 3-tuple ``(html, status, next_cursor)``:

        * on a hit - the rendered results, ``"Found frame: <i>"`` and the
          index of the next frame to sample (``i + every_n``);
        * on no hit or on error - a message component and two empty strings.

    Note:
        The temporary directory and the video capture are always released
        before returning.
    """
    if not file:
        return gr.HTML('<div class="error-message">No video file provided</div>'), "", ""
    temp_dir = create_temp_dir()
    capture = None
    try:
        # Guard the stride: 0/None would make range() fail or never advance.
        step = max(1, int(every_n or 1))
        # The cursor may arrive as an int, a string such as "20" or "20.0",
        # or be empty; normalise all of these to a non-negative frame index.
        cursor = "" if cur_frame is None else str(cur_frame).strip()
        start_frame = max(0, int(float(cursor))) if cursor else 0

        capture = cv2.VideoCapture(str(file))
        if not capture.isOpened():
            raise Exception("Failed to open video file")
        frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        rev_img_searcher = ReverseImageSearcher()
        # Step directly by the stride so the sampled frames line up with the
        # `i + step` cursor returned below. Frame indices are 0-based, so
        # the last valid index is frame_count - 1.
        for i in range(start_frame, frame_count, step):
            logger.info(f"Processing frame {i}")
            capture.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = capture.read()
            if not ret:
                # Unreadable frame: skip it rather than abort the scan.
                continue
            temp_image = temp_dir / f"frame_{i}.png"
            cv2.imwrite(str(temp_image), frame)
            # Generate the URL for the temporary image
            image_path = os.path.abspath(str(temp_image))
            image_url = f'https://nymbo-reverse-image.hf.space/file={image_path}'
            try:
                results = rev_img_searcher.search(image_url)
                if results:
                    rendered = build_results_html(results, len(results))
                    return gr.HTML(rendered), f"Found frame: {i}", i + step
            except Exception as search_error:
                # A failed lookup for one frame should not end the scan.
                logger.error(f"Search error on frame {i}: {search_error}")
    except Exception as e:
        logger.error(f"Error processing video: {e}")
        return gr.HTML(f'<div class="error-message">Error: {html.escape(str(e))}</div>'), "", ""
    finally:
        if capture is not None:
            capture.release()
        cleanup_temp_dir(temp_dir)
    return gr.HTML('No frame matches found.'), "", ""
def process_im(file, url):
    """Return a URL for the image, re-hosting *file* when *url* is local.

    If *url* is empty or does not start with ``https://nymbo`` it is
    returned unchanged. Otherwise *file* is re-saved as ``temp.png`` in a
    new temporary directory and a
    ``https://nymbo-reverse-image.hf.space/file=<abs path>`` URL for that
    copy is returned.

    Args:
        file: Path (or file object) of the image, readable by
            ``PIL.Image.open``.
        url: URL associated with the image; may be empty or ``None``.

    Returns:
        Either *url* itself or the URL of the temporary copy.

    Raises:
        Exception: Whatever ``PIL`` raises if the image cannot be opened or
            saved; the temporary directory is removed first.

    Note:
        On success the temporary directory is left in place, because the
        returned URL points at the file inside it. Deleting it here would
        hand the caller a dead link. The caller owns its cleanup.
    """
    if not url or not url.startswith("https://nymbo"):
        return url
    temp_dir = create_temp_dir()
    try:
        temp_image = temp_dir / "temp.png"
        # Context manager closes the underlying file handle promptly.
        with Image.open(file) as read_file:
            read_file.save(str(temp_image))
    except Exception:
        cleanup_temp_dir(temp_dir)
        raise
    return f'https://nymbo-reverse-image.hf.space/file={os.path.abspath(str(temp_image))}'
def rev_im(image):
    """Run a reverse image search for a single image file.

    The image is re-encoded as ``temp.png`` in a temporary directory, a
    ``https://nymbo-reverse-image.hf.space/file=<abs path>`` URL for it is
    submitted to ``ReverseImageSearcher``, and the results are rendered as
    HTML. The temporary directory is always removed before returning.

    Args:
        image: Path of the image file to search for.

    Returns:
        A ``gr.HTML`` component holding either the rendered results
        (prefixed with the total count) or an error box styled with the
        ``error-message`` CSS class.
    """
    if not image:
        return gr.HTML('<div class="error-message">No image provided</div>')
    temp_dir = create_temp_dir()
    try:
        temp_image = temp_dir / "temp.png"
        image_cv = cv2.imread(image)
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with a clear message rather than inside cv2.imwrite.
        if image_cv is None:
            raise ValueError("Could not read image file")
        cv2.imwrite(str(temp_image), image_cv)
        out_url = f'https://nymbo-reverse-image.hf.space/file={os.path.abspath(str(temp_image))}'
        rev_img_searcher = ReverseImageSearcher()
        # Treat a None result the same as "no matches".
        results = rev_img_searcher.search(out_url) or []
        return gr.HTML(build_results_html(results, len(results)))
    except Exception as e:
        logger.error(f"Error in reverse image search: {e}")
        return gr.HTML(f'<div class="error-message">Error: {html.escape(str(e))}</div>')
    finally:
        cleanup_temp_dir(temp_dir)
def build_results_html(results, count=None):
    """Render reverse-image-search results as an HTML fragment.

    Each result becomes a ``<div class="result-item">`` card containing the
    page title, a link to the page, and the matched image. All values taken
    from the results are HTML-escaped before being embedded.

    Args:
        results: Iterable of search results. Each item is expected to expose
            ``page_title``, ``page_url`` and ``image_url`` attributes (as
            the items returned by ``ReverseImageSearcher.search`` do);
            missing or ``None`` attributes render as empty strings.
        count: Optional total to show in a ``Total Found`` heading. When
            ``None`` no heading is emitted.

    Returns:
        The HTML fragment as a string; ``""`` when there are no results and
        no count.
    """
    parts = []
    if count is not None:
        parts.append(f"<h3>Total Found: {count}</h3>")
    for item in results:
        title = html.escape(str(getattr(item, "page_title", "") or ""))
        page_url = html.escape(str(getattr(item, "page_url", "") or ""))
        image_url = html.escape(str(getattr(item, "image_url", "") or ""))
        parts.append(
            '<div class="result-item">'
            f'<h4>{title}</h4>'
            f'<a href="{page_url}" target="_blank" rel="noopener noreferrer">{page_url}</a><br>'
            f'<img src="{image_url}" alt="{title}">'
            '</div>'
        )
    return "\n".join(parts)
def create_ui():
    """Build the Gradio Blocks app and wire up its event handlers.

    The layout is a radio toggle that switches between an image-search
    panel and a video-search panel, a shared HTML output area, and a hidden
    textbox that carries the video frame cursor between calls to
    ``process_vid``.

    Returns:
        The constructed ``gr.Blocks`` application (not yet launched).
    """
    with gr.Blocks(css=CUSTOM_CSS) as app:
        with gr.Row():
            gr.Column()  # empty column; presumably a spacer to centre the controls
            with gr.Column():
                source_tog = gr.Radio(
                    choices=["Image", "Video"],
                    value="Image",
                    label="Search Type",
                )
                # Image search interface
                with gr.Box(visible=True) as im_box:
                    inp_url = gr.Textbox(label="Image URL")
                    load_im_btn = gr.Button("Load Image", variant="primary")
                    inp_im = gr.Image(label="Search Image", type='filepath')
                    go_btn_im = gr.Button("Search", variant="primary")
                # Video search interface
                with gr.Box(visible=False) as vid_box:
                    vid_url = gr.Textbox(label="Video URL")
                    vid_url_btn = gr.Button("Load URL", variant="primary")
                    inp_vid = gr.Video(label="Search Video")
                    with gr.Row():
                        every_n = gr.Number(
                            label="Every nth frame",
                            value=10,
                            minimum=1,
                            maximum=100,
                            step=1,
                        )
                        stat_box = gr.Textbox(label="Status", interactive=False)
                    with gr.Row():
                        go_btn_vid = gr.Button("Start", variant="primary")
                        next_btn = gr.Button("Next Frame", variant="secondary")
            gr.Column()  # empty column; presumably the matching right-hand spacer
        with gr.Row():
            html_out = gr.HTML()
        with gr.Row(visible=False):
            # Hidden state: the frame index the next video scan starts from.
            hid_box = gr.Textbox()

        # Event handlers
        def toggle_interface(tog):
            """Show the panel matching the selected search type, hide the other."""
            return (
                gr.update(visible=tog == "Image"),
                gr.update(visible=tog == "Video"),
            )

        # Register the button listeners first and keep the event objects
        # they return: `cancels=` must be given those objects, not the bound
        # `.click` methods.
        load_im_event = load_im_btn.click(lambda x: x, inp_url, inp_im)
        next_btn.click(process_vid, [inp_vid, hid_box, every_n], [html_out, stat_box, hid_box])
        vid_url_event = vid_url_btn.click(dl, vid_url, [inp_vid, html_out, stat_box, hid_box])
        go_vid_event = go_btn_vid.click(process_vid, [inp_vid, hid_box, every_n], [html_out, stat_box, hid_box])
        go_im_event = go_btn_im.click(rev_im, inp_im, html_out)

        # Switching search type also cancels any of these jobs still running.
        source_tog.change(
            toggle_interface,
            [source_tog],
            [im_box, vid_box],
            cancels=[go_vid_event, go_im_event, load_im_event, vid_url_event],
        )
    return app
if __name__ == "__main__":
    # Script entry point: build the UI, enable the request queue with 20
    # concurrent workers, then start serving.
    app = create_ui()
    queued = app.queue(concurrency_count=20)
    queued.launch()