class StreetViewDownloader:
    """Download Google Street View tiles and stitch them into one panorama.

    Typical flow: ``extract_panoid`` -> ``download_tiles`` ->
    ``create_360_panorama``. Tile files are written to a caller-supplied
    directory as ``tile_{x}_{y}.jpg``.
    """

    # Panorama IDs appear in Google Maps URLs as "!1s<id>!". The hyphen is
    # placed last in the character class so it is unambiguously a literal.
    _PANOID_PATTERN = re.compile(r'!1s([A-Za-z0-9_-]+)!')

    def __init__(self):
        """Set up the request headers and a reusable ``requests`` session."""
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        # Kept for connection reuse by synchronous callers; the async tile
        # download below uses its own aiohttp session instead.
        self.session = requests.Session()

    def extract_panoid(self, url):
        """Extract the panorama ID from a Google Maps URL.

        Args:
            url: Google Maps Street View URL containing ``!1s<panoid>!``.

        Returns:
            The panorama ID string.

        Raises:
            ValueError: If ``url`` is not a string or contains no panorama ID.
        """
        # Non-string input (e.g. None from an empty form field) used to
        # escape as TypeError from re.search; report it like any other
        # unusable URL so callers only have one error type to handle.
        if not isinstance(url, str):
            raise ValueError("Could not find panorama ID in URL")
        match = self._PANOID_PATTERN.search(url)
        if match is None:
            raise ValueError("Could not find panorama ID in URL")
        return match.group(1)

    async def download_tile_async(self, session, panoid, x, y, adjusted_y, zoom, output_dir):
        """Download a single tile asynchronously.

        Args:
            session: Open ``aiohttp.ClientSession`` used for the request.
            panoid: Panorama ID the tile belongs to.
            x: Tile column; used in the request and in the output file name.
            y: Tile row in the local grid; used only in the output file name.
            adjusted_y: Tile row actually sent to the tile server.
            zoom: Zoom level sent to the tile server.
            output_dir: Directory the tile is written to.

        Returns:
            ``(x, y)`` when the tile was saved, otherwise ``None``.
        """
        tile_url = f"https://streetviewpixels-pa.googleapis.com/v1/tile?cb_client=maps_sv.tactile&panoid={panoid}&x={x}&y={adjusted_y}&zoom={zoom}"
        output_file = Path(output_dir) / f"tile_{x}_{y}.jpg"

        try:
            async with session.get(tile_url, headers=self.headers) as response:
                if response.status != 200:
                    return None
                content = await response.read()
                # Bodies of 1000 bytes or less are discarded; presumably
                # these are placeholder/empty tiles -- TODO confirm.
                if len(content) <= 1000:
                    return None
                output_file.write_bytes(content)
                return (x, y)
        except Exception as e:
            # Best effort: a failed tile is reported and skipped so the
            # remaining tiles can still be assembled.
            print(f"Error downloading tile {x},{y}: {str(e)}")
        return None

    async def download_tiles_async(self, panoid, output_dir, *, zoom=2, cols=16, rows=8,
                                   vertical_offset=2):
        """Download the whole tile grid concurrently.

        Args:
            panoid: Panorama ID to download.
            output_dir: Directory for the tile files; created if missing.
            zoom: Zoom level requested from the tile server.
            cols: Number of tile columns to request.
            rows: Number of tile rows to request.
            vertical_offset: Shift added to the server-side row index, which
                is computed as ``y - rows // 2 + vertical_offset``.

        Returns:
            ``(cols, rows, downloaded_tiles)`` where ``downloaded_tiles`` is
            a list of ``(x, y)`` pairs for the tiles that were saved.
        """
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        print(f"Downloading {cols * rows} tiles for panorama...")

        row_shift = vertical_offset - (rows // 2)
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.download_tile_async(session, panoid, x, y, y + row_shift, zoom, output_dir)
                for x in range(cols)
                for y in range(rows)
            ]
            results = await asyncio.gather(*tasks)

        # Failed downloads come back as None and are dropped here.
        downloaded_tiles = [result for result in results if result]
        return cols, rows, downloaded_tiles

    def download_tiles(self, panoid, output_dir, *, zoom=2, cols=16, rows=8, vertical_offset=2):
        """Synchronous wrapper around ``download_tiles_async``.

        Accepts the same arguments and returns the same
        ``(cols, rows, downloaded_tiles)`` tuple. It drives the coroutine
        with ``asyncio.run``, so it cannot be called from a thread that is
        already running an event loop.
        """
        return asyncio.run(
            self.download_tiles_async(
                panoid,
                output_dir,
                zoom=zoom,
                cols=cols,
                rows=rows,
                vertical_offset=vertical_offset,
            )
        )

    def create_360_panorama(self, directory, cols, rows, downloaded_tiles, output_file):
        """Stitch downloaded tiles into a single panorama JPEG.

        Args:
            directory: Directory holding the ``tile_{x}_{y}.jpg`` files.
            cols: Number of tile columns in the grid.
            rows: Number of tile rows in the grid.
            downloaded_tiles: Iterable of ``(x, y)`` pairs to paste.
            output_file: Destination path of the panorama.

        Returns:
            ``output_file``, after the image has been written.

        Raises:
            Exception: If none of the listed tiles can be opened.
        """
        directory = Path(directory)
        downloaded_tiles = list(downloaded_tiles)

        # Every tile is assumed to share the size of the first readable one.
        tile_size = None
        for x, y in downloaded_tiles:
            try:
                with Image.open(directory / f"tile_{x}_{y}.jpg") as probe:
                    tile_size = probe.size
            except OSError:
                # Missing or unreadable file: try the next candidate.
                continue
            break

        if tile_size is None:
            raise Exception("No valid tiles found in directory")

        tile_width, tile_height = tile_size
        panorama = Image.new('RGB', (tile_width * cols, tile_height * rows))

        def process_tile(tile_info):
            """Load one tile; return ``(left, top, image)`` or ``None``."""
            x, y = tile_info
            try:
                with Image.open(directory / f"tile_{x}_{y}.jpg") as tile:
                    # getbbox() is None for an entirely black tile, which
                    # contributes nothing to the black canvas.
                    if tile.getbbox():
                        return (x * tile_width, y * tile_height, tile.copy())
            except OSError as e:
                # A corrupt or truncated tile should not abort the whole
                # panorama; skip it like a tile that failed to download.
                print(f"Skipping unreadable tile {x},{y}: {e}")
            return None

        # Decoding is done in worker threads; pasting stays on this thread
        # because the canvas is shared.
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            tile_results = list(executor.map(process_tile, downloaded_tiles))

        for result in tile_results:
            if result:
                left, top, tile = result
                panorama.paste(tile, (left, top))
                tile.close()

        # Trim the black border left by grid cells that had no tile.
        bbox = panorama.getbbox()
        if bbox:
            panorama = panorama.crop(bbox)

        panorama = self.enhance_panorama(panorama)
        panorama.save(output_file, 'JPEG', quality=95, optimize=True)
        return output_file

    def enhance_panorama(self, panorama):
        """Return ``panorama`` with contrast raised by a factor of 1.1."""
        return ImageEnhance.Contrast(panorama).enhance(1.1)