Spaces:
Running
Running
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from sklearn.manifold import TSNE | |
from sklearn.decomposition import PCA | |
import joblib | |
from tensorflow.keras.models import load_model | |
from keras.models import load_model | |
# Load data and models
# The CSV supplies the commodity rows; the UI callbacks in this file read its
# 'COMM_NAME', 'Cluster', 'Reconstruction_Error' and 'Anomaly' columns.
df = pd.read_csv("dataset.csv")

# NOTE(security): joblib.load unpickles the file, which can execute arbitrary
# code. Only ship a scaler.pkl that was produced by a trusted pipeline.
scaler = joblib.load("scaler.pkl")

# The encoder is only used for predict(), so skip restoring the training
# configuration (optimizer/loss). This matches how the autoencoder is loaded
# and avoids load failures caused by training-only objects saved in the file.
encoder = load_model("encoder.h5", compile=False)
autoencoder = load_model("autoencoder.h5", compile=False)
# Safely extract correct feature columns for scaler
# Columns that hold identifiers or derived outputs rather than scaler inputs.
_NON_FEATURE_COLUMNS = frozenset([
    'COMM_NAME', 'COMM_CODE', 'COMM_WT', 'Cluster', 'Reconstruction_Error',
    'Anomaly', 'tSNE_1', 'tSNE_2', 'PCA_1', 'PCA_2',
])

if hasattr(scaler, 'feature_names_in_'):
    # The scaler recorded the column names it was fitted on: use them verbatim.
    feature_cols = scaler.feature_names_in_.tolist()
else:
    # Fallback: exclude known non-feature columns while KEEPING the CSV's own
    # column order. Index.difference() sorts its result, and a scaler without
    # recorded names can only be matched by position, so a sorted order could
    # scale each feature with another feature's statistics.
    # NOTE(review): assumes the scaler was fitted on the columns in file
    # order - confirm against the training script.
    feature_cols = [col for col in df.columns if col not in _NON_FEATURE_COLUMNS]

# Transform only the original features the scaler expects, then project the
# scaled rows through the encoder to obtain the latent representation.
X_scaled = scaler.transform(df[feature_cols])
encoded_data = encoder.predict(X_scaled)
# Add t-SNE and PCA embeddings if not already present
# Each projection is computed from the encoder output only when its first
# coordinate column is missing from the dataset.
if 'tSNE_1' not in df.columns:
    tsne = TSNE(n_components=2, random_state=42)
    tsne_xy = tsne.fit_transform(encoded_data)
    for axis, column in enumerate(('tSNE_1', 'tSNE_2')):
        df[column] = tsne_xy[:, axis]

if 'PCA_1' not in df.columns:
    pca = PCA(n_components=2)
    pca_xy = pca.fit_transform(encoded_data)
    for axis, column in enumerate(('PCA_1', 'PCA_2')):
        df[column] = pca_xy[:, axis]
# Gradio UI functions | |
def plot_cluster_visualization(plot_type, cluster_id):
    """Build a 2-D scatter plot of the embedded rows, one colour per cluster.

    Args:
        plot_type: 't-SNE' plots the 'tSNE_1'/'tSNE_2' columns; any other
            value plots 'PCA_1'/'PCA_2'.
        cluster_id: 'All' (or None / '' when nothing is selected) for no
            highlight, otherwise a value convertible to int naming the
            cluster whose points get a black outline.

    Returns:
        The matplotlib Figure holding the plot.

    Raises:
        ValueError: if cluster_id is a string that is not 'All' and cannot be
            converted to an int.
    """
    x, y = ('tSNE_1', 'tSNE_2') if plot_type == 't-SNE' else ('PCA_1', 'PCA_2')

    # Draw on an explicit figure/axes pair instead of pyplot's implicit
    # "current figure", so concurrent callbacks cannot draw onto each other.
    fig, ax = plt.subplots(figsize=(8, 6))

    for cluster in sorted(df['Cluster'].unique()):
        subset = df[df['Cluster'] == cluster]
        ax.scatter(subset[x], subset[y], label=f'Cluster {cluster}', s=60)

    # A cleared dropdown yields no selection; treat it like 'All'.
    if cluster_id not in (None, '', 'All'):
        cluster_id = int(cluster_id)
        selected = df[df['Cluster'] == cluster_id]
        ax.scatter(selected[x], selected[y], edgecolor='black', facecolor='none',
                   s=120, label='Selected Cluster')

    ax.set_title(f"{plot_type} Visualization of Clusters")
    ax.set_xlabel(x)
    ax.set_ylabel(y)
    ax.legend()
    ax.grid(True)

    # Deregister the figure from pyplot so repeated clicks do not accumulate
    # open figures; the Figure object itself stays valid for rendering.
    plt.close(fig)
    return fig
def show_cluster_commodities(cluster_id, top_n):
    """Return the rows with the highest reconstruction error for a cluster.

    Args:
        cluster_id: 'All' (or None / '' when nothing is selected) to rank
            every row, otherwise a value convertible to int naming the
            cluster to restrict to.
        top_n: maximum number of rows to return; converted with int() so a
            float such as 10.0 is accepted.

    Returns:
        A DataFrame with the columns 'COMM_NAME', 'Cluster',
        'Reconstruction_Error' and 'Anomaly', sorted by descending
        'Reconstruction_Error'.

    Raises:
        ValueError: if cluster_id is a string that is not 'All' and cannot be
            converted to an int.
    """
    if cluster_id in (None, '', 'All'):
        rows = df
    else:
        rows = df[df['Cluster'] == int(cluster_id)]

    ranked = rows.sort_values(by='Reconstruction_Error', ascending=False)
    # DataFrame.head() rejects floats, so coerce the requested count first.
    return ranked[['COMM_NAME', 'Cluster', 'Reconstruction_Error', 'Anomaly']].head(int(top_n))
def show_anomalies(top_n):
    """Return the flagged anomalies with the highest reconstruction error.

    Args:
        top_n: maximum number of rows to return; converted with int() so a
            float such as 10.0 is accepted.

    Returns:
        A DataFrame with the columns 'COMM_NAME', 'Cluster' and
        'Reconstruction_Error', limited to rows whose 'Anomaly' value is
        truthy and sorted by descending 'Reconstruction_Error'.
    """
    flags = df['Anomaly']
    # Build an explicit boolean mask: indexing with a 0/1 integer column
    # would be interpreted as column labels, not as a row filter. Missing
    # flags are treated as "not an anomaly".
    is_anomaly = flags.notna() & flags.astype(bool)
    anomalies = df[is_anomaly].sort_values(by='Reconstruction_Error', ascending=False)
    # DataFrame.head() rejects floats, so coerce the requested count first.
    return anomalies[['COMM_NAME', 'Cluster', 'Reconstruction_Error']].head(int(top_n))
# Gradio UI layout
# NOTE(review): the nesting below (controls row, outputs row, then buttons at
# Blocks level) was reconstructed from a copy without indentation - confirm.

# Dropdown entries: 'All' followed by every cluster label in the data, as text.
cluster_options = ['All'] + [str(label) for label in sorted(df['Cluster'].unique())]

with gr.Blocks() as demo:
    gr.Markdown("# π Commodity Index Clustering + Anomaly Detection (Autoencoder)")

    # Controls shared by all three actions.
    with gr.Row():
        plot_type = gr.Radio(choices=["t-SNE", "PCA"], label="Plot Type", value="t-SNE")
        cluster_choice = gr.Dropdown(choices=cluster_options, label="Cluster", value='All')
        top_n = gr.Slider(minimum=5, maximum=50, step=1, label="Top N Results", value=10)

    # Output areas: the scatter plot and the result table.
    with gr.Row():
        plot_output = gr.Plot()
        table_output = gr.Dataframe()

    # Each button feeds the current control values into one callback.
    plot_button = gr.Button("Show Cluster Visualization")
    plot_button.click(
        fn=plot_cluster_visualization,
        inputs=[plot_type, cluster_choice],
        outputs=plot_output,
    )

    cluster_table_btn = gr.Button("Show Cluster Commodities")
    cluster_table_btn.click(
        fn=show_cluster_commodities,
        inputs=[cluster_choice, top_n],
        outputs=table_output,
    )

    anomaly_btn = gr.Button("Show Top Anomalies")
    anomaly_btn.click(fn=show_anomalies, inputs=[top_n], outputs=table_output)

demo.launch()