# shrijayan
# Add localhost Vite development server port to allowed origins
# 8e74260
import time
import zipfile
from io import BytesIO

import pandas as pd
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse

from distance_calculator import get_distance
from place2geocode import get_lat_long
from utils import setup_logging, clean_address, handle_empty_values, validate_excel_file, meters_to_miles
# Application instance and the module-wide logger used by the handlers below.
app = FastAPI()
logger = setup_logging()

# Exact origins that may call this API from a browser.
allow_origins = [
    "http://localhost:7860",
    "http://localhost:5173",  # Default Vite development server port
]

# Entries in allow_origins are compared literally, so a "*" subdomain wildcard
# placed there never matches a real Origin header. Subdomain matching has to
# be expressed as a regular expression instead.
allow_origin_regex = r"https://[A-Za-z0-9.-]+\.hf\.space"

app.add_middleware(
    CORSMiddleware,
    allow_origins=allow_origins,
    allow_origin_regex=allow_origin_regex,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def process_uploaded_file(file_stream):
    """Process an uploaded Excel stream and return the output workbook.

    Args:
        file_stream: Seekable binary stream holding the uploaded workbook.

    Returns:
        BytesIO positioned at offset 0, containing one output sheet per
        input sheet that yielded usable address data.

    Raises:
        ValueError: If ``validate_excel_file`` rejects the stream, or if no
            sheet contains usable data.
    """
    is_valid, message = validate_excel_file(file_stream)
    if not is_valid:
        raise ValueError(message)

    # Validation may have consumed the stream; rewind before parsing.
    file_stream.seek(0)

    # Extract and check the data BEFORE opening the writer. Raising inside the
    # ExcelWriter context with no sheets written means the writer still tries
    # to save an empty workbook on exit, which can fail and hide this error.
    sheet_data = extract_address_data(file_stream)
    if not sheet_data:
        raise ValueError("No valid data found in any sheet.")

    output = BytesIO()
    with pd.ExcelWriter(output) as writer:
        for sheet_name, df in sheet_data.items():
            addresses = create_address_strings(df)
            results = get_route_distances(addresses)
            output_df = create_output_dataframe(sheet_name, results)
            output_df.to_excel(writer, sheet_name=sheet_name, index=False)

    # The workbook is flushed when the writer closes; rewind for the reader.
    output.seek(0)
    return output
def extract_address_data(file_stream):
    """Read every sheet of an Excel stream and keep the usable ones.

    Sheets whose name starts with '!' are ignored. A sheet is kept only if
    it has all six pickup/drop columns and, after ``handle_empty_values``,
    still has rows where both 'Address' and 'Drop Address' are non-blank.

    Returns:
        Dict mapping sheet name to its filtered DataFrame. Empty when the
        workbook cannot be read or no sheet qualifies.
    """
    needed = [
        'Address', 'City', 'Zipcode',
        'Drop Address', 'Drop City', 'Drop Zipcode'
    ]

    try:
        # sheet_name=None loads all sheets; header=1 takes column names
        # from the second row.
        workbook = pd.read_excel(file_stream, sheet_name=None, header=1)
    except Exception as e:
        logger.error(f"Error reading Excel file: {str(e)}")
        return {}

    usable = {}
    for sheet_name, df in workbook.items():
        if sheet_name.startswith('!'):
            continue

        logger.info(f"Processing sheet: {sheet_name}")
        try:
            missing_cols = [col for col in needed if col not in df.columns]
            if missing_cols:
                logger.warning(f"Sheet '{sheet_name}' missing columns: {missing_cols}")
                continue

            df = handle_empty_values(df, needed)
            has_pickup = df['Address'].str.strip() != ''
            has_drop = df['Drop Address'].str.strip() != ''
            df = df.loc[has_pickup & has_drop]

            if df.empty:
                logger.warning(f"Sheet '{sheet_name}' has no valid data")
                continue

            usable[sheet_name] = df
        except Exception as e:
            # A broken sheet is logged and skipped; other sheets still run.
            logger.error(f"Sheet {sheet_name} error: {str(e)}")

    return usable
@app.post("/upload")
async def upload_files(files: list[UploadFile] = File(...)):
    """Handle file uploads and return the processed file(s).

    A single upload is answered with its processed workbook. Several uploads
    are processed one by one and returned together in a ZIP archive; files
    with an unsupported extension are skipped in that case.

    Args:
        files: The uploaded files.

    Returns:
        StreamingResponse carrying either one .xlsx attachment or a ZIP.

    Raises:
        HTTPException: 400 when the file type is not accepted, when no
            uploaded file is acceptable, or when processing rejects the
            content with a ValueError; 500 for any other failure.
    """
    # NOTE(review): process_uploaded_file does blocking work (it reaches
    # time.sleep via get_route_distances) inside this async handler.
    try:
        if len(files) == 1:
            # Handle single file
            file = files[0]
            if not allowed_file(file.filename):
                raise HTTPException(status_code=400, detail="Invalid file type")
            file_stream = await file.read()
            output = process_uploaded_file(BytesIO(file_stream))
            return StreamingResponse(
                output,
                media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                headers={"Content-Disposition": f"attachment; filename=processed_{file.filename}"}
            )

        # Handle multiple files as ZIP
        zip_buffer = BytesIO()
        processed_count = 0
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for file in files:
                if not allowed_file(file.filename):
                    continue
                file_stream = await file.read()
                output = process_uploaded_file(BytesIO(file_stream))
                zip_file.writestr(f"processed_{file.filename}", output.getvalue())
                processed_count += 1

        if processed_count == 0:
            # Do not hand back an empty archive when every file was skipped.
            raise HTTPException(status_code=400, detail="Invalid file type")

        zip_buffer.seek(0)
        return StreamingResponse(
            zip_buffer,
            media_type="application/zip",
            headers={"Content-Disposition": "attachment; filename=processed_files.zip"}
        )
    except HTTPException:
        # Deliberate HTTP errors must keep their status code instead of
        # being rewrapped as a 500 by the generic handler below.
        raise
    except ValueError as e:
        # process_uploaded_file signals rejected input with ValueError.
        logger.error(f"Processing error: {str(e)}")
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Processing error: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
def allowed_file(filename):
    """Return True when *filename* has an accepted Excel extension.

    The check is case-insensitive and looks only at the text after the last
    dot. A missing or empty filename (an upload may arrive without one) is
    rejected instead of raising.

    Args:
        filename: Name of the uploaded file, or None.

    Returns:
        True for names ending in ``.xlsx`` or ``.xls``; False otherwise.
    """
    if not filename or '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in {'xlsx', 'xls'}
def create_address_strings(df):
    """Build (start, drop) address pairs for geocoding.

    For every row, the street and city are joined as "street, city" and
    passed through ``clean_address``: once for the pickup columns
    ('Address', 'City') and once for the drop columns
    ('Drop Address', 'Drop City').

    Returns:
        List of (start_address, drop_address) tuples in row order.
    """
    def _street_and_city(row, street_col, city_col):
        # One cleaned "street, city" string for a single row.
        return clean_address(f"{row[street_col]}, {row[city_col]}")

    return [
        (
            _street_and_city(row, 'Address', 'City'),
            _street_and_city(row, 'Drop Address', 'Drop City'),
        )
        for _, row in df.iterrows()
    ]
def get_route_distances(addresses):
    """Geocode each address pair and look up its route distance in miles.

    Args:
        addresses: Iterable of (start_address, drop_address) string pairs.

    Returns:
        List of (start_address, drop_address, distance_in_miles) tuples in
        input order. The distance is rounded to two decimals, or None when
        geocoding, routing, or reading the routing response failed.
    """
    results = []
    for start_address, drop_address in addresses:
        try:
            # Get coordinates for start address
            logger.info(f"Geocoding start address: '{start_address}'")
            start_coords = get_lat_long(start_address)
            logger.info(f"Start coordinates: {start_coords}")
            if not start_coords:
                logger.warning(f"Could not geocode start address: '{start_address}'")
                results.append((start_address, drop_address, None))
                continue

            # Get coordinates for drop address
            logger.info(f"Geocoding drop address: '{drop_address}'")
            drop_coords = get_lat_long(drop_address)
            logger.info(f"Drop coordinates: {drop_coords}")
            if not drop_coords:
                logger.warning(f"Could not geocode drop address: '{drop_address}'")
                results.append((start_address, drop_address, None))
                continue

            # Coordinates are unpacked as (latitude, longitude).
            lat1, lon1 = start_coords
            lat2, lon2 = drop_coords
            logger.info(f"Getting route from ({lat1}, {lon1}) to ({lat2}, {lon2})")

            # The routing helper is called with longitude first.
            distance = get_distance(lon1, lat1, lon2, lat2)
            if distance is None:
                # NOTE(review): this retry calls the same routing helper with
                # latitude/longitude swapped; it is not a separate
                # direct-distance calculation. Confirm the intended fallback.
                logger.warning("API routing failed, falling back to direct distance calculation")
                distance = get_distance(lat1, lon1, lat2, lon2)

            if distance is None:
                # Both attempts failed: record the miss explicitly.
                logger.warning(
                    f"Could not determine distance from '{start_address}' to '{drop_address}'"
                )
                results.append((start_address, drop_address, None))
                continue

            # The response reports the first route's distance in meters.
            distance_in_miles = round(meters_to_miles(distance['routes'][0]['distance']), 2)
            logger.info(f"Route distance calculated: {distance_in_miles} miles")
            results.append((start_address, drop_address, distance_in_miles))

            # Add a small delay to avoid overwhelming the geocoding service
            time.sleep(0.5)
        except Exception as e:
            # Best effort per row: one bad pair must not abort the batch.
            logger.error(f"Error processing route from '{start_address}' to '{drop_address}': {str(e)}")
            results.append((start_address, drop_address, None))
    return results
def create_output_dataframe(sheet_name, results):
    """Turn route results into a three-column DataFrame.

    Args:
        sheet_name: Name of the sheet the results belong to (not used when
            building the frame).
        results: Sequence of (start, drop, distance) tuples.

    Returns:
        DataFrame with columns 'start', 'drop' and 'distance', one row per
        result tuple.
    """
    column_names = ('start', 'drop', 'distance')
    # Column i of the frame is element i of every result tuple.
    columns = {
        label: [entry[position] for entry in results]
        for position, label in enumerate(column_names)
    }
    return pd.DataFrame(columns)
@app.get("/")
def index():
    """Return the greeting text served at the API root."""
    greeting = "Welcome to the Excel Processor API. Use the /upload endpoint to upload an Excel file."
    return greeting
if __name__ == '__main__':
    # Script entry point: serve the app with uvicorn on all interfaces.
    import uvicorn

    server_options = {
        "host": '0.0.0.0',
        "port": 7860,
        "log_level": "debug",
    }
    uvicorn.run(app, **server_options)