Spaces:
Runtime error
Runtime error
import pandas as pd | |
import numpy as np | |
from sklearn.cluster import KMeans | |
from sklearn.model_selection import train_test_split | |
from sklearn.ensemble import RandomForestClassifier | |
from sklearn.metrics import accuracy_score | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
import gradio as gr | |
import sqlite3 | |
from datetime import datetime, timedelta | |
def generate_sample_data(): | |
# Generate sample data | |
np.random.seed(42) | |
n_customers = 1000 | |
days_ago = [int(x) for x in np.random.randint(0, 365, n_customers)] | |
crm_data = pd.DataFrame({ | |
'customer_id': range(1, n_customers + 1), | |
'interactions': np.random.randint(1, 100, n_customers), | |
'transactions': np.random.uniform(10, 1000, n_customers), | |
'converted': np.random.choice([0, 1], n_customers, p=[0.7, 0.3]), | |
'timestamp': [datetime.now() - timedelta(days=d) for d in days_ago] | |
}) | |
social_days = [int(x) for x in np.random.randint(0, 365, n_customers)] | |
social_data = pd.DataFrame({ | |
'customer_id': range(1, n_customers + 1), | |
'interactions': np.random.randint(1, 200, n_customers), | |
'open_rate': np.random.uniform(0.1, 0.9, n_customers), | |
'timestamp': [datetime.now() - timedelta(days=d) for d in social_days] | |
}) | |
# Enhanced financial data with more relevant metrics | |
financial_days = [int(x) for x in np.random.randint(0, 365, n_customers)] | |
financial_data = pd.DataFrame({ | |
'customer_id': range(1, n_customers + 1), | |
'transaction_amount': np.random.uniform(50, 5000, n_customers), | |
'transaction_frequency': np.random.randint(1, 20, n_customers), # New column | |
'average_purchase': np.random.uniform(100, 2000, n_customers), # New column | |
'total_spend': np.random.uniform(1000, 50000, n_customers), # New column | |
'transaction_date': [datetime.now() - timedelta(days=d) for d in financial_days] | |
}) | |
return crm_data, social_data, financial_data | |
def init_database(): | |
conn = sqlite3.connect('sales_intelligence.db') | |
cursor = conn.cursor() | |
# Create tables if they don't exist | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS financial_data ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
customer_id INTEGER, | |
transaction_amount FLOAT, | |
transaction_frequency INTEGER, | |
average_purchase FLOAT, | |
total_spend FLOAT, | |
transaction_date DATETIME | |
) | |
''') | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS crm_data ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
customer_id INTEGER, | |
interactions INTEGER, | |
transactions FLOAT, | |
converted INTEGER, | |
timestamp DATETIME | |
) | |
''') | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS social_media_data ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
customer_id INTEGER, | |
interactions INTEGER, | |
open_rate FLOAT, | |
timestamp DATETIME | |
) | |
''') | |
# Generate and insert sample data | |
crm_data, social_data, financial_data = generate_sample_data() | |
try: | |
crm_data.to_sql('crm_data', conn, if_exists='replace', index=False) | |
social_data.to_sql('social_media_data', conn, if_exists='replace', index=False) | |
financial_data.to_sql('financial_data', conn, if_exists='replace', index=False) | |
print(f"Inserted {len(crm_data)} CRM records") | |
print(f"Inserted {len(social_data)} social media records") | |
print(f"Inserted {len(financial_data)} financial records") | |
except sqlite3.Error as e: | |
print(f"Error inserting data: {e}") | |
conn.commit() | |
conn.close() | |
print("Database initialized with sample data!") | |
def segment_prospects(df, data_source): | |
print("Segmenting prospects...") | |
if data_source.lower() == 'financial_databases': | |
# Special handling for financial data | |
kmeans = KMeans(n_clusters=3) | |
df['segment'] = kmeans.fit_predict(df[['transaction_amount', 'transaction_frequency', 'average_purchase']]) | |
segment_labels = ['Low Value', 'Medium Value', 'High Value'] | |
df['segment_label'] = [segment_labels[s] for s in df['segment']] | |
elif 'interactions' in df.columns and 'transactions' in df.columns: | |
kmeans = KMeans(n_clusters=3) | |
df['segment'] = kmeans.fit_predict(df[['interactions', 'transactions']]) | |
print("Columns after segmentation:", df.columns) | |
return df | |
def performance_analysis(df, data_source): | |
print("Analyzing performance...") | |
insights = {} | |
if data_source.lower() == 'financial_databases': | |
# Specific analysis for financial data | |
if 'segment' in df.columns: | |
# Overall metrics | |
insights['overall_metrics'] = { | |
'total_revenue': float(df['total_spend'].sum()), | |
'average_transaction': float(df['transaction_amount'].mean()), | |
'total_customers': len(df), | |
'average_frequency': float(df['transaction_frequency'].mean()) | |
} | |
# Segment-specific metrics | |
segment_metrics = df.groupby('segment').agg({ | |
'transaction_amount': ['mean', 'max'], | |
'transaction_frequency': 'mean', | |
'total_spend': 'sum', | |
'average_purchase': 'mean' | |
}).round(2) | |
# Convert the segment metrics to a more readable format | |
for segment in df['segment'].unique(): | |
insights[f'segment_{segment}'] = { | |
'avg_transaction': float(segment_metrics.loc[segment, ('transaction_amount', 'mean')]), | |
'max_transaction': float(segment_metrics.loc[segment, ('transaction_amount', 'max')]), | |
'avg_frequency': float(segment_metrics.loc[segment, ('transaction_frequency', 'mean')]), | |
'total_revenue': float(segment_metrics.loc[segment, ('total_spend', 'sum')]), | |
'avg_purchase': float(segment_metrics.loc[segment, ('average_purchase', 'mean')]) | |
} | |
return pd.DataFrame.from_dict(insights, orient='index') | |
else: | |
# Original analysis for other data sources | |
if 'segment' in df.columns: | |
insights = df.groupby('segment').mean() | |
return insights | |
return pd.DataFrame() | |
def load_data(data_source): | |
conn = sqlite3.connect('sales_intelligence.db') | |
if data_source.lower() == 'crm': | |
return pd.read_sql('SELECT * FROM crm_data', conn) | |
elif data_source.lower() == 'social_media': | |
return pd.read_sql('SELECT * FROM social_media_data', conn) | |
elif data_source.lower() == 'financial_databases': | |
return pd.read_sql('SELECT * FROM financial_data', conn) | |
else: | |
return pd.DataFrame() | |
def preprocess_data(df): | |
# Add any necessary preprocessing steps here | |
return df | |
def predict_lead_conversion(df): | |
# Example model for lead conversion prediction | |
X = df[['interactions', 'transactions']] | |
y = df['converted'] | |
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) | |
model = RandomForestClassifier(n_estimators=100, random_state=42) | |
model.fit(X_train, y_train) | |
y_pred = model.predict(X_test) | |
accuracy = accuracy_score(y_test, y_pred) | |
return model, accuracy | |
def sales_intelligence_platform(data_source): | |
print("Processing data source:", data_source) | |
data = load_data(data_source) | |
if data.empty: | |
return {"error": f"No data found for source: {data_source}. Valid sources are: 'CRM', 'social_media', 'financial_databases'"} | |
data = preprocess_data(data) | |
data = segment_prospects(data, data_source) | |
model, accuracy = predict_lead_conversion(data) if data_source.lower() == 'crm' else (None, None) | |
insights = performance_analysis(data, data_source) | |
if insights.empty: | |
return {"error": "Could not generate insights from the data"} | |
result_dict = insights.to_dict() | |
# Add some helpful messages | |
if data_source.lower() == 'financial_databases': | |
result_dict['analysis_description'] = { | |
'segment_0': 'Low Value Customers', | |
'segment_1': 'Medium Value Customers', | |
'segment_2': 'High Value Customers' | |
} | |
return result_dict | |
# Initialize the database with sample data | |
init_database() | |
# Create Gradio interface | |
iface = gr.Interface( | |
fn=sales_intelligence_platform, | |
inputs=gr.Dropdown( | |
choices=["CRM", "social_media", "financial_databases"], | |
label="Select Data Source" | |
), | |
outputs="json", | |
title="Sales Intelligence Platform", | |
description="A platform powered by AI to manage sales data and provide insights. Choose a data source to analyze.", | |
theme="dark" | |
) | |
if __name__ == "__main__": | |
iface.launch() |