from ultralytics import YOLO # Load a model #model = YOLO('yolov8n-cls.pt') # load an official model model = YOLO('best.pt') # load a custom model # Predict with the model results = model('test.jpg') print(results)# predict on an image import asyncio from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware import requests import pandas as pd import json import os,datetime import pandas as pd from sklearn.model_selection import train_test_split, GridSearchCV from sklearn.preprocessing import LabelEncoder from xgboost import XGBClassifier from sklearn.metrics import accuracy_score, classification_report from joblib import dump, load import numpy as np app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def train_the_model(data): try: new_data = data encoders = load('transexpress_encoders.joblib') xgb_model = load('transexpress_xgb_model.joblib') selected_columns = ['customer_name', 'customer_address', 'customer_phone_no', 'weight','cod','pickup_address','client_number','destination_city', 'status_name'] new_data_filled = new_data[selected_columns].fillna('Missing') for col, encoder in encoders.items(): if col in new_data_filled.columns: unseen_categories = set(new_data_filled[col]) - set(encoder.classes_) if unseen_categories: for category in unseen_categories: encoder.classes_ = np.append(encoder.classes_, category) new_data_filled[col] = encoder.transform(new_data_filled[col]) else: new_data_filled[col] = encoder.transform(new_data_filled[col]) X_new = new_data_filled.drop('status_name', axis=1) y_new = new_data_filled['status_name'] X_train, X_test, y_train, y_test = train_test_split(X_new,y_new, test_size=0.2, random_state=42) xgb_model.fit(X_new, y_new) dump(xgb_model,'transexpress_xgb_model.joblib') y_pred = xgb_model.predict(X_test) accuracy = accuracy_score(y_test, y_pred) classification_rep = classification_report(y_test, y_pred) return accuracy,classification_rep,"Model finetuned with new data." except: data = data # Select columns selected_columns = ['customer_name', 'customer_address', 'customer_phone_no', 'weight','cod','pickup_address','client_number','destination_city', 'status_name'] # Handling missing values data_filled = data[selected_columns].fillna('Missing') # Encoding categorical variables encoders = {col: LabelEncoder() for col in selected_columns if data_filled[col].dtype == 'object'} for col, encoder in encoders.items(): data_filled[col] = encoder.fit_transform(data_filled[col]) # Splitting the dataset X = data_filled.drop('status_name', axis=1) y = data_filled['status_name'] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) # Setup the hyperparameter grid to search param_grid = { 'max_depth': [3, 4, 5], 'learning_rate': [0.01, 0.1, 0.4], 'n_estimators': [100, 200, 300], 'subsample': [0.8, 0.9, 1], 'colsample_bytree': [0.3, 0.7] } # Initialize the classifier xgb = XGBClassifier(use_label_encoder=False, eval_metric='logloss') # Setup GridSearchCV grid_search = GridSearchCV(xgb, param_grid, cv=2, n_jobs=-1, scoring='accuracy') # Fit the grid search to the data grid_search.fit(X_train, y_train) # Get the best parameters best_params = grid_search.best_params_ print("Best parameters:", best_params) # Train the model with best parameters best_xgb = XGBClassifier(**best_params, use_label_encoder=False, eval_metric='logloss') best_xgb.fit(X_train, y_train) # Predict on the test set y_pred = best_xgb.predict(X_test) y_pred_proba = best_xgb.predict_proba(X_test) # Evaluate the model accuracy = accuracy_score(y_test, y_pred) classification_rep = classification_report(y_test, y_pred) # Save the model model_filename = 'transexpress_xgb_model.joblib' dump(best_xgb, model_filename) # Save the encoders encoders_filename = 'transexpress_encoders.joblib' dump(encoders, encoders_filename) return accuracy,classification_rep,"base Model trained" @app.get("/trigger_the_data_fecher") async def your_continuous_function(page: str,paginate: str): print("data fetcher running.....") # Initialize an empty DataFrame to store the combined data combined_df = pd.DataFrame() # Update the payload for each page url = "https://report.transexpress.lk/api/orders/delivery-success-rate/return-to-client-orders?page="+page+"&per_page="+paginate payload = {} headers = { 'Cookie': 'development_trans_express_session=NaFDGzh5WQCFwiortxA6WEFuBjsAG9GHIQrbKZ8B' } response = requests.request("GET", url, headers=headers, data=payload) # Sample JSON response json_response = response.json() # Extracting 'data' for conversion data = json_response["return_to_client_orders"]['data'] data_count = len(data) df = pd.json_normalize(data) df['status_name'] = df['status_name'].replace('Partially Delivered', 'Delivered') df['status_name'] = df['status_name'].replace('Received by Client', 'Returned to Client') print("data collected from page : "+page) return "done" #data.to_csv("new.csv") #accuracy,classification_rep,message = train_the_model(df) #return {"message":message,"page_number":page,"data_count":data_count,"accuracy":accuracy,"classification_rep":classification_rep} @app.get("/get_latest_model_updated_time") async def model_updated_time(): try: m_time_encoder = os.path.getmtime('transexpress_encoders.joblib') m_time_model = os.path.getmtime('transexpress_xgb_model.joblib') return {"base model created time ":datetime.datetime.fromtimestamp(m_time_encoder), "last model updated time":datetime.datetime.fromtimestamp(m_time_model)} except: return {"no model found so first trained the model using data fecther"} # Endpoint for making predictions @app.post("/predict") def predict( customer_name: str, customer_address: str, customer_phone: str, weight: int, cod: int, pickup_address: str, client_number:str, destination_city:str ): try: # Load your trained model and encoders xgb_model = load('transexpress_xgb_model.joblib') encoders = load('transexpress_encoders.joblib') except: return {"no model found so first trained the model using data fecther"} # Function to handle unseen labels during encoding def safe_transform(encoder, column): classes = encoder.classes_ return [encoder.transform([x])[0] if x in classes else -1 for x in column] # Convert input data to DataFrame input_data = { 'customer_name': customer_name, 'customer_address': customer_address, 'customer_phone_no': customer_phone, 'weight': weight, 'cod': cod, 'pickup_address':pickup_address, 'client_number':client_number, 'destination_city':destination_city } input_df = pd.DataFrame([input_data]) # Encode categorical variables using the same encoders used during training for col in input_df.columns: if col in encoders: input_df[col] = safe_transform(encoders[col], input_df[col]) # Predict and obtain probabilities pred = xgb_model.predict(input_df) pred_proba = xgb_model.predict_proba(input_df) # Output predicted_status = "Unknown" if pred[0] == -1 else encoders['status_name'].inverse_transform([pred])[0] probability = pred_proba[0][pred[0]] * 100 if pred[0] != -1 else "Unknown" if predicted_status == "Returned to Client": probability = 100 - probability return {"Probability": round(probability,2)}