|
import os
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
import seaborn as sns
import statsmodels.api as sm
from scipy import stats
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_squared_error, r2_score, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler, LabelEncoder
|
|
|
@dataclass
class AnalysisConfig:
    """Tunable thresholds for the analysis pipeline.

    Every field has a default, so ``AnalysisConfig()`` yields a usable
    configuration. Fields are positional in the order declared below.
    """

    # Upper bound on analysis iterations (not read elsewhere in the visible source).
    max_iterations: int = 5
    # Minimum sample count for an analysis (not read elsewhere in the visible source).
    min_samples_for_analysis: int = 30
    # Correlation cut-off (not read elsewhere in the visible source).
    correlation_threshold: float = 0.7
    # Category cap for visualizations (not read elsewhere in the visible source).
    max_categories_for_viz: int = 10
    # p-value threshold that DataAnalyzer.perform_statistical_tests compares against.
    significance_level: float = 0.05
|
|
|
class DataAnalyzer:
    """Profile, plot, test, and model a pandas DataFrame, with optional LLM advice.

    Attributes:
        api_key: OpenAI API key used by ``call_gpt4o_mini``.
        config: ``AnalysisConfig`` instance; its ``significance_level`` is the
            threshold applied by ``perform_statistical_tests``.
        current_iteration: Counter embedded in saved plot file names. Nothing
            in this class advances it.
        analysis_results: List initialised empty; not written by this class.
    """

    # scipy.stats.normaltest builds on skewtest, which requires >= 8 observations.
    _MIN_NORMALTEST_SAMPLES = 8

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.config = AnalysisConfig()
        self.current_iteration = 0
        self.analysis_results = []

    def call_gpt4o_mini(self, prompt: str, system_prompt: str = "") -> str:
        """Send one chat-completion request to ``gpt-4o-mini`` and return the reply.

        Args:
            prompt: Text sent as the user message.
            system_prompt: Text sent as the system message. When empty, no
                system message is included.

        Returns:
            The content of the first choice, or a string starting with
            ``"API Error: "`` if anything fails. Failures are reported in-band
            rather than raised so callers can display the result directly.
        """
        try:
            # Imported lazily and inside the ``try``: a missing SDK should
            # surface as an in-band error like any other failure of this call
            # instead of breaking module import.
            import openai

            messages = []
            if system_prompt:
                messages.append({"role": "system", "content": system_prompt})
            messages.append({"role": "user", "content": prompt})

            client = openai.OpenAI(api_key=self.api_key)
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                max_tokens=500,
                temperature=0.7,
            )
            # Guard against a missing content field so callers always get a str.
            return response.choices[0].message.content or ""
        except Exception as e:
            return f"API Error: {str(e)}"

    def analyze_data_types(self, df: pd.DataFrame) -> Dict:
        """Group the columns of ``df`` by dtype and count missing/unique values.

        Returns:
            A dict with keys ``numeric_cols`` (int64/float64 columns),
            ``categorical_cols`` (object/category columns), ``temporal_cols``
            (datetime64 columns), ``missing_values`` (column -> null count)
            and ``unique_counts`` (column -> number of distinct values).
        """

        def names_of(dtypes: List[str]) -> List[str]:
            return df.select_dtypes(include=dtypes).columns.tolist()

        return {
            "numeric_cols": names_of(["int64", "float64"]),
            "categorical_cols": names_of(["object", "category"]),
            "temporal_cols": names_of(["datetime64"]),
            "missing_values": df.isnull().sum().to_dict(),
            "unique_counts": df.nunique().to_dict(),
        }

    def create_visualization(self, df: pd.DataFrame, viz_type: str, columns: List[str]) -> str:
        """Render one plot for ``columns`` and save it as a PNG in the working directory.

        Args:
            df: Source data.
            viz_type: ``"correlation"`` (heatmap of ``df[columns].corr()``),
                ``"distribution"`` (one histogram with KDE per column) or
                ``"boxplot"``. Any other value saves an empty axes.
            columns: Columns to plot.

        Returns:
            Path of the saved image. The file name contains both
            ``current_iteration`` and ``viz_type`` so that different plot types
            produced in the same iteration do not overwrite each other.
        """
        if viz_type == "distribution" and columns:
            # One panel per column, at most three panels per row.
            n_cols = min(3, len(columns))
            n_rows = -(-len(columns) // n_cols)  # ceiling division
            fig, axes = plt.subplots(
                n_rows, n_cols, figsize=(6 * n_cols, 4 * n_rows), squeeze=False
            )
        else:
            fig, axes = plt.subplots(figsize=(10, 6), squeeze=False)
        panels = axes.ravel()

        try:
            if viz_type == "correlation":
                sns.heatmap(df[columns].corr(), annot=True, cmap='coolwarm', ax=panels[0])
                panels[0].set_title("Correlation Matrix")
            elif viz_type == "distribution":
                for panel, col in zip(panels, columns):
                    sns.histplot(data=df, x=col, kde=True, ax=panel)
                    panel.set_title(f"Distribution of {col}")
                # Hide grid cells left over when the column count does not fill the grid.
                for spare in panels[max(len(columns), 1):]:
                    spare.set_visible(False)
            elif viz_type == "boxplot":
                sns.boxplot(data=df[columns], ax=panels[0])
                panels[0].set_title("Box Plot of Numeric Variables")

            output_path = f"viz_{self.current_iteration}_{viz_type}.png"
            fig.tight_layout()
            fig.savefig(output_path)
        finally:
            # Always release the figure, even when plotting or saving raises.
            plt.close(fig)
        return output_path

    def perform_statistical_tests(self, df: pd.DataFrame, data_types: Dict) -> Dict:
        """Run normality and chi-square independence tests.

        Args:
            df: Source data.
            data_types: Dict with ``numeric_cols`` and ``categorical_cols``
                lists, as produced by ``analyze_data_types``.

        Returns:
            A dict of plain-Python results:
            ``normality_<col>`` -> ``statistic``, ``p_value``, ``is_normal``
            for each numeric column with at least 8 non-null values, and
            ``chi2_<col1>_<col2>`` -> ``statistic``, ``p_value``,
            ``is_significant`` for each unordered pair of categorical columns
            (named in column order) whose contingency table is non-empty.
        """
        results = {}
        alpha = self.config.significance_level

        for col in data_types["numeric_cols"]:
            sample = df[col].dropna()
            if len(sample) < self._MIN_NORMALTEST_SAMPLES:
                continue  # too few observations for normaltest
            stat, p_value = stats.normaltest(sample)
            results[f"normality_{col}"] = {
                "statistic": float(stat),
                "p_value": float(p_value),
                "is_normal": bool(p_value > alpha),
            }

        # Pair columns by position so each unordered pair is tested exactly
        # once without having to compare the column labels themselves.
        categorical = list(data_types["categorical_cols"])
        for idx, col1 in enumerate(categorical):
            for col2 in categorical[idx + 1:]:
                contingency = pd.crosstab(df[col1], df[col2])
                if contingency.size == 0:
                    continue  # no overlapping non-null rows; chi2 would raise
                chi2, p_value, _, _ = stats.chi2_contingency(contingency)
                results[f"chi2_{col1}_{col2}"] = {
                    "statistic": float(chi2),
                    "p_value": float(p_value),
                    "is_significant": bool(p_value < alpha),
                }

        return results

    def train_predictive_model(self, df: pd.DataFrame, target_col: str) -> Tuple[float, str]:
        """Fit a random forest on ``df`` to predict ``target_col`` and score it.

        Numeric features are median-imputed and standardised; object/category
        features are imputed with the constant ``'missing'`` and one-hot
        encoded. Rows whose target is missing are dropped. A classifier is
        used when the target is non-numeric or has at most 5 distinct values,
        otherwise a regressor. The model is scored on a 20% hold-out split.

        Returns:
            ``(score, metric)`` where ``metric`` is ``'accuracy'`` for
            classification or ``'r2'`` for regression.
        """
        labelled = df.dropna(subset=[target_col])
        X = labelled.drop(columns=[target_col])
        y = labelled[target_col]

        numeric_features = X.select_dtypes(include=['int64', 'float64']).columns
        categorical_features = X.select_dtypes(include=['object', 'category']).columns

        numeric_transformer = Pipeline([
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler()),
        ])
        categorical_transformer = Pipeline([
            ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
            ('onehot', OneHotEncoder(handle_unknown='ignore')),
        ])
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features),
                ('cat', categorical_transformer, categorical_features),
            ])

        # A non-numeric target cannot be regressed, whatever its cardinality.
        is_classification = (not pd.api.types.is_numeric_dtype(y)) or y.nunique() <= 5
        if is_classification:
            model = RandomForestClassifier(n_estimators=100, random_state=42)
            metric = 'accuracy'
        else:
            model = RandomForestRegressor(n_estimators=100, random_state=42)
            metric = 'r2'

        pipeline = Pipeline([
            ('preprocessor', preprocessor),
            ('model', model),
        ])

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        pipeline.fit(X_train, y_train)
        y_pred = pipeline.predict(X_test)

        if metric == 'accuracy':
            score = accuracy_score(y_test, y_pred)
        else:
            score = r2_score(y_test, y_pred)

        return float(score), metric
|
|
|
class GradioInterface:
    """Gradio front end that uploads a CSV and displays a ``DataAnalyzer`` report.

    Attributes:
        analyzer: ``DataAnalyzer`` built for the most recent analysis, else None.
        df: DataFrame loaded for the most recent analysis, else None.
    """

    # Default system message offered (and editable) under "Advanced Settings".
    DEFAULT_SYSTEM_PROMPT = """
<DataScienceExpertFramework version="1.0">
    <Identity>
        <Description>
            You are an expert data scientist and analyst who combines technical precision with clear communication. You specialize in uncovering insights through advanced statistical analysis, machine learning, and data visualization.
        </Description>
    </Identity>
    <CoreCapabilities>
        <Analysis>
            <Capability>Advanced statistical analysis and hypothesis testing</Capability>
            <Capability>Machine learning model development and evaluation</Capability>
            <Capability>Data visualization and exploratory data analysis</Capability>
            <Capability>Pattern recognition and trend identification</Capability>
            <Capability>Feature engineering and selection</Capability>
        </Analysis>
        <Communication>
            <Style>Clear and precise technical explanations</Style>
            <Style>Business-oriented insights translation</Style>
            <Style>Visual representation of complex patterns</Style>
        </Communication>
    </CoreCapabilities>
    <AnalysisApproach>
        <Step>Data Quality Assessment</Step>
        <Step>Exploratory Data Analysis</Step>
        <Step>Statistical Testing</Step>
        <Step>Pattern Recognition</Step>
        <Step>Insight Generation</Step>
        <Step>Visualization Creation</Step>
        <Step>Recommendations Development</Step>
    </AnalysisApproach>
    <OutputGuidelines>
        <Format>
            <Section>Key Findings Summary</Section>
            <Section>Detailed Statistical Analysis</Section>
            <Section>Visualization Descriptions</Section>
            <Section>Actionable Recommendations</Section>
        </Format>
        <Standards>
            <Standard>Always explain statistical significance</Standard>
            <Standard>Provide context for numerical findings</Standard>
            <Standard>Highlight practical implications</Standard>
            <Standard>Address data limitations</Standard>
        </Standards>
    </OutputGuidelines>
</DataScienceExpertFramework>
"""

    def __init__(self):
        self.analyzer = None
        self.df = None

    def create_interface(self):
        """Build the Gradio Blocks UI and wire its buttons.

        Returns:
            The ``gr.Blocks`` app. "Analyze Data" runs ``_run_analysis`` and
            fills the Markdown report and image gallery; "Clear" empties both.
        """
        with gr.Blocks(theme=gr.themes.Soft()) as demo:
            gr.Markdown("# 📊 Intelligent Data Analysis Agent")

            with gr.Row():
                with gr.Column(scale=1):
                    api_key = gr.Textbox(
                        label="GPT-4o-mini API Key",
                        type="password",
                        placeholder="sk-...",
                    )
                    file_input = gr.File(
                        label="Upload CSV file",
                    )

                    with gr.Accordion("⚙️ Advanced Settings", open=False):
                        system_prompt = gr.TextArea(
                            label="System Prompt",
                            # Class attributes are not visible as bare names
                            # inside a method; they must be reached via self.
                            value=self.DEFAULT_SYSTEM_PROMPT,
                            lines=8,
                        )

            with gr.Row():
                analysis_notes = gr.Textbox(
                    label="Analysis Notes (Optional)",
                    placeholder="Any specific analysis preferences...",
                )

            with gr.Row():
                analyze_btn = gr.Button("Analyze Data")
                clear_btn = gr.Button("Clear")

            output_text = gr.Markdown()
            output_gallery = gr.Gallery()

            analyze_btn.click(
                self._run_analysis,
                inputs=[api_key, file_input, analysis_notes, system_prompt],
                outputs=[output_text, output_gallery],
            )

            clear_btn.click(
                lambda: (None, None),
                outputs=[output_text, output_gallery],
            )

        return demo

    def _run_analysis(self, api_key, file, notes, system_prompt):
        """Handle "Analyze Data": load the CSV, analyse it, and build the report.

        Args:
            api_key: OpenAI API key typed by the user.
            file: Uploaded file as delivered by ``gr.File``; either a path
                string or an object exposing the path as ``.name``.
            notes: Free-text analysis preferences forwarded to the model.
            system_prompt: System message forwarded to the model.

        Returns:
            ``(markdown_report, image_paths)``. On missing input or on any
            error the first item is a message and the second is None.
        """
        if not api_key or not file:
            return "Please provide both API key and data file.", None

        try:
            # Accept both a plain path and a file-like wrapper with ``.name``.
            csv_path = getattr(file, "name", file)
            self.df = pd.read_csv(csv_path)
            self.analyzer = DataAnalyzer(api_key)

            prompt = f"Data columns: {list(self.df.columns)}\nUser notes: {notes}\nSuggest appropriate analyses and visualizations."
            ai_suggestions = self.analyzer.call_gpt4o_mini(prompt, system_prompt)

            data_types = self.analyzer.analyze_data_types(self.df)
            stats_results = self.analyzer.perform_statistical_tests(self.df, data_types)

            viz_paths = []
            numeric_cols = data_types["numeric_cols"]
            if numeric_cols:
                for viz_type in ["correlation", "distribution", "boxplot"]:
                    viz_paths.append(
                        self.analyzer.create_visualization(self.df, viz_type, numeric_cols)
                    )

            # Assembled line by line so no source indentation leaks into the
            # Markdown (indented lines would render as a code block).
            summary = "\n".join([
                "## Data Analysis Results",
                "",
                "### AI Suggestions",
                f"{ai_suggestions}",
                "",
                "### Basic Statistics",
                f"- Rows: {len(self.df)}",
                f"- Columns: {len(self.df.columns)}",
                f"- Missing Values: {sum(data_types['missing_values'].values())}",
                "",
                "### Statistical Tests",
                self._format_stats_results(stats_results),
                "",
            ])

            return summary, viz_paths

        except Exception as e:
            # Top-level UI boundary: report the failure in the output panel.
            return f"Error during analysis: {str(e)}", None

    @staticmethod
    def _format_stats_results(results: Dict) -> str:
        """Render test results as Markdown bullets, one per recognised test.

        Entries whose name contains ``"normality"`` or ``"chi2"`` are
        formatted with their verdict and p-value (4 decimals); other entries
        are skipped. Returns an empty string when nothing is formatted.
        """
        formatted = []
        for test_name, result in results.items():
            if "normality" in test_name:
                verdict = 'Normal' if result['is_normal'] else 'Non-normal'
            elif "chi2" in test_name:
                verdict = 'Significant' if result['is_significant'] else 'Not significant'
            else:
                continue
            formatted.append(f"- {test_name}: {verdict} (p={result['p_value']:.4f})")
        return "\n".join(formatted)
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI, then start the Gradio server.
    # share=True is handed to Gradio's launch() unchanged.
    interface = GradioInterface()
    demo = interface.create_interface()
    demo.launch(share=True)