JuanjoSG5 committed · Commit 7af9a4a
Parent(s): 7325704
feat: imported the project to the org
Browse files
- .gitignore +3 -0
- README.md +10 -7
- app.py +393 -0
- gradio_interface/app.py +204 -0
- requirements.txt +8 -0
- src/utils/add_text.py +62 -0
- src/utils/apply_filter.py +129 -0
- src/utils/change_format.py +45 -0
- src/utils/compress.py +59 -0
- src/utils/describe.py +112 -0
- src/utils/generate_image.py +79 -0
- src/utils/remove_background.py +100 -0
- src/utils/resize_image.py +56 -0
- src/utils/visualize_image.py +18 -0
- src/utils/watermark.py +94 -0
.gitignore
ADDED
@@ -0,0 +1,3 @@
__pycache__/
.env
test_agent.py
README.md
CHANGED
@@ -1,13 +1,16 @@
 ---
-title:
-emoji:
-colorFrom:
-colorTo:
+title: ImageUtilitiesMCP
+emoji: 🖼️
+colorFrom: pink
+colorTo: red
 sdk: gradio
-sdk_version: 5.
+sdk_version: 5.32.0
 app_file: app.py
 pinned: false
-short_description:
+short_description: Useful tools for image editing.
+tags: [mcp-server-track]
 ---
 
-
+This project has been created by: [RafaelJaime](https://huggingface.co/RafaelJaime), [ItzRoBeerT](https://huggingface.co/ItzRoBeerT) and [JuanjoJ55](https://huggingface.co/JuanjoJ55).
+
+The original space with all the commits can be found here: [Original Space](https://huggingface.co/spaces/RafaelJaime/image_utilities_mcp)
app.py
ADDED
@@ -0,0 +1,393 @@
import gradio as gr
from src.utils.change_format import change_format
from src.utils.remove_background import remove_background
from src.utils.generate_image import generate_image
from src.utils.add_text import add_text_to_image_base64
from src.utils.compress import compress_image_memory
from src.utils.apply_filter import apply_filter_direct
from src.utils.watermark import add_watermark, remove_watermark
from src.utils.describe import describe_image
import base64
import io
import tempfile
import requests
from io import BytesIO
from PIL import Image

def image_to_base64(image):
    if image is None:
        return None
    buffer = io.BytesIO()
    image.save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue()).decode()

def base64_to_image(base64_str):
    if not base64_str:
        return None

    # Remove data URI prefix if present (e.g., "data:image/png;base64,")
    if isinstance(base64_str, str) and "base64," in base64_str:
        base64_str = base64_str.split("base64,", 1)[1]

    try:
        # Strip any whitespace that might be in the base64 string
        if isinstance(base64_str, str):
            base64_str = base64_str.strip()

        # Decode the base64 data
        image_data = base64.b64decode(base64_str)

        # Check if we have data
        if not image_data:
            print("Decoded base64 data is empty")
            return None

        # Attempt to open the image
        image = Image.open(io.BytesIO(image_data))

        # Copy the image to make sure it is fully loaded and valid
        return image.copy()

    except base64.binascii.Error as e:
        print(f"Base64 decoding error: {str(e)}")
        if isinstance(base64_str, str):
            preview = base64_str[:30] + "..." if len(base64_str) > 30 else base64_str
            print(f"Base64 preview: {preview}")
        return None

    except Exception as e:
        print(f"Error converting base64 to image: {str(e)}")

        # Print a preview of the base64 string for debugging
        if isinstance(base64_str, str):
            preview = base64_str[:30] + "..." if len(base64_str) > 30 else base64_str
            print(f"Base64 preview: {preview}")

        # Additional debug information
        if 'image_data' in locals() and image_data:
            try:
                magic_bytes = image_data[:12].hex()
                print(f"First 12 bytes: {magic_bytes}")
            except Exception:
                pass

        return None

def url_to_base64(url):
    response = requests.get(url, timeout=30)
    return base64.b64encode(response.content).decode()

def gradio_remove_background(image):
    if image is None:
        return None
    base64_img = image_to_base64(image)
    result = remove_background(f"data:image/png;base64,{base64_img}")

    # remove_background returns a dict whose "image_data" field holds raw PNG bytes
    if isinstance(result, dict) and result.get("success") and result.get("image_data"):
        try:
            return Image.open(io.BytesIO(result["image_data"]))
        except Exception as e:
            print(f"Error processing image data: {e}")
            return None
    print(f"Unexpected response format from remove_background: {type(result)}")
    return None

def gradio_describe_image(image):
    if image is None:
        return "No image provided"
    try:
        # describe_image expects a file path or URL, so write the image to a temp PNG first
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
            image.save(tmp, format="PNG")
        return describe_image(tmp.name)
    except Exception as e:
        print(f"Error describing image: {e}")
        return f"Error: {str(e)}"

def gradio_change_format(image, format_type):
    if image is None:
        return None
    try:
        # change_format expects a URL or BytesIO, so hand it the raw PNG bytes
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        buffer.seek(0)
        result = change_format(buffer, format_type)
        return base64_to_image(result)
    except Exception as e:
        print(f"Error changing format: {e}")
        return image

def gradio_generate_image(prompt, width=512, height=512):
    result = generate_image(prompt, width, height)
    if not result.get("success"):
        print(f"Error generating image: {result.get('error')}")
        return None
    return base64_to_image(result["b64"])

def gradio_apply_filter(image, filter_type, intensity=1.0):
    if image is None:
        print("No image provided")
        return None

    return apply_filter_direct(image, filter_type, intensity)

def update_text_image(image, text, centered, x, y, font_size, color):
    if image is None:
        return None
    if not text or text.strip() == "":
        return image

    return add_text_to_image_base64(image, text, int(x), int(y), int(font_size), color, centered)

def toggle_position_fields(centered):
    return (
        gr.Number(interactive=not centered),
        gr.Number(interactive=not centered)
    )

def toggle_intensity_slider(filter_type):
    intensity_filters = ['blur', 'brightness', 'contrast', 'saturation']
    return gr.Slider(interactive=filter_type in intensity_filters)

def gradio_add_watermark(image, watermark_text, opacity=0.5):
    if image is None:
        return None
    try:
        # add_watermark operates on PIL images directly and returns a PIL image
        return add_watermark(image, watermark_text, opacity)
    except Exception as e:
        print(f"Error adding watermark: {e}")
        return image

def gradio_remove_watermark(image):
    if image is None:
        return None
    try:
        # remove_watermark expects a file path and returns a result dict
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
            image.save(tmp, format="PNG")
        result = remove_watermark(tmp.name)
        if result.get("success"):
            return Image.open(result["output_path"])
        print(f"Error removing watermark: {result.get('error')}")
        return image
    except Exception as e:
        print(f"Error removing watermark: {e}")
        return image

def gradio_compress_image(image, quality=80):
    """
    Compress an image for the Gradio interface.
    """
    if image is None:
        return None
    try:
        return compress_image_memory(image, quality, "JPEG")
    except Exception as e:
        print(f"Error compressing image: {e}")
        return image

def create_gradio_interface():
    with gr.Blocks(title="ImageUtilitiesMCP", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🖼️ ImageUtilitiesMCP")
        gr.Markdown("A complete set of image processing tools")

        with gr.Tabs():
            with gr.Tab("🎨 Generate Image"):
                with gr.Row():
                    prompt_input = gr.Textbox(label="Prompt", placeholder="Describe the image you want to generate")
                    with gr.Column():
                        width_input = gr.Slider(256, 1024, 512, label="Width")
                        height_input = gr.Slider(256, 1024, 512, label="Height")
                generate_btn = gr.Button("Generate", variant="primary")
                generated_output = gr.Image(label="Generated Image")

                generate_btn.click(
                    gradio_generate_image,
                    [prompt_input, width_input, height_input],
                    generated_output
                )

            with gr.Tab("🔍 Describe Image"):
                with gr.Row():
                    describe_input = gr.Image(label="Upload Image", type="pil")
                    description_output = gr.Textbox(label="Description", lines=4)

                describe_input.change(gradio_describe_image, describe_input, description_output)

            with gr.Tab("✂️ Remove Background"):
                with gr.Row():
                    bg_input = gr.Image(label="Upload Image", type="pil")
                    bg_output = gr.Image(label="Background Removed")

                bg_input.change(gradio_remove_background, bg_input, bg_output)

            with gr.Tab("🎭 Apply Filters"):
                with gr.Row():
                    filter_input = gr.Image(label="Upload Image", type="pil")
                    with gr.Column():
                        filter_type = gr.Dropdown(
                            ["blur", "sharpen", "vintage", "black_white", "sepia", "emboss", "edge", "smooth", "brightness", "contrast", "saturation", "grayscale"],
                            label="Filter Type",
                            value="blur"
                        )

                        intensity_slider = gr.Slider(
                            minimum=0.1,
                            maximum=300.0,
                            value=1.0,
                            step=0.1,
                            label="Intensity",
                            interactive=True
                        )

                filter_output = gr.Image(label="Filtered Image")

                filter_type.change(
                    toggle_intensity_slider,
                    filter_type,
                    intensity_slider
                )

                filter_inputs = [filter_input, filter_type, intensity_slider]

                for inp in filter_inputs:
                    inp.change(gradio_apply_filter, filter_inputs, filter_output)

            with gr.Tab("📝 Add Text"):
                with gr.Row():
                    text_input = gr.Image(label="Upload Image", type="pil")
                    with gr.Column():
                        text_content = gr.Textbox(
                            label="Text",
                            placeholder="Enter text to add",
                            value=""
                        )
                        text_centered = gr.Checkbox(label="Center Text", value=False)

                        with gr.Row():
                            text_x = gr.Number(
                                label="X Position",
                                value=50,
                                interactive=True,
                                minimum=0
                            )
                            text_y = gr.Number(
                                label="Y Position",
                                value=50,
                                interactive=True,
                                minimum=0
                            )

                        with gr.Row():
                            font_size = gr.Slider(
                                minimum=10,
                                maximum=100,
                                value=20,
                                label="Font Size"
                            )
                            text_color = gr.ColorPicker(
                                label="Color",
                                value="#FFFFFF"
                            )

                add_text_btn = gr.Button("Add Text", variant="primary")
                text_output = gr.Image(label="Image with Text")

                text_centered.change(
                    toggle_position_fields,
                    text_centered,
                    [text_x, text_y]
                )

                inputs = [text_input, text_content, text_centered, text_x, text_y, font_size, text_color]

                add_text_btn.click(
                    update_text_image,
                    inputs,
                    text_output
                )

                for inp in inputs:
                    inp.change(update_text_image, inputs, text_output)

            with gr.Tab("💧 Watermark"):
                with gr.Tabs():
                    with gr.Tab("Add Watermark"):
                        with gr.Row():
                            watermark_input = gr.Image(label="Upload Image", type="pil")
                            with gr.Column():
                                watermark_text = gr.Textbox(label="Watermark Text")
                                watermark_opacity = gr.Slider(0.1, 1.0, 0.5, label="Opacity")
                        watermark_output = gr.Image(label="Watermarked Image")

                        inputs = [watermark_input, watermark_text, watermark_opacity]
                        for inp in inputs:
                            inp.change(gradio_add_watermark, inputs, watermark_output)

                    with gr.Tab("Remove Watermark"):
                        with gr.Row():
                            unwatermark_input = gr.Image(label="Upload Image", type="pil")
                            unwatermark_output = gr.Image(label="Watermark Removed")

                        unwatermark_input.change(gradio_remove_watermark, unwatermark_input, unwatermark_output)

            with gr.Tab("🗜️ Compress"):
                with gr.Row():
                    compress_input = gr.Image(label="Upload Image", type="pil")
                    with gr.Column():
                        quality_slider = gr.Slider(0, 100, 80, label="Quality %")
                compress_output = gr.Image(label="Compressed Image")

                compress_input.change(gradio_compress_image, [compress_input, quality_slider], compress_output)
                quality_slider.change(gradio_compress_image, [compress_input, quality_slider], compress_output)

            with gr.Tab("🔄 Change Format"):
                with gr.Row():
                    format_input = gr.Image(label="Upload Image", type="pil")
                    with gr.Column():
                        format_type = gr.Dropdown(
                            ["PNG", "JPEG", "WEBP", "BMP"],
                            label="Output Format",
                            value="PNG"
                        )
                format_output = gr.Image(label="Converted Image")

                format_input.change(gradio_change_format, [format_input, format_type], format_output)
                format_type.change(gradio_change_format, [format_input, format_type], format_output)

        gr.Markdown("---")
        gr.Markdown("💡 **Status**: Active | Real-time image processing")

    return demo

if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.launch(
        mcp_server=True,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )
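A quick local smoke test for the wrapper functions above — a minimal sketch, assuming a test.png exists in the working directory and the src/utils modules are importable from the project root:

from PIL import Image
from app import gradio_apply_filter

img = Image.open("test.png")                 # hypothetical input file
blurred = gradio_apply_filter(img, "blur", 2.0)
blurred.save("test_blurred.png")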
gradio_interface/app.py
ADDED
@@ -0,0 +1,204 @@
import os
import gradio as gr
from os import getenv
import base64
from io import BytesIO
from dotenv import load_dotenv
import requests
import socket

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.callbacks import StreamingStdOutCallbackHandler

# Load environment
dotenv_path = os.path.join(os.path.dirname(__file__), '.env')
load_dotenv(dotenv_path=dotenv_path)

# Connectivity test
def test_connectivity(url="https://openrouter.helicone.ai/api/v1"):
    try:
        return requests.get(url, timeout=5).status_code == 200
    except (requests.RequestException, socket.error):
        return False

# Helper to make direct API calls to OpenRouter when LangChain fails
def direct_api_call(messages, api_key, base_url):
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
        "HTTP-Referer": "https://your-app-domain.com",  # Add your domain
        "X-Title": "Image Analysis App"
    }

    if getenv("HELICONE_API_KEY"):
        headers["Helicone-Auth"] = f"Bearer {getenv('HELICONE_API_KEY')}"

    payload = {
        "model": "google/gemini-flash-1.5",
        "messages": messages,
        "stream": False,
    }

    try:
        response = requests.post(
            f"{base_url}/chat/completions",
            headers=headers,
            json=payload,
            timeout=30
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]
    except Exception as e:
        return f"Error: {str(e)}"

# Initialize the LLM with streaming and retry logic
def init_llm():
    if not test_connectivity():
        raise RuntimeError("No connection to OpenRouter. Check your network and API keys.")
    return ChatOpenAI(
        openai_api_key=getenv("OPENROUTER_API_KEY"),
        openai_api_base=getenv("OPENROUTER_BASE_URL"),
        model_name="google/gemini-flash-1.5",
        streaming=True,
        callbacks=[StreamingStdOutCallbackHandler()],
        model_kwargs={
            "extra_headers": {"Helicone-Auth": f"Bearer {getenv('HELICONE_API_KEY')}"}
        },
    )

# Try to initialize the LLM but handle failures gracefully
try:
    llm = init_llm()
except Exception:
    llm = None

# Helpers
def encode_image_to_base64(pil_image):
    buffer = BytesIO()
    pil_image.save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue()).decode()

# Core logic
def generate_response(message, chat_history, image):
    # Convert chat history to the OpenAI message format
    formatted_history = []
    for msg in chat_history:
        role = msg.get('role')
        content = msg.get('content')
        if role == 'user':
            formatted_history.append({"role": "user", "content": content})
        else:
            formatted_history.append({"role": "assistant", "content": content})

    # Prepare the system message
    system_msg = {"role": "system", "content": "You are an expert image analysis assistant. Answer succinctly."}

    # Prepare the latest message, attaching the image if provided
    if image:
        base64_image = encode_image_to_base64(image)

        # Format for direct API call (OpenRouter/OpenAI format)
        api_messages = [system_msg] + formatted_history + [{
            "role": "user",
            "content": [
                {"type": "text", "text": message},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
            ]
        }]

        # For the LangChain format
        content_for_langchain = [
            {"type": "text", "text": message},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
        ]
    else:
        api_messages = [system_msg] + formatted_history + [{"role": "user", "content": message}]
        content_for_langchain = message

    # Build the LangChain messages
    lc_messages = [SystemMessage(content="You are an expert image analysis assistant. Answer succinctly.")]
    for msg in chat_history:
        role = msg.get('role')
        content = msg.get('content')
        if role == 'user':
            lc_messages.append(HumanMessage(content=content))
        else:
            lc_messages.append(AIMessage(content=content))

    lc_messages.append(HumanMessage(content=content_for_langchain))

    try:
        # First try LangChain with streaming
        if llm:
            try:
                partial = ""
                for chunk in llm.stream(lc_messages):
                    if chunk is None:
                        continue
                    content = getattr(chunk, 'content', None)
                    if content is None:
                        continue
                    partial += content
                    yield partial
                # If we got this far, streaming worked
                return
            except Exception as e:
                print(f"Streaming failed: {e}. Falling back to non-streaming mode")

            # Try non-streaming
            try:
                response = llm.invoke(lc_messages)
                yield response.content
                return
            except Exception as e:
                print(f"Non-streaming call failed: {e}. Falling back to a direct API call")

        # Last resort: call OpenRouter directly
        response_text = direct_api_call(
            api_messages,
            getenv("OPENROUTER_API_KEY"),
            getenv("OPENROUTER_BASE_URL")
        )
        yield response_text

    except Exception:
        import traceback
        print(traceback.format_exc())
        yield "⚠️ Error generating the response. Please try again later."

# Gradio interface
def process_message(message, chat_history, image):
    if chat_history is None:
        chat_history = []
    if image is None:
        chat_history.append({'role': 'assistant', 'content': 'Please upload an image.'})
        yield "", chat_history
        return
    chat_history.append({'role': 'user', 'content': message})
    chat_history.append({'role': 'assistant', 'content': '⏳ Processing...'})
    yield "", chat_history
    for chunk in generate_response(message, chat_history, image):
        chat_history[-1]['content'] = chunk
        yield "", chat_history

with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(type='messages', height=600)
            msg = gr.Textbox(label="Message", placeholder="Type your question...")
            clear = gr.ClearButton([msg, chatbot])
        with gr.Column(scale=1):
            image_input = gr.Image(type="pil", label="Upload Image")
            info = gr.Textbox(label="Image Info", interactive=False)

    msg.submit(process_message, [msg, chatbot, image_input], [msg, chatbot])
    image_input.change(lambda img: f"Size: {img.size}" if img else "No image.", [image_input], [info])

demo.launch()
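The interface reads its credentials from a .env file next to the script. A sketch of the expected keys (values are placeholders; the OPENROUTER_BASE_URL shown matches the default used by test_connectivity):

OPENROUTER_API_KEY=<your OpenRouter key>
OPENROUTER_BASE_URL=https://openrouter.helicone.ai/api/v1
HELICONE_API_KEY=<your Helicone key>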
requirements.txt
ADDED
@@ -0,0 +1,8 @@
fastmcp
requests
Pillow
rembg
onnxruntime
openai
opencv-python
langchain_openai
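Note that the code also imports gradio, numpy, python-dotenv, and langchain_core, which are not pinned here; on Hugging Face Spaces, gradio comes from the SDK, and most of the rest arrive as transitive dependencies, but python-dotenv may need installing. A local setup would look roughly like:

pip install -r requirements.txt
pip install python-dotenv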
src/utils/add_text.py
ADDED
@@ -0,0 +1,62 @@
from PIL import Image, ImageDraw, ImageFont

def parse_color(color_str):
    # Accept rgba()/rgb() CSS strings; hex and named colors pass through to PIL as-is
    if color_str.startswith('rgba('):
        values = color_str[5:-1].split(',')
        r = int(float(values[0]))
        g = int(float(values[1]))
        b = int(float(values[2]))
        return (r, g, b)
    elif color_str.startswith('rgb('):
        values = color_str[4:-1].split(',')
        r = int(float(values[0]))
        g = int(float(values[1]))
        b = int(float(values[2]))
        return (r, g, b)
    else:
        return color_str

def add_text_to_image_base64(image, text, x, y, font_size, color, centered=False):
    """
    Draw text on a copy of a PIL image and return the result.

    Args:
        image: PIL Image to draw on.
        text: Text to write on the image.
        x, y: Top-left position of the text (ignored when centered=True).
        font_size: Font size in points.
        color: Text color as a hex, rgb()/rgba(), or named color string.
        centered: If True, center the text on the image.

    Returns:
        A new PIL Image with the text drawn on it, or None if no image was given.
    """
    if image is None:
        return None

    img = image.copy()
    draw = ImageDraw.Draw(img)

    try:
        font = ImageFont.truetype("arial.ttf", font_size)
    except OSError:
        try:
            font = ImageFont.truetype("/System/Library/Fonts/Arial.ttf", font_size)
        except OSError:
            font = ImageFont.load_default()

    parsed_color = parse_color(color)

    if centered:
        bbox = draw.textbbox((0, 0), text, font=font)
        text_width = bbox[2] - bbox[0]
        text_height = bbox[3] - bbox[1]
        x = (img.width - text_width) // 2
        y = (img.height - text_height) // 2

    draw.text((x, y), text, fill=parsed_color, font=font)
    return img
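A minimal usage sketch (file names are hypothetical), centering a white caption on a local image:

from PIL import Image
from src.utils.add_text import add_text_to_image_base64

img = Image.open("photo.png")
labeled = add_text_to_image_base64(img, "Hello!", 0, 0, 36, "#FFFFFF", centered=True)
labeled.save("photo_labeled.png")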
src/utils/apply_filter.py
ADDED
@@ -0,0 +1,129 @@
from PIL import Image, ImageFilter, ImageEnhance

def apply_filter_direct(image, filter_type, intensity=1.0):
    """
    Apply filters directly to a PIL image without base64 conversion.
    """
    if image is None:
        return None

    try:
        print(f"Applying filter: {filter_type} with intensity: {intensity}")
        img = image.copy()

        if img.mode != 'RGB':
            img = img.convert('RGB')

        if filter_type == 'blur':
            img = img.filter(ImageFilter.GaussianBlur(radius=max(0.1, intensity)))
        elif filter_type == 'sharpen':
            if intensity <= 1.0:
                img = img.filter(ImageFilter.SHARPEN)
            else:
                img = img.filter(ImageFilter.UnsharpMask(radius=2, percent=int(intensity * 150), threshold=3))
        elif filter_type == 'emboss':
            img = img.filter(ImageFilter.EMBOSS)
        elif filter_type == 'edge':
            img = img.filter(ImageFilter.FIND_EDGES)
        elif filter_type == 'smooth':
            img = img.filter(ImageFilter.SMOOTH_MORE)
        elif filter_type == 'brightness':
            enhancer = ImageEnhance.Brightness(img)
            img = enhancer.enhance(max(0.1, intensity))
        elif filter_type == 'contrast':
            enhancer = ImageEnhance.Contrast(img)
            img = enhancer.enhance(max(0.1, intensity))
        elif filter_type == 'saturation':
            enhancer = ImageEnhance.Color(img)
            img = enhancer.enhance(max(0.1, intensity))
        elif filter_type == 'sepia':
            img = apply_sepia_filter_direct(img)
        elif filter_type in ('grayscale', 'black_white'):
            img = img.convert('L').convert('RGB')
        elif filter_type == 'vintage':
            img = apply_vintage_effect_direct(img)
        else:
            print(f"Unknown filter type: {filter_type}")
            return image

        print("Filter applied successfully")
        return img

    except Exception as e:
        print(f"Error applying filter: {e}")
        import traceback
        traceback.print_exc()
        return image

def apply_sepia_filter_direct(img):
    """Apply a sepia tone effect to an image."""
    width, height = img.size
    pixels = img.load()

    for y in range(height):
        for x in range(width):
            r, g, b = pixels[x, y]

            tr = int(0.393 * r + 0.769 * g + 0.189 * b)
            tg = int(0.349 * r + 0.686 * g + 0.168 * b)
            tb = int(0.272 * r + 0.534 * g + 0.131 * b)

            pixels[x, y] = (min(255, tr), min(255, tg), min(255, tb))

    return img

def apply_vintage_effect_direct(img):
    """Apply a vintage effect by combining several filters."""
    contrast_enhancer = ImageEnhance.Contrast(img)
    img = contrast_enhancer.enhance(0.8)

    brightness_enhancer = ImageEnhance.Brightness(img)
    img = brightness_enhancer.enhance(1.1)

    img = apply_sepia_filter_direct(img)

    img = img.filter(ImageFilter.GaussianBlur(radius=0.5))

    return img
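For reference, the sepia weights above are the commonly used transform; applied to one pixel it works out as follows:

# Sepia transform for a single RGB pixel, matching apply_sepia_filter_direct
r, g, b = 120, 100, 80
tr = min(255, int(0.393 * r + 0.769 * g + 0.189 * b))  # 139
tg = min(255, int(0.349 * r + 0.686 * g + 0.168 * b))  # 123
tb = min(255, int(0.272 * r + 0.534 * g + 0.131 * b))  # 96
print((tr, tg, tb))  # (139, 123, 96)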
src/utils/change_format.py
ADDED
@@ -0,0 +1,45 @@
from PIL import Image
from io import BytesIO
import requests
import base64
from typing import Union

def change_format(image: Union[str, BytesIO], target_format: str) -> str:
    """
    Convert an image (from a URL or an in-memory buffer) to the target format.

    Args:
        image: The URL of the input image, or a BytesIO buffer containing it.
        target_format: The desired output format (e.g., 'JPEG', 'PNG').

    Returns:
        The converted image as a base64-encoded string, ready for JSON serialization.
    """
    if not isinstance(image, BytesIO):
        # Treat string input as a URL and download the image
        response = requests.get(image, timeout=30)
        response.raise_for_status()
        img = Image.open(BytesIO(response.content))
    else:
        img = Image.open(image)

    # Convert the image to the target format
    output = BytesIO()
    img.save(output, format=target_format)
    output.seek(0)

    # Encode as base64 so the result can be serialized to JSON
    return base64.b64encode(output.getvalue()).decode('utf-8')
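A usage sketch with a hypothetical URL; note the return value is a base64 string, not image bytes:

import base64
from src.utils.change_format import change_format

b64_jpeg = change_format("https://example.com/cat.png", "JPEG")  # hypothetical URL
with open("cat.jpg", "wb") as f:
    f.write(base64.b64decode(b64_jpeg))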
src/utils/compress.py
ADDED
@@ -0,0 +1,59 @@
from PIL import Image
from io import BytesIO
import os
from typing import Literal, Optional

def compress_image_file(
    input_path: str,
    output_path: str,
    quality: int = 85,
    format: Literal["JPEG", "PNG", "WEBP"] = "JPEG",
    max_width: Optional[int] = None,
    max_height: Optional[int] = None
) -> str:
    """
    Compress an image file from disk.
    """
    try:
        if not os.path.splitext(output_path)[1]:
            extension_map = {"JPEG": ".jpg", "PNG": ".png", "WEBP": ".webp"}
            output_path = output_path + extension_map[format]

        with Image.open(input_path) as img:
            if format == "JPEG" and img.mode in ("RGBA", "P"):
                img = img.convert("RGB")

            if max_width or max_height:
                img.thumbnail((max_width or img.width, max_height or img.height), Image.Resampling.LANCZOS)

            save_kwargs = {"format": format, "optimize": True}
            if format in ["JPEG", "WEBP"]:
                save_kwargs["quality"] = quality

            img.save(output_path, **save_kwargs)

        original_size = os.path.getsize(input_path) / 1024 / 1024
        compressed_size = os.path.getsize(output_path) / 1024 / 1024
        reduction = (1 - compressed_size / original_size) * 100

        return f"✅ Compressed successfully!\nOriginal: {original_size:.2f}MB → Compressed: {compressed_size:.2f}MB\nReduction: {reduction:.1f}%"

    except Exception as e:
        return f"❌ Error: {str(e)}"

def compress_image_memory(image: Image.Image, quality: int = 80, format: str = "JPEG") -> Image.Image:
    """
    Compress an image in memory and return the compressed image.
    """
    if format == "JPEG" and image.mode in ("RGBA", "P"):
        image = image.convert("RGB")

    output = BytesIO()
    save_kwargs = {"format": format, "optimize": True}

    if format in ["JPEG", "WEBP"]:
        save_kwargs["quality"] = quality

    image.save(output, **save_kwargs)
    output.seek(0)

    return Image.open(output)
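A usage sketch for both entry points (paths are hypothetical); compress_image_file appends an extension when the output path has none:

from PIL import Image
from src.utils.compress import compress_image_file, compress_image_memory

print(compress_image_file("input.png", "output", quality=70, format="JPEG"))  # writes output.jpg
small = compress_image_memory(Image.open("input.png"), quality=60)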
src/utils/describe.py
ADDED
@@ -0,0 +1,112 @@
import os
import base64
import requests
from pathlib import Path
from openai import OpenAI
from urllib.parse import urlparse
from dotenv import load_dotenv


def describe_image(image_path: str) -> str:
    """
    Generate a description of the image at the given path or URL.

    Args:
        image_path: Path to a local image file OR URL of an image.

    Returns:
        A string description of the image.
    """
    load_dotenv()

    # Check if the API key is available
    api_key = os.getenv("NEBIUS_API_KEY")
    if not api_key:
        return "Error: NEBIUS_API_KEY environment variable not set"

    try:
        # Determine whether the input is a URL or a local file path
        parsed = urlparse(image_path)
        is_url = bool(parsed.scheme and parsed.netloc)

        if is_url:
            # Handle URL
            print(f"📡 Downloading image from URL: {image_path}")
            response = requests.get(image_path, timeout=30)
            response.raise_for_status()
            image_data = response.content

            # Determine content type from the response headers
            content_type = response.headers.get('content-type', '')
            if 'image' not in content_type:
                return f"Error: URL does not appear to contain an image. Content-Type: {content_type}"

        else:
            # Handle local file
            image_path = Path(image_path)

            if not image_path.exists():
                return f"Error: Local file not found: {image_path}"

            # Check if it is an image file
            valid_extensions = {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp'}
            if image_path.suffix.lower() not in valid_extensions:
                return f"Error: Unsupported file type '{image_path.suffix}'. Supported: {valid_extensions}"

            print(f"📁 Reading local image: {image_path}")
            with open(image_path, "rb") as f:
                image_data = f.read()

        # Encode the image to base64
        base64_image = base64.b64encode(image_data).decode('utf-8')

        # Create the OpenAI client
        client = OpenAI(
            base_url="https://api.studio.nebius.com/v1/",
            api_key=api_key
        )

        # Make the API call with the vision message format
        response = client.chat.completions.create(
            model="mistralai/Mistral-Small-3.1-24B-Instruct-2503",
            messages=[
                {
                    "role": "system",
                    "content": "You are a helpful assistant that provides detailed descriptions of images. Focus on the main subjects, colors, composition, and any notable details."
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": "Please provide a detailed description of this image."
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{base64_image}"
                            }
                        }
                    ]
                }
            ],
            max_tokens=500
        )

        return response.choices[0].message.content.strip()

    except requests.RequestException as e:
        return f"Error downloading image from URL: {str(e)}"
    except FileNotFoundError:
        return f"Error: File not found: {image_path}"
    except Exception as e:
        error_msg = str(e)

        if "vision" in error_msg.lower() or "image" in error_msg.lower():
            return f"Error: This model may not support vision capabilities. Try a vision-enabled model. Details: {error_msg}"
        elif "401" in error_msg or "unauthorized" in error_msg.lower():
            return "Error: Invalid API key or insufficient permissions"
        elif "rate" in error_msg.lower() or "quota" in error_msg.lower():
            return f"Error: API rate limit or quota exceeded: {error_msg}"
        else:
            return f"Error processing image: {error_msg}"
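A usage sketch, assuming NEBIUS_API_KEY is set in the environment or a .env file (inputs are hypothetical):

from src.utils.describe import describe_image

print(describe_image("photo.jpg"))                      # local file
print(describe_image("https://example.com/photo.jpg"))  # URL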
src/utils/generate_image.py
ADDED
@@ -0,0 +1,79 @@
import os
from typing import Dict, Any
from openai import OpenAI

def generate_image(
    prompt: str,
    width: int = 1024,
    height: int = 1024,
    num_inference_steps: int = 28,
    negative_prompt: str = "",
    seed: int = -1
) -> Dict[str, Any]:
    """
    Generate an image using the Nebius API.

    Args:
        prompt: Text prompt for image generation
        width: Image width
        height: Image height
        num_inference_steps: Number of inference steps
        negative_prompt: Negative prompt for generation
        seed: Random seed (-1 for random)

    Returns:
        Dictionary with result information; on success, "b64" holds the
        generated PNG as a base64-encoded string.
    """
    try:
        client = OpenAI(
            base_url="https://api.studio.nebius.com/v1/",
            api_key=os.environ.get("NEBIUS_API_KEY")
        )

        response = client.images.generate(
            model="black-forest-labs/flux-dev",
            response_format="b64_json",
            extra_body={
                "response_extension": "png",
                "width": width,
                "height": height,
                "num_inference_steps": num_inference_steps,
                "negative_prompt": negative_prompt,
                "seed": seed
            },
            prompt=prompt
        )

        # Keep the payload as a base64 string so callers can decode or embed it directly
        image_b64 = response.data[0].b64_json

        return {
            "success": True,
            "message": "Image generated successfully",
            "prompt": prompt,
            "b64": image_b64,
            "generation_params": {
                "width": width,
                "height": height,
                "num_inference_steps": num_inference_steps,
                "negative_prompt": negative_prompt,
                "seed": seed
            }
        }

    except Exception as e:
        if "NEBIUS_API_KEY" in str(e) or not os.environ.get("NEBIUS_API_KEY"):
            return {
                "success": False,
                "error": "NEBIUS_API_KEY environment variable not set"
            }
        return {
            "success": False,
            "error": f"Failed to generate image: {str(e)}"
        }
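A usage sketch that decodes the returned base64 payload to a file (output name is hypothetical):

import base64
from src.utils.generate_image import generate_image

result = generate_image("a red fox in the snow", width=512, height=512)
if result["success"]:
    with open("fox.png", "wb") as f:
        f.write(base64.b64decode(result["b64"]))
else:
    print(result["error"])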
src/utils/remove_background.py
ADDED
@@ -0,0 +1,100 @@
import requests
from typing import Dict, Any, Union
import rembg
import numpy as np
from PIL import Image
import io
import base64
import re

def remove_background(
    image_input: Union[str, bytes, np.ndarray, Image.Image],
    model_name: str = "u2net"
) -> Dict[str, Any]:
    """
    Remove the background from an image.

    Args:
        image_input: Can be one of:
            - URL string
            - Data URL string (base64 encoded)
            - Image bytes
            - NumPy array
            - PIL Image
        model_name: Background removal model to use

    Returns:
        Dictionary with result information and the processed image bytes
    """
    try:
        # Initialize the session
        session = rembg.new_session(model_name=model_name)

        # Handle the different input types
        if isinstance(image_input, str):
            if image_input.startswith('http://') or image_input.startswith('https://'):
                # If input is a URL, download the image
                response = requests.get(image_input, timeout=30)
                response.raise_for_status()
                input_data = response.content
                source_info = f"URL: {image_input}"
            elif image_input.startswith('data:'):
                # If input is a data URL, extract the base64 part after the comma
                base64_data = re.sub('^data:image/.+;base64,', '', image_input)
                input_data = base64.b64decode(base64_data)
                source_info = "data URL"
            else:
                return {
                    "success": False,
                    "error": f"Unsupported string input format: {image_input[:30]}...",
                    "image_data": None
                }
        elif isinstance(image_input, bytes):
            # If input is bytes, use it directly
            input_data = image_input
            source_info = "image bytes"
        elif isinstance(image_input, np.ndarray):
            # If input is a numpy array, convert it to PNG bytes
            pil_img = Image.fromarray(image_input)
            buffer = io.BytesIO()
            pil_img.save(buffer, format="PNG")
            input_data = buffer.getvalue()
            source_info = "numpy array"
        elif isinstance(image_input, Image.Image):
            # If input is a PIL Image, convert it to PNG bytes
            buffer = io.BytesIO()
            image_input.save(buffer, format="PNG")
            input_data = buffer.getvalue()
            source_info = "PIL Image"
        else:
            return {
                "success": False,
                "error": f"Unsupported input type: {type(image_input)}",
                "image_data": None
            }

        # Remove the background; the result is raw PNG bytes with transparency
        output_data = rembg.remove(input_data, session=session)

        return {
            "success": True,
            "message": f"Background removed from {source_info} using {model_name} model",
            "image_data": output_data,
            "model_used": model_name
        }

    except requests.RequestException as e:
        return {
            "success": False,
            "error": f"Failed to download image: {str(e)}",
            "image_data": None
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Failed to process image: {str(e)}",
            "image_data": None
        }
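A usage sketch (hypothetical URL); "image_data" in the result is raw PNG bytes with transparency, so it can be written straight to disk:

from src.utils.remove_background import remove_background

result = remove_background("https://example.com/portrait.jpg")
if result["success"]:
    with open("portrait_nobg.png", "wb") as f:
        f.write(result["image_data"])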
src/utils/resize_image.py
ADDED
@@ -0,0 +1,56 @@
from PIL import Image
from io import BytesIO
import requests
import base64
from typing import Union, Tuple

def resize_image(image_input: Union[str, BytesIO], target_size: Tuple[int, int], return_format: str = "base64") -> str:
    """
    Resize an image to fit within the target size while maintaining aspect ratio.

    Args:
        image_input: URL, file path, base64 string, or BytesIO object
        target_size: Tuple (width, height) giving the maximum size
        return_format: Format to return the image in ("base64" or "pil")

    Returns:
        Base64-encoded string of the resized image, or a PIL Image object
    """
    # Convert the input to a PIL Image
    if isinstance(image_input, str):
        if image_input.startswith(('http://', 'https://')):
            # It's a URL
            response = requests.get(image_input, timeout=10)
            response.raise_for_status()
            image = Image.open(BytesIO(response.content))
        elif image_input.startswith('data:image'):
            # It's a base64 data URI
            base64_data = image_input.split(',')[1]
            image = Image.open(BytesIO(base64.b64decode(base64_data)))
        elif ';base64,' not in image_input and len(image_input) > 500:
            # Likely a raw base64 string
            image = Image.open(BytesIO(base64.b64decode(image_input)))
        else:
            # Assume it's a file path
            image = Image.open(image_input)
    elif isinstance(image_input, BytesIO):
        image = Image.open(image_input)
    else:
        raise ValueError("Unsupported image input type")

    # Scale factor that fits the image inside the target box
    aspect_ratio = min(target_size[0] / image.width, target_size[1] / image.height)

    # Calculate the new size
    new_size = (int(image.width * aspect_ratio), int(image.height * aspect_ratio))

    # Resize the image with a high-quality resampling filter
    resized_image = image.resize(new_size, Image.Resampling.LANCZOS)

    # Return in the requested format
    if return_format.lower() == "base64":
        buffer = BytesIO()
        resized_image.save(buffer, format="PNG")
        return base64.b64encode(buffer.getvalue()).decode('utf-8')
    else:
        return resized_image
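The scale factor is the smaller of the two ratios, so a 1600×900 image resized toward (800, 600) gets factor min(0.5, 0.667) = 0.5 and comes out 800×450. A short sketch (hypothetical path):

from src.utils.resize_image import resize_image

resized = resize_image("photo.png", (800, 600), return_format="pil")
print(resized.size)  # (800, 450) for a 1600x900 input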
src/utils/visualize_image.py
ADDED
@@ -0,0 +1,18 @@
import base64
from PIL import Image
from io import BytesIO

def visualize_base64_image(base64_string: str):
    """
    Display a base64-encoded image using the default system viewer.

    Args:
        base64_string: The base64-encoded image string.
    """
    # Decode the base64 string back to binary
    image_data = base64.b64decode(base64_string)

    # Create an image from the binary data
    img = Image.open(BytesIO(image_data))

    img.show()
src/utils/watermark.py
ADDED
@@ -0,0 +1,94 @@
from PIL import Image, ImageDraw, ImageFont
import os
from typing import Dict, Any
import cv2
import numpy as np

def add_watermark(image: Image.Image, watermark_text: str, opacity: float = 0.5) -> Image.Image:
    """
    Add a semi-transparent text watermark directly to a PIL Image.

    Args:
        image: PIL Image object to watermark
        watermark_text: Text to use as watermark
        opacity: Opacity of the watermark (0.1-1.0)

    Returns:
        PIL Image with the watermark added
    """
    overlay = Image.new('RGBA', image.size, (255, 255, 255, 0))
    draw = ImageDraw.Draw(overlay)

    try:
        font_size = min(image.width, image.height) // 20
        font = ImageFont.truetype("arial.ttf", font_size)
    except OSError:
        font = ImageFont.load_default()

    bbox = draw.textbbox((0, 0), watermark_text, font=font)
    text_width = bbox[2] - bbox[0]
    text_height = bbox[3] - bbox[1]

    x = (image.width - text_width) // 2
    y = (image.height - text_height) // 2

    alpha_value = int(255 * opacity)
    text_color = (255, 255, 255, alpha_value)
    shadow_color = (0, 0, 0, int(alpha_value * 0.5))

    # Draw a darker shadow offset by 2px, then the watermark text itself
    draw.text((x - 2, y - 2), watermark_text, fill=shadow_color, font=font)
    draw.text((x, y), watermark_text, fill=text_color, font=font)

    watermarked = Image.alpha_composite(image.convert('RGBA'), overlay)
    return watermarked.convert('RGB')

def remove_watermark(image_path: str, alpha: float = 2.0, beta: float = -160) -> Dict[str, Any]:
    """
    Attempt to remove watermarks from an image using contrast and brightness adjustment.

    Args:
        image_path: The path to the input image file.
        alpha: Contrast control (1.0-3.0, default 2.0). Higher values increase contrast.
        beta: Brightness control (-255 to 255, default -160). Negative values decrease brightness.

    Returns:
        A dictionary containing success status, file paths, and operation details.
        On success: success=True, input_path, output_path, output_size_bytes, alpha, beta, message.
        On failure: success=False, error message, input_path, output_path=None.
    """
    try:
        img = cv2.imread(image_path)

        if img is None:
            raise ValueError("Could not load image")

        # Linear pixel transform: new = alpha * pixel + beta, clipped to [0, 255]
        new = alpha * img + beta
        new = np.clip(new, 0, 255).astype(np.uint8)

        base_dir = os.path.dirname(image_path)
        base_name, ext = os.path.splitext(os.path.basename(image_path))
        new_filename = f"{base_name}_cleaned{ext}"
        new_path = os.path.join(base_dir, new_filename)

        cv2.imwrite(new_path, new)
        output_size = os.path.getsize(new_path)

        return {
            "success": True,
            "message": "Watermark removal attempted successfully",
            "input_path": image_path,
            "output_path": new_path,
            "output_size_bytes": output_size,
            "alpha": alpha,
            "beta": beta
        }

    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "input_path": image_path,
            "output_path": None
        }
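The removal step is a plain linear transform with clipping; with the defaults (alpha=2.0, beta=-160), a light watermark pixel at 200 maps to 240 while mid-gray content at 100 maps to 40, which is why faint light watermarks wash out toward white:

import numpy as np

alpha, beta = 2.0, -160
pixels = np.array([200, 100], dtype=np.float64)
print(np.clip(alpha * pixels + beta, 0, 255))  # [240.  40.]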