"""Gradio app: generate a satellite-style image with FLUX and turn it into a 3D map.

The image is segmented by colour on the GPU (CuPy), building heights are
estimated from footprint size and nearby shadows, and the result is exported
as a textured GLB mesh together with the segmentation image.
"""

import os
import tempfile
import traceback

import torch
import numpy as np
import gradio as gr
from PIL import Image
import cv2
from diffusers import DiffusionPipeline
import cupy as cp
from cupyx.scipy.ndimage import label as cp_label
from cupyx.scipy.ndimage import binary_dilation
from sklearn.cluster import DBSCAN  # noqa: F401  (unused in this file; kept from the original import list)
import trimesh


class GPUSatelliteModelGenerator:
    """Colour-based segmentation, height estimation and mesh generation on the GPU."""

    def __init__(self, building_height=0.05):
        """Set up reference colours, tolerances and output colours.

        Args:
            building_height: Multiplier applied to the height map when the
                mesh vertices are generated.
        """
        self.building_height = building_height

        # Reference colours (RGB) kept on the GPU.
        self.shadow_colors = cp.array([
            [31, 42, 76],
            [58, 64, 92],
            [15, 27, 56],
            [21, 22, 50],
            [76, 81, 99],
        ])
        self.road_colors = cp.array([
            [187, 182, 175],
            [138, 138, 138],
            [142, 142, 129],
            [202, 199, 189],
        ])
        self.water_colors = cp.array([
            [167, 225, 217],
            [67, 101, 97],
            [53, 83, 84],
            [47, 94, 100],
            [73, 131, 135],
        ])

        # Reference colours in normalized HSV: hue in degrees, sat/val in [0, 1].
        self.shadow_colors_hsv = self._reference_hsv(self.shadow_colors)
        self.road_colors_hsv = self._reference_hsv(self.road_colors)
        self.water_colors_hsv = self._reference_hsv(self.water_colors)

        # Color tolerances (same units as the normalized HSV references).
        self.shadow_tolerance = {'hue': 15, 'sat': 0.15, 'val': 0.12}
        self.road_tolerance = {'hue': 10, 'sat': 0.12, 'val': 0.15}
        self.water_tolerance = {'hue': 20, 'sat': 0.15, 'val': 0.20}

        # Output colors (BGR for OpenCV)
        self.colors = {
            'black': cp.array([0, 0, 0]),        # Shadows
            'blue': cp.array([255, 0, 0]),       # Water
            'green': cp.array([0, 255, 0]),      # Vegetation
            'gray': cp.array([128, 128, 128]),   # Roads
            'brown': cp.array([0, 140, 255]),    # Terrain
            'white': cp.array([255, 255, 255]),  # Buildings
        }

        # NOTE: the three settings below are not read by any method in this file.
        self.min_area_for_clustering = 1000
        self.residential_height_factor = 0.6
        self.isolation_threshold = 0.6

    @staticmethod
    def _reference_hsv(colors_rgb):
        """Convert an (N, 3) RGB CuPy array into normalized float32 HSV on the GPU.

        The conversion goes through OpenCV's 8-bit HSV and is normalized in
        floating point. Normalizing inside the uint8 array would truncate the
        saturation/value fractions to 0 or 1 and could wrap the doubled hue.
        """
        hsv = cv2.cvtColor(
            colors_rgb.get().reshape(-1, 1, 3).astype(np.uint8),
            cv2.COLOR_RGB2HSV,
        ).reshape(-1, 3).astype(np.float32)
        hsv[:, 0] *= 2.0      # OpenCV 8-bit hue is stored halved
        hsv[:, 1:] /= 255.0   # saturation and value to [0, 1]
        return cp.asarray(hsv)

    @staticmethod
    def gpu_color_distance_hsv(pixel_hsv, reference_hsv, tolerance):
        """GPU-accelerated HSV color distance calculation.

        Args:
            pixel_hsv: HSV channels in OpenCV 8-bit scale, indexed as
                ``pixel_hsv[0]`` (hue), ``[1]`` (saturation), ``[2]`` (value).
            reference_hsv: One normalized reference colour
                (hue in degrees, saturation and value in [0, 1]).
            tolerance: Dict with ``'hue'``, ``'sat'`` and ``'val'`` limits.

        Returns:
            Boolean CuPy array, True where all three channel differences are
            within tolerance.
        """
        # Work in float32: uint8 arithmetic would wrap on `hue * 2` and on the
        # subtraction from the reference.
        pixels = cp.asarray(pixel_hsv, dtype=cp.float32)
        reference = cp.asarray(reference_hsv, dtype=cp.float32)

        pixel_h = pixels[0] * 2.0
        pixel_s = pixels[1] / 255.0
        pixel_v = pixels[2] / 255.0

        # Hue is circular, so take the shorter way around the 360-degree wheel.
        raw_hue_diff = cp.abs(pixel_h - reference[0])
        hue_diff = cp.minimum(raw_hue_diff, 360.0 - raw_hue_diff)
        sat_diff = cp.abs(pixel_s - reference[1])
        val_diff = cp.abs(pixel_v - reference[2])

        return (
            (hue_diff <= tolerance['hue'])
            & (sat_diff <= tolerance['sat'])
            & (val_diff <= tolerance['val'])
        )

    def _match_any(self, hsv_channels, references_hsv, tolerance):
        """Return a flat mask of pixels matching at least one reference colour."""
        mask = cp.zeros((hsv_channels.shape[1],), dtype=bool)
        for ref_hsv in references_hsv:
            mask |= self.gpu_color_distance_hsv(hsv_channels, ref_hsv, tolerance)
        return mask

    def segment_image_gpu(self, img):
        """GPU-accelerated image segmentation.

        Args:
            img: BGR image as a NumPy array (it is passed to
                ``cv2.cvtColor(..., cv2.COLOR_BGR2HSV)``).

        Returns:
            CuPy array shaped (height, width, 3) painted with the colours from
            ``self.colors``. Categories are painted in the order shadows,
            water, roads, vegetation, buildings, so a later category overwrites
            an earlier one where masks overlap.
        """
        gpu_img = cp.asarray(img)
        gpu_hsv = cp.asarray(cv2.cvtColor(img, cv2.COLOR_BGR2HSV))

        height, width = img.shape[:2]
        output = cp.zeros_like(gpu_img)

        # (3, N) float32 channel-major view of the HSV pixels, converted once
        # so every comparison below is free of uint8 overflow.
        hsv_channels = cp.ascontiguousarray(gpu_hsv.reshape(-1, 3).T, dtype=cp.float32)

        shadow_mask = self._match_any(hsv_channels, self.shadow_colors_hsv, self.shadow_tolerance)
        road_mask = self._match_any(hsv_channels, self.road_colors_hsv, self.road_tolerance)
        water_mask = self._match_any(hsv_channels, self.water_colors_hsv, self.water_tolerance)

        # Apply masks
        output_flat = output.reshape(-1, 3)
        output_flat[shadow_mask] = self.colors['black']
        output_flat[water_mask] = self.colors['blue']
        output_flat[road_mask] = self.colors['gray']

        # Vegetation: hue between 40 and 150 degrees with enough saturation.
        hue = hsv_channels[0] * 2.0
        sat = hsv_channels[1] / 255.0
        vegetation_mask = (hue >= 40) & (hue <= 150) & (sat >= 0.15)

        # Everything that matched no other category is treated as a building.
        building_mask = ~(shadow_mask | water_mask | road_mask | vegetation_mask)

        output_flat[vegetation_mask] = self.colors['green']
        output_flat[building_mask] = self.colors['white']

        return output.reshape(height, width, 3)

    def estimate_heights_gpu(self, img, segmented):
        """GPU-accelerated height estimation.

        Each connected building region gets a height of
        ``size_factor * shadow_factor`` where ``size_factor`` grows with the
        region's area relative to the largest region and ``shadow_factor``
        grows with the fraction of shadow pixels inside the region dilated by
        a 5x5 structuring element.

        Args:
            img: Unused; kept for interface compatibility.
            segmented: Segmentation image as produced by ``segment_image_gpu``.

        Returns:
            NumPy float array with the same height/width as ``segmented``,
            scaled by 0.25.
        """
        gpu_segmented = cp.asarray(segmented)
        buildings_mask = cp.all(gpu_segmented == self.colors['white'], axis=2)
        shadows_mask = cp.all(gpu_segmented == self.colors['black'], axis=2)

        # Connected components labeling on GPU
        labeled_array, num_features = cp_label(buildings_mask)
        height_map = cp.zeros_like(labeled_array, dtype=cp.float32)

        if num_features == 0:
            return height_map.get() * 0.25

        # Fetch the per-label areas once instead of syncing inside the loop.
        areas = cp.bincount(labeled_array.ravel())[1:].get()  # skip background
        max_area = areas.max() if areas.size > 0 else 1

        structure = cp.ones((5, 5))  # loop-invariant dilation footprint

        for label_id in range(1, num_features + 1):
            area = areas[label_id - 1]
            if area == 0:
                continue

            building_mask = labeled_array == label_id
            size_factor = 0.3 + 0.7 * (float(area) / float(max_area))

            # Shadow influence: share of shadow pixels in the dilated footprint.
            dilated = binary_dilation(building_mask, structure=structure)
            shadow_ratio = cp.sum(dilated & shadows_mask) / cp.sum(dilated)
            shadow_factor = 0.2 + 0.8 * shadow_ratio

            height_map[building_mask] = size_factor * shadow_factor

        return height_map.get() * 0.25

    def generate_mesh_gpu(self, height_map, texture_img):
        """Generate 3D mesh using GPU-accelerated calculations.

        Args:
            height_map: 2D array of heights, one vertex per element.
            texture_img: Image used as the mesh texture. Three-channel input is
                converted with ``COLOR_BGR2RGB`` and four-channel input with
                ``COLOR_BGRA2RGB``.

        Returns:
            ``trimesh.Trimesh`` with two triangles per grid cell and UV
            coordinates spanning the texture.
        """
        height_map_gpu = cp.asarray(height_map)
        height, width = height_map.shape

        # Generate vertex positions on GPU
        x, z = cp.meshgrid(cp.arange(width), cp.arange(height))
        vertices = cp.stack([x, height_map_gpu * self.building_height, z], axis=-1)
        vertices = vertices.reshape(-1, 3)

        # Normalize coordinates so the longer image side spans [-1, 1].
        scale = max(width, height)
        vertices[:, 0] = vertices[:, 0] / scale * 2 - (width / scale)
        vertices[:, 2] = vertices[:, 2] / scale * 2 - (height / scale)
        vertices[:, 1] = vertices[:, 1] * 2 - 1

        # Generate faces: two triangles per grid cell.
        i, j = cp.meshgrid(cp.arange(height - 1), cp.arange(width - 1), indexing='ij')
        v0 = (i * width + j).flatten()
        v1 = v0 + 1
        v2 = ((i + 1) * width + j).flatten()
        v3 = v2 + 1

        faces = cp.vstack((
            cp.column_stack((v0, v2, v1)),
            cp.column_stack((v1, v2, v3)),
        ))

        # Generate UV coordinates; guard the divisor so a single-row or
        # single-column height map does not divide by zero.
        uvs = cp.zeros((vertices.shape[0], 2))
        uvs[:, 0] = x.flatten() / max(width - 1, 1)
        uvs[:, 1] = 1 - (z.flatten() / max(height - 1, 1))

        # Convert to CPU for mesh creation
        vertices_cpu = vertices.get()
        faces_cpu = faces.get()
        uvs_cpu = uvs.get()

        # Create mesh
        if len(texture_img.shape) == 3 and texture_img.shape[2] == 4:
            texture_img = cv2.cvtColor(texture_img, cv2.COLOR_BGRA2RGB)
        elif len(texture_img.shape) == 3:
            texture_img = cv2.cvtColor(texture_img, cv2.COLOR_BGR2RGB)

        mesh = trimesh.Trimesh(
            vertices=vertices_cpu,
            faces=faces_cpu,
            visual=trimesh.visual.TextureVisuals(
                uv=uvs_cpu,
                image=Image.fromarray(texture_img),
            ),
        )

        return mesh


# Runtime globals used by generate_and_process_map. `flux_pipe` is populated
# by the __main__ block below; it stays None when the module is only imported.
device = "cuda" if torch.cuda.is_available() else "cpu"
flux_pipe = None


def generate_and_process_map(prompt: str) -> tuple[str | None, str | None]:
    """Generate satellite image from prompt and convert to 3D model using GPU acceleration.

    Args:
        prompt: Text description appended to the fixed style prefix.

    Returns:
        ``(glb_path, segmented_png_path)`` on success, ``(None, None)`` if any
        step raises (the error and traceback are printed).
    """
    try:
        if flux_pipe is None:
            raise RuntimeError("FLUX pipeline is not loaded")

        # Set dimensions and device
        width = height = 1024

        # Generate random seed
        seed = int(np.random.randint(0, np.iinfo(np.int32).max))

        # Set random seeds
        torch.manual_seed(seed)
        np.random.seed(seed)

        # Generate satellite image using FLUX
        torch_generator = torch.Generator(device=device).manual_seed(seed)
        generated_image = flux_pipe(
            prompt=f"satellite view in the style of TOK, {prompt}",
            width=width,
            height=height,
            num_inference_steps=25,
            generator=torch_generator,
            guidance_scale=7.5,
        ).images[0]

        # Convert PIL Image to OpenCV format
        cv_image = cv2.cvtColor(np.array(generated_image), cv2.COLOR_RGB2BGR)

        # Initialize GPU-accelerated generator
        model_generator = GPUSatelliteModelGenerator(building_height=0.09)

        # Process image using GPU
        print("Segmenting image using GPU...")
        segmented_img = model_generator.segment_image_gpu(cv_image)

        print("Estimating heights using GPU...")
        height_map = model_generator.estimate_heights_gpu(cv_image, segmented_img)

        # Generate mesh using GPU-accelerated calculations
        print("Generating mesh using GPU...")
        mesh = model_generator.generate_mesh_gpu(height_map, cv_image)

        # Export to GLB. The directory is intentionally left in place because
        # the returned paths are handed to the UI components.
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, 'output.glb')
        mesh.export(output_path)

        # Save segmented image to a temporary file
        segmented_path = os.path.join(temp_dir, 'segmented.png')
        cv2.imwrite(segmented_path, segmented_img.get())

        return output_path, segmented_path

    except Exception as e:  # UI boundary: report and return empty outputs
        print(f"Error during generation: {str(e)}")
        traceback.print_exc()
        return None, None


# Create Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# GPU-Accelerated Text to Map")
    gr.Markdown("Generate 3D maps and segmentation maps from text descriptions using FLUX and GPU-accelerated processing.")

    with gr.Row():
        prompt_input = gr.Text(
            label="Enter your prompt",
            placeholder="classic american town"
        )

    with gr.Row():
        generate_btn = gr.Button("Generate", variant="primary")

    with gr.Row():
        with gr.Column():
            model_output = gr.Model3D(
                label="Generated 3D Map",
                clear_color=[0.0, 0.0, 0.0, 0.0],
            )
        with gr.Column():
            segmented_output = gr.Image(
                label="Segmented Map",
                type="filepath"
            )

    # Event handler
    generate_btn.click(
        fn=generate_and_process_map,
        inputs=[prompt_input],
        outputs=[model_output, segmented_output],
        api_name="generate"
    )

if __name__ == "__main__":
    # Initialize FLUX pipeline
    dtype = torch.bfloat16

    repo_id = "black-forest-labs/FLUX.1-dev"
    adapter_id = "jbilcke-hf/flux-satellite"

    flux_pipe = DiffusionPipeline.from_pretrained(
        repo_id,
        torch_dtype=dtype
    )
    flux_pipe.load_lora_weights(adapter_id)
    flux_pipe = flux_pipe.to(device)

    # Launch Gradio app
    demo.queue().launch()