BilalSardar's picture
Create app.py
a4ef2ef verified
raw
history blame
8.8 kB
import gradio as gr
from PIL import Image
import tempfile
import os
from pathlib import Path
import shutil
import base64
import requests
import re
import time
from PIL import ImageEnhance
import concurrent.futures
import asyncio
import aiohttp
import io
class StreetViewDownloader:
    """Download Street View tiles for one panorama and stitch them together.

    Tiles are fetched concurrently from the ``streetviewpixels-pa`` tile
    endpoint with aiohttp, written to disk as ``tile_{x}_{y}.jpg`` and then
    pasted into a single JPEG panorama with Pillow.
    """

    def __init__(self):
        # Browser-like User-Agent sent with every tile request.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        # Kept so existing users of the attribute keep working; none of the
        # methods in this class use it (tile downloads go through aiohttp).
        self.session = requests.Session()

    def extract_panoid(self, url):
        """Extract the panorama ID from a Google Maps URL.

        The ID is the run of letters, digits, ``-`` and ``_`` found between
        ``!1s`` and the next ``!`` in the URL.

        Args:
            url: The Google Maps / Street View URL as a string.

        Returns:
            The panorama ID string.

        Raises:
            ValueError: If ``url`` is not a string or contains no panorama ID.
        """
        # Non-string input (e.g. None from an empty form field) used to leak
        # a TypeError out of re.search; report it as the same ValueError.
        if not isinstance(url, str):
            raise ValueError("Could not find panorama ID in URL")
        match = re.search(r'!1s([A-Za-z0-9-_]+)!', url)
        if match is None:
            raise ValueError("Could not find panorama ID in URL")
        return match.group(1)

    async def download_tile_async(self, session, panoid, x, y, adjusted_y, zoom, output_dir):
        """Download a single tile and store it as ``tile_{x}_{y}.jpg``.

        Args:
            session: An open ``aiohttp.ClientSession``.
            panoid: Panorama ID the tile belongs to.
            x: Tile column; used both in the request and in the file name.
            y: Tile row used for the file name.
            adjusted_y: Tile row actually sent to the tile endpoint.
            zoom: Zoom level sent to the tile endpoint.
            output_dir: Directory the tile file is written into.

        Returns:
            ``(x, y)`` when a tile was saved, otherwise ``None``.
        """
        tile_url = f"https://streetviewpixels-pa.googleapis.com/v1/tile?cb_client=maps_sv.tactile&panoid={panoid}&x={x}&y={adjusted_y}&zoom={zoom}"
        output_file = Path(output_dir) / f"tile_{x}_{y}.jpg"
        try:
            async with session.get(tile_url, headers=self.headers) as response:
                if response.status == 200:
                    content = await response.read()
                    # Responses of 1000 bytes or less are not stored.
                    if len(content) > 1000:
                        output_file.write_bytes(content)
                        return (x, y)
        except Exception as e:
            # Best effort: a failed tile is reported and skipped so the
            # remaining tiles can still be downloaded.
            print(f"Error downloading tile {x},{y}: {str(e)}")
        return None

    async def download_tiles_async(self, panoid, output_dir, zoom=2, cols=16, rows=8, vertical_offset=2):
        """Download the whole tile grid concurrently.

        Args:
            panoid: Panorama ID to download.
            output_dir: Directory for the tile files; created if missing.
            zoom: Zoom level requested from the tile endpoint.
            cols: Number of tile columns to request.
            rows: Number of tile rows to request.
            vertical_offset: Shift applied to the requested row; each grid
                row ``y`` is requested as ``y - rows // 2 + vertical_offset``.

        Returns:
            A ``(cols, rows, downloaded_tiles)`` tuple where
            ``downloaded_tiles`` is the list of ``(x, y)`` grid positions
            that were actually saved.
        """
        Path(output_dir).mkdir(parents=True, exist_ok=True)
        print(f"Downloading {cols * rows} tiles for panorama...")
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.download_tile_async(
                    session, panoid, x, y,
                    y - (rows // 2) + vertical_offset,
                    zoom, output_dir,
                )
                for x in range(cols)
                for y in range(rows)
            ]
            results = await asyncio.gather(*tasks)
        # Failed or skipped tiles come back as None and are dropped here.
        downloaded_tiles = [result for result in results if result]
        return cols, rows, downloaded_tiles

    def download_tiles(self, panoid, output_dir, **grid_options):
        """Synchronous wrapper around :meth:`download_tiles_async`.

        Args:
            panoid: Panorama ID to download.
            output_dir: Directory for the tile files.
            **grid_options: Optional ``zoom``, ``cols``, ``rows`` and
                ``vertical_offset`` overrides forwarded unchanged.

        Returns:
            The ``(cols, rows, downloaded_tiles)`` tuple of the async method.
        """
        return asyncio.run(self.download_tiles_async(panoid, output_dir, **grid_options))

    def create_360_panorama(self, directory, cols, rows, downloaded_tiles, output_file):
        """Stitch downloaded tiles into one JPEG panorama.

        Args:
            directory: Directory containing the ``tile_{x}_{y}.jpg`` files.
            cols: Number of tile columns in the grid.
            rows: Number of tile rows in the grid.
            downloaded_tiles: Iterable of ``(x, y)`` grid positions to paste.
            output_file: Path the finished JPEG is written to.

        Returns:
            ``output_file``, unchanged.

        Raises:
            Exception: If none of ``downloaded_tiles`` exists on disk.
        """
        directory = Path(directory)

        # Probe the first tile that exists to learn the tile dimensions.
        tile_size = None
        for x, y in downloaded_tiles:
            tile_path = directory / f"tile_{x}_{y}.jpg"
            if tile_path.exists():
                with Image.open(tile_path) as probe:
                    tile_size = probe.size
                break
        if tile_size is None:
            raise Exception("No valid tiles found in directory")
        tile_width, tile_height = tile_size

        # Canvas large enough for the full grid; unfilled areas stay black.
        panorama = Image.new('RGB', (tile_width * cols, tile_height * rows))

        def process_tile(tile_info):
            """Load one tile; return (left, top, image) or None to skip it."""
            x, y = tile_info
            tile_path = directory / f"tile_{x}_{y}.jpg"
            if tile_path.exists():
                with Image.open(tile_path) as tile:
                    # getbbox() is None for an all-black tile; skip those.
                    if tile.getbbox():
                        # copy() so the pixels outlive the closed file.
                        return (x * tile_width, y * tile_height, tile.copy())
            return None

        # Decode tiles in worker threads, then paste on the calling thread.
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            tile_results = list(executor.map(process_tile, downloaded_tiles))

        for result in tile_results:
            if result:
                left, top, tile = result
                panorama.paste(tile, (left, top))
                tile.close()

        # Crop away the black border left by grid cells without a tile.
        bbox = panorama.getbbox()
        if bbox:
            panorama = panorama.crop(bbox)

        panorama = self.enhance_panorama(panorama)
        panorama.save(output_file, 'JPEG', quality=95, optimize=True)
        return output_file

    def enhance_panorama(self, panorama):
        """Return ``panorama`` with its contrast enhanced by a factor of 1.1."""
        enhancer = ImageEnhance.Contrast(panorama)
        return enhancer.enhance(1.1)
def process_street_view(url, progress=gr.Progress()):
    """Download, stitch and preview the panorama behind a Street View URL.

    Args:
        url: Google Street View URL pasted by the user.
        progress: Gradio progress tracker, called with a fraction and a
            description after each stage.

    Returns:
        ``(panorama_path, viewer_html)`` on success, where ``viewer_html``
        embeds the image as a base64 data URI in an A-Frame scene.
        On any failure, ``(None, "Error: <message>")``.
    """
    # Local import: only the error path needs it.
    from html import escape

    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        progress(0.1, desc="Initializing...")
        downloader = StreetViewDownloader()
        # Tolerate an empty field (None) and stray whitespace around a paste.
        panoid = downloader.extract_panoid((url or "").strip())
        progress(0.2, desc="Extracted panorama ID...")
        tiles_dir = os.path.join(temp_dir, f"{panoid}_tiles")
        output_file = os.path.join(temp_dir, f"{panoid}_360panorama.jpg")
        progress(0.3, desc="Downloading tiles...")
        cols, rows, downloaded_tiles = downloader.download_tiles(panoid, tiles_dir)
        progress(0.7, desc="Creating panorama...")
        final_path = downloader.create_360_panorama(tiles_dir, cols, rows, downloaded_tiles, output_file)
        progress(0.8, desc="Cleaning up...")
        # Only the tiles are removed here; temp_dir itself must survive
        # because the returned panorama file lives inside it.
        shutil.rmtree(tiles_dir)
        progress(0.9, desc="Preparing viewer...")
        with open(final_path, 'rb') as img_file:
            img_data = base64.b64encode(img_file.read()).decode()
        viewer_html = f"""
<html>
<head>
<script src="https://aframe.io/releases/1.4.0/aframe.min.js"></script>
<style>
.viewer-container {{ height: 500px; width: 100%; }}
</style>
</head>
<body>
<div class="viewer-container">
<a-scene embedded>
<a-sky src="data:image/jpeg;base64,{img_data}"></a-sky>
<a-camera position="0 0 0" wasd-controls-enabled="false"></a-camera>
</a-scene>
</div>
</body>
</html>
"""
        progress(1.0, desc="Done!")
        return final_path, viewer_html
    except Exception as e:
        # Nothing is handed back to the caller on failure, so drop the
        # whole temp directory instead of leaking it on every bad request.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        # The message is rendered as HTML by the viewer component.
        return None, f"Error: {escape(str(e))}"
def create_gradio_interface():
    """Build the Gradio Blocks UI and return it without launching it.

    The layout is a title, a URL text box, a submit button, a file download
    next to an HTML viewer, and a block of usage instructions. Clicking the
    button runs ``process_street_view`` on the text box value and routes its
    two results to the file and viewer components.
    """
    # Same text the UI has always shown, spelled out line by line.
    instructions_md = (
        "\n"
        "### Instructions:\n"
        "1. Open Google Street View at your desired location\n"
        "2. Copy the URL from your browser\n"
        "3. Paste the URL in the input box above\n"
        '4. Click "Download and Process"\n'
        "5. Wait for processing (usually takes less than a minute)\n"
        "6. Download the panorama or explore it in the 360° viewer\n"
        "Note: This is an optimized version that provides faster processing with slightly reduced image quality.\n"
    )

    with gr.Blocks() as demo:
        gr.Markdown("# Fast Street View 360° Panorama Downloader")
        gr.Markdown("Enter a Google Street View URL to download and view the panorama")

        with gr.Row():
            url_box = gr.Textbox(
                label="Street View URL",
                placeholder="Paste Google Street View URL here...",
            )

        with gr.Row():
            run_button = gr.Button("Download and Process")

        # Download link on the left, interactive viewer on the right.
        with gr.Row():
            with gr.Column():
                panorama_file = gr.File(label="Download Panorama")
            with gr.Column():
                panorama_viewer = gr.HTML(label="360° Viewer")

        run_button.click(
            fn=process_street_view,
            inputs=[url_box],
            outputs=[panorama_file, panorama_viewer],
        )

        gr.Markdown(instructions_md)

    return demo
# Script entry point: build the UI and start the Gradio server.
if __name__ == "__main__":
    app = create_gradio_interface()
    # share=True asks Gradio to also expose a public share link.
    app.launch(share=True)