File size: 6,375 Bytes
b49cc80
 
 
 
 
 
 
 
9ceb693
 
 
 
 
 
 
 
 
 
 
 
 
b49cc80
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import os
import torch
import cv2
import numpy as np
from PIL import Image
import gradio as gr
import requests

import os
import sys
import subprocess

# Clone the repo manually at runtime
if not os.path.exists("Grounded-Segment-Anything"):
    # check=True turns a failed clone into an immediate CalledProcessError
    # instead of letting the script continue without the checkout that the
    # sys.path entries below point into.
    subprocess.run(
        [
            "git",
            "clone",
            "--recurse-submodules",
            "https://github.com/IDEA-Research/Grounded-Segment-Anything.git",
        ],
        check=True,
    )

# Add submodules to path
sys.path.append("Grounded-Segment-Anything/GroundingDINO")
sys.path.append("Grounded-Segment-Anything/segment_anything")


# ---------------------------
# Download helper
# ---------------------------
def download_if_missing(url, dest_path, timeout=60):
    """Download ``url`` to ``dest_path`` unless that file already exists.

    The payload is streamed to ``dest_path + ".part"`` and only renamed to
    ``dest_path`` once the transfer has finished, so an interrupted download
    never leaves a truncated file that a later run would mistake for a
    complete one.

    Args:
        url: Location of the file to fetch.
        dest_path: Local path to write to. Missing parent directories are
            created; a bare filename (no directory part) is accepted.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    if os.path.exists(dest_path):
        print(f"{os.path.basename(dest_path)} already exists. Skipping.")
        return

    # os.makedirs("") raises, so only create a directory when there is one.
    parent_dir = os.path.dirname(dest_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    print(f"Downloading {os.path.basename(dest_path)}...")
    partial_path = f"{dest_path}.part"
    try:
        # The context manager releases the connection even if writing fails.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        os.replace(partial_path, dest_path)
    finally:
        # After a successful rename the partial file is gone; on any failure
        # this removes the leftover fragment.
        if os.path.exists(partial_path):
            os.remove(partial_path)
    print(f"Saved to {dest_path}")

# ---------------------------
# Download models
# ---------------------------
# (url, destination) pairs, fetched in this order: the SAM checkpoint, the
# Grounding DINO checkpoint, then the Grounding DINO config file.
for _url, _dest in (
    (
        "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth",
        "checkpoints/sam_vit_h_4b8939.pth",
    ),
    (
        "https://github.com/IDEA-Research/GroundingDINO/releases/download/v0.1.0-alpha2/groundingdino_swinb_cogcoor.pth",
        "checkpoints/groundingdino_swinb_cogcoor.pth",
    ),
    (
        "https://raw.githubusercontent.com/IDEA-Research/GroundingDINO/main/groundingdino/config/GroundingDINO_SwinB_cfg.py",
        "checkpoints/GroundingDINO_SwinB_cfg.py",
    ),
):
    download_if_missing(_url, _dest)

# ---------------------------
# Device setup
# ---------------------------
# Default to the first GPU, but keep any device selection the caller already
# exported. This must run before the first CUDA query below: the variable is
# only honoured if it is set before CUDA is initialised in this process, so
# assigning it after torch.cuda.is_available() has no effect.
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "0")
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ---------------------------
# Load models
# ---------------------------
# Imported here rather than at the top of the file; presumably because
# segment_anything and groundingdino are resolved through the sys.path
# entries appended earlier in this file.
from segment_anything import build_sam, SamPredictor
from diffusers import StableDiffusionInpaintPipeline
from groundingdino.util.inference import Model
import supervision as sv

# SAM: build from the checkpoint, move to DEVICE, wrap in a predictor.
sam = build_sam(checkpoint="checkpoints/sam_vit_h_4b8939.pth")
sam.to(device=DEVICE)
sam_predictor = SamPredictor(sam)

# Grounding DINO: config file + weights, on the same device.
dino_model = Model(
    model_config_path="checkpoints/GroundingDINO_SwinB_cfg.py",
    model_checkpoint_path="checkpoints/groundingdino_swinb_cogcoor.pth",
    device=DEVICE,
)

# Stable Diffusion Inpainting: float32 on CPU, float16 on any other device.
dtype = torch.float32 if DEVICE.type == "cpu" else torch.float16
pipe = StableDiffusionInpaintPipeline.from_pretrained(
    "stabilityai/stable-diffusion-2-inpainting", torch_dtype=dtype
)
# The pipeline is only moved explicitly when DEVICE is not the CPU.
if not DEVICE.type == "cpu":
    pipe = pipe.to(DEVICE)

# ---------------------------
# Inference Functions
# ---------------------------
def detection_fn(image, prompt):
    """Run Grounding DINO on ``image`` for ``prompt`` and draw the boxes.

    Args:
        image: Input image; converted with ``np.array`` and treated as RGB
            (the Gradio inputs in this file supply PIL images).
        prompt: Caption passed to Grounding DINO.

    Returns:
        The annotated image as an RGB array.
    """
    # The detector and the annotator both receive the BGR version.
    bgr_frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
    found, _ = dino_model.predict_with_caption(
        image=bgr_frame,
        caption=prompt,
        box_threshold=0.35,
        text_threshold=0.25,
    )
    # Every detection is assigned class id 0 before annotation.
    found.class_id = np.zeros(len(found), dtype=int)
    drawn = sv.BoxAnnotator().annotate(scene=bgr_frame, detections=found)
    return cv2.cvtColor(drawn, cv2.COLOR_BGR2RGB)

def segmentation_fn(image, prompt):
    """Segment the most confident ``prompt`` detection and overlay its mask.

    Grounding DINO proposes boxes for ``prompt``; the highest-confidence box
    is used as the SAM prompt, and the best-scoring SAM mask is blended over
    the input image.

    Args:
        image: Input image; converted with ``np.array`` and treated as RGB
            (the Gradio inputs in this file supply PIL images).
        prompt: Caption passed to Grounding DINO.

    Returns:
        RGBA array of the input image with the mask alpha-composited on top
        in a random colour.

    Raises:
        ValueError: If nothing is detected for ``prompt`` or SAM returns no
            mask.
    """
    image_np = np.array(image)
    image_cv = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
    detections, _ = dino_model.predict_with_caption(
        image=image_cv, caption=prompt, box_threshold=0.35, text_threshold=0.25
    )
    # Without this guard an empty box array reaches SAM and fails there with
    # an unrelated error before the mask check below can run.
    if len(detections) == 0:
        raise ValueError("No objects detected for the given prompt")

    # SamPredictor.predict is documented to take a single length-4 XYXY box,
    # so prompt it with the most confident detection instead of the whole
    # (N, 4) array.
    confidences = detections.confidence
    best_idx = int(np.argmax(confidences)) if confidences is not None else 0
    box = detections.xyxy[best_idx]

    sam_predictor.set_image(image_np)
    masks, scores, _ = sam_predictor.predict(box=box, multimask_output=True)
    if masks is None or len(masks) == 0:
        raise ValueError("No masks found")
    mask = masks[np.argmax(scores)]

    # Visualize mask
    def overlay_mask(mask, image):
        """Blend ``mask`` over ``image`` in a random colour at 0.8 alpha."""
        # Three random RGB components plus a fixed alpha of 0.8.
        color = np.concatenate([np.random.random(3), np.array([0.8])])
        h, w = mask.shape[-2:]
        # (h, w, 1) * (1, 1, 4): coloured where the mask is set, zero (fully
        # transparent) elsewhere.
        mask_img = mask.reshape(h, w, 1) * color.reshape(1, 1, -1)
        image_pil = Image.fromarray(image).convert("RGBA")
        mask_pil = Image.fromarray((mask_img * 255).astype(np.uint8)).convert("RGBA")
        return np.array(Image.alpha_composite(image_pil, mask_pil))

    return overlay_mask(mask, image_np)

def inpainting_fn(image, prompt):
    """Inpaint the most confident ``prompt`` detection with Stable Diffusion.

    Grounding DINO proposes boxes for ``prompt``; the highest-confidence box
    is used as the SAM prompt, and the best-scoring SAM mask marks the region
    the inpainting pipeline repaints. ``prompt`` is used both as the detection
    caption and as the inpainting prompt.

    Args:
        image: PIL image (``.convert`` and ``.size`` are used on it).
        prompt: Text used for detection and for inpainting.

    Returns:
        The inpainted PIL image, resized back to the input image's size.

    Raises:
        ValueError: If nothing is detected for ``prompt`` or SAM returns no
            mask.
    """
    image_np = np.array(image)
    image_cv = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
    detections, _ = dino_model.predict_with_caption(
        image=image_cv, caption=prompt, box_threshold=0.35, text_threshold=0.25
    )
    # Without this guard an empty box array reaches SAM and fails there with
    # an unrelated error before the mask check below can run.
    if len(detections) == 0:
        raise ValueError("No objects detected for the given prompt")

    # SamPredictor.predict is documented to take a single length-4 XYXY box,
    # so prompt it with the most confident detection instead of the whole
    # (N, 4) array.
    confidences = detections.confidence
    best_idx = int(np.argmax(confidences)) if confidences is not None else 0
    box = detections.xyxy[best_idx]

    sam_predictor.set_image(image_np)
    masks, scores, _ = sam_predictor.predict(box=box, multimask_output=True)
    if masks is None or len(masks) == 0:
        raise ValueError("No masks found")
    mask = masks[np.argmax(scores)]

    image_pil = image.convert("RGB")
    # Boolean mask -> 8-bit image: 255 where the region is to be repainted.
    mask_img = Image.fromarray((mask.astype(np.uint8) * 255)).convert("L")
    # The pipeline is run at 512x512; the result is scaled back afterwards.
    image_resized = image_pil.resize((512, 512))
    mask_resized = mask_img.resize((512, 512))
    inpainted = pipe(prompt=prompt, image=image_resized, mask_image=mask_resized).images[0]
    return inpainted.resize(image_pil.size)

# ---------------------------
# Gradio Interface
# ---------------------------
# Three-tab UI. Each tab has its own image input, prompt textbox, output image
# and button; clicking the button calls one inference function with
# (image, prompt) and shows the returned image.
with gr.Blocks() as demo:
    gr.Markdown("# Grounded Segment Anything + SAM + Stable Diffusion")
    with gr.Tabs():
        # Tab 1: bounding boxes for the prompt (detection_fn).
        with gr.TabItem("Detection"):
            img = gr.Image(type="pil")  # type="pil": callback receives a PIL image
            txt = gr.Textbox(label="Prompt", value="bench")
            out = gr.Image()
            btn = gr.Button("Run Detection")
            btn.click(detection_fn, inputs=[img, txt], outputs=out)

        # Tab 2: mask overlay for the prompt (segmentation_fn).
        with gr.TabItem("Segmentation"):
            img2 = gr.Image(type="pil")
            txt2 = gr.Textbox(label="Prompt", value="bench")
            out2 = gr.Image()
            btn2 = gr.Button("Run Segmentation")
            btn2.click(segmentation_fn, inputs=[img2, txt2], outputs=out2)

        # Tab 3: repaint the segmented region (inpainting_fn).
        with gr.TabItem("Inpainting"):
            img3 = gr.Image(type="pil")
            txt3 = gr.Textbox(label="Prompt", value="A sofa, cyberpunk style, colorful")
            out3 = gr.Image()
            btn3 = gr.Button("Run Inpainting")
            btn3.click(inpainting_fn, inputs=[img3, txt3], outputs=out3)

# Launch the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()