|
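"""Gradio app: a chat-based career advisor that turns the conversation into
profile scores and runs a Monte Carlo salary projection."""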
import gradio as gr
import numpy as np
import matplotlib.pyplot as plt
import base64
import io
import json
import requests
from typing import Dict, List, Tuple, Any
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
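# System prompts for the chat and profile-extraction model calls (full text elided).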
CONVERSATION_PROMPT = """..."""
EXTRACTION_PROMPT = """..."""
|
|
class SalarySimulator:
    def __init__(self):
        """Initialize growth and premium calculators."""
        # Each factor maps a 0-1 score to a (min, max) range of annual rates.
        self.growth_factors = {
            "base_growth": lambda score: (0.02 + (score * 0.03), 0.04 + (score * 0.04)),
            "skill_premium": lambda score: (0.01 + (score * 0.02), 0.02 + (score * 0.03)),
            "experience_premium": lambda score: (0.01 + (score * 0.02), 0.02 + (score * 0.03)),
            "education_premium": lambda score: (0.005 + (score * 0.015), 0.01 + (score * 0.02)),
            "location_premium": lambda score: (0.0 + (score * 0.02), 0.01 + (score * 0.03))
        }

        self.risk_factors = {
            "volatility": lambda score: (0.02 + (score * 0.02), 0.03 + (score * 0.03)),
            "disruption": lambda score: (0.05 + (score * 0.15), 0.1 + (score * 0.2))
        }
|
    def validate_scores(self, scores: Dict[str, float]) -> None:
        """Validate all required scores are present and valid."""
        required = [
            "industry_score", "experience_score", "education_score",
            "skills_score", "location_score", "current_salary"
        ]
        for key in required:
            if key not in scores:
                raise ValueError(f"Missing required score: {key}")
            if key == "current_salary":
                if not isinstance(scores[key], (int, float)) or scores[key] <= 0:
                    raise ValueError("Invalid salary value")
            else:
                if not 0 <= scores[key] <= 1:
                    raise ValueError(f"Invalid {key}: must be between 0 and 1")
|
    def calculate_factor(self, name: str, score: float, factor_type: str) -> float:
        """Sample a growth or risk factor from its score-dependent range."""
        factors = self.growth_factors if factor_type == "growth" else self.risk_factors
        min_val, max_val = factors[name](score)
        return np.random.uniform(min_val, max_val)
|
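    # Growth and risk rates are sampled once per run from their score-dependent
    # ranges; each simulated path then applies yearly Gaussian noise, a 10% chance
    # of a disruption shock (positive 70% of the time), and a per-year growth
    # clamp of -10% to +25%.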
    def run_simulation(self, scores: Dict[str, float]) -> Tuple[np.ndarray, Dict[str, float]]:
        """Run Monte Carlo simulation."""
        self.validate_scores(scores)

        # Sample one annual rate per growth factor for this run.
        factors = {}
        score_mapping = {
            "base_growth": "industry_score",
            "skill_premium": "skills_score",
            "experience_premium": "experience_score",
            "education_premium": "education_score",
            "location_premium": "location_score"
        }
        for factor_name, score_key in score_mapping.items():
            factors[factor_name] = self.calculate_factor(factor_name, scores[score_key], "growth")

        # Both risk factors are driven by the industry score.
        for factor_name in ["volatility", "disruption"]:
            factors[factor_name] = self.calculate_factor(
                factor_name, scores["industry_score"], "risk"
            )

        years = 5
        num_paths = 10000
        paths = np.zeros((num_paths, years + 1))
        initial_salary = float(scores["current_salary"])
        paths[:, 0] = initial_salary

        for path in range(num_paths):
            salary = initial_salary
            for year in range(1, years + 1):
                # Deterministic growth from the sampled premiums.
                growth = sum(factors[f] for f in score_mapping.keys())

                # Year-to-year market noise.
                growth += np.random.normal(0, factors["volatility"])

                # Occasional disruption event, usually positive.
                if np.random.random() < 0.1:
                    disruption = factors["disruption"] * np.random.random()
                    if np.random.random() < 0.7:
                        growth += disruption
                    else:
                        growth -= disruption

                # Clamp annual growth to [-10%, +25%].
                growth = min(max(growth, -0.1), 0.25)

                salary *= (1 + growth)
                paths[path, year] = salary

        return paths, factors
|
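    # Builds a two-panel matplotlib figure: a fan chart of percentile bands around
    # the median path, and a histogram of final-year salaries. The figure is
    # returned as a base64-encoded PNG so the Gradio image component can render it.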
    def create_plots(self, paths: np.ndarray) -> str:
        """Create visualization using matplotlib and return as base64 string."""
        plt.style.use('dark_background')

        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 12), height_ratios=[2, 1])
        fig.tight_layout(pad=4)

        years = list(range(paths.shape[1]))

        # Shaded percentile bands around the median path.
        percentiles = [(5, 95), (10, 90), (25, 75)]
        alphas = [0.1, 0.2, 0.3]
        for (lower, upper), alpha in zip(percentiles, alphas):
            lower_bound = np.percentile(paths, lower, axis=0)
            upper_bound = np.percentile(paths, upper, axis=0)
            ax1.fill_between(years, lower_bound, upper_bound, alpha=alpha, color='blue')

        median = np.percentile(paths, 50, axis=0)
        ax1.plot(years, median, color='white', linewidth=2, label='Expected Path')

        ax1.set_title('Salary Projection', pad=20)
        ax1.set_xlabel('Years')
        ax1.set_ylabel('Salary ($)')
        ax1.grid(True, alpha=0.2)
        ax1.legend()
        ax1.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'${x:,.0f}'))
        ax1.set_xticks(years)
        ax1.set_xticklabels(['Current'] + [f'Year {i+1}' for i in range(len(years)-1)])

        # Distribution of final-year salaries across all simulated paths.
        ax2.hist(paths[:, -1], bins=50, color='blue', alpha=0.7)
        ax2.set_title('Final Salary Distribution', pad=20)
        ax2.set_xlabel('Salary ($)')
        ax2.set_ylabel('Count')
        ax2.grid(True, alpha=0.2)
        ax2.xaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'${x:,.0f}'))

        # Encode the figure as a base64 PNG string.
        buf = io.BytesIO()
        plt.savefig(buf, format='png', dpi=100, bbox_inches='tight')
        buf.seek(0)
        img_str = base64.b64encode(buf.read()).decode()
        plt.close()

        return img_str
|
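    # The text report summarizes the sampled factors, the 25th/50th/75th percentile
    # outcomes after five years, and a CAGR derived from the median final salary.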
    def generate_report(
        self,
        scores: Dict[str, float],
        paths: np.ndarray,
        factors: Dict[str, float]
    ) -> str:
        """Generate analysis report."""
        final_salaries = paths[:, -1]
        initial_salary = paths[0, 0]

        metrics = {
            "p25": np.percentile(final_salaries, 25),
            "p50": np.percentile(final_salaries, 50),
            "p75": np.percentile(final_salaries, 75),
            "cagr": (np.median(final_salaries) / initial_salary) ** (1/5) - 1
        }

        report = f"""
Monte Carlo Salary Projection Analysis
======================================

Profile Scores (0-1 scale):
---------------------------
• Industry Score: {scores['industry_score']:.2f}
• Experience Score: {scores['experience_score']:.2f}
• Education Score: {scores['education_score']:.2f}
• Skills Score: {scores['skills_score']:.2f}
• Location Score: {scores['location_score']:.2f}
• Current Salary: ${scores['current_salary']:,.2f}

Growth Factors (Annual):
------------------------
• Base Growth: {factors['base_growth']*100:.1f}%
• Skill Premium: {factors['skill_premium']*100:.1f}%
• Experience Premium: {factors['experience_premium']*100:.1f}%
• Education Premium: {factors['education_premium']*100:.1f}%
• Location Premium: {factors['location_premium']*100:.1f}%
• Market Volatility: {factors['volatility']*100:.1f}%
• Potential Disruption: {factors['disruption']*100:.1f}%

5-Year Projection Results:
--------------------------
• Conservative Estimate (25th percentile): ${metrics['p25']:,.2f}
• Most Likely Outcome (Median): ${metrics['p50']:,.2f}
• Optimistic Estimate (75th percentile): ${metrics['p75']:,.2f}
• Expected Annual Growth Rate: {metrics['cagr']*100:.1f}%

Analysis Insights:
------------------
• Career profile suggests {metrics['cagr']*100:.1f}% annual growth potential
• Market volatility could lead to {factors['volatility']*100:.1f}% annual variation
• Industry position provides {factors['base_growth']*100:.1f}% base growth
• Personal factors add {(factors['skill_premium'] + factors['experience_premium'] + factors['education_premium'])*100:.1f}% potential premium
• Location impact contributes {factors['location_premium']*100:.1f}% to growth

Key Considerations:
-------------------
• Projections based on {paths.shape[0]:,} simulated career paths
• Accounts for both regular growth and market disruptions
• Considers personal development and market factors
• Results show range of potential outcomes
• Actual results may vary based on economic conditions
"""
        return report
|
|
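# Illustrative standalone use of SalarySimulator (the score values below are
# made-up examples, not outputs of the app):
#
#     sim = SalarySimulator()
#     scores = {
#         "industry_score": 0.7, "experience_score": 0.6, "education_score": 0.8,
#         "skills_score": 0.75, "location_score": 0.5, "current_salary": 95000,
#     }
#     paths, factors = sim.run_simulation(scores)
#     print(sim.generate_report(scores, paths, factors))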
|
class CareerAdvisor:
    def __init__(self):
        """Initialize career advisor."""
        self.chat_history = []
        self.simulator = SalarySimulator()
|
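    # Sends the system prompt plus the running chat history to the OpenAI
    # Chat Completions endpoint (gpt-4, temperature 0.7) and records the new
    # exchange so later turns and profile extraction have context.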
    def process_message(self, message: str, api_key: str) -> Dict[str, str]:
        """Process user message and generate response."""
        try:
            if not api_key.strip().startswith("sk-"):
                return {"error": "Invalid API key format"}

            # Build the request: system prompt, prior turns, then the new user message.
            messages = [
                {"role": "system", "content": CONVERSATION_PROMPT}
            ]
            messages.extend(self.chat_history)
            messages.append({"role": "user", "content": message})

            response = requests.post(
                "https://api.openai.com/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                },
                json={
                    "model": "gpt-4",
                    "messages": messages,
                    "temperature": 0.7
                }
            )

            if response.status_code == 200:
                assistant_message = response.json()["choices"][0]["message"]["content"].strip()

                self.chat_history.append({"role": "user", "content": message})
                self.chat_history.append({"role": "assistant", "content": assistant_message})

                return {"response": assistant_message}
            else:
                return {"error": f"API error: {response.status_code}"}

        except Exception as e:
            logger.error(f"Message processing error: {str(e)}")
            return {"error": str(e)}
|
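    # Asks the model to convert the transcript into numeric scores; if the call
    # or JSON parsing fails, neutral defaults (0.6 per score, $85,000 salary)
    # are used instead.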
    def extract_profile(self, api_key: str) -> Dict[str, float]:
        """Extract numerical profile from conversation."""
        try:
            # Flatten the chat history into a plain-text transcript.
            conversation = "\n".join([
                f"{msg['role'].title()}: {msg['content']}"
                for msg in self.chat_history
            ])

            response = requests.post(
                "https://api.openai.com/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                },
                json={
                    "model": "gpt-4",
                    "messages": [
                        {
                            "role": "system",
                            "content": EXTRACTION_PROMPT
                        },
                        {
                            "role": "user",
                            "content": f"Extract profile from:\n\n{conversation}"
                        }
                    ],
                    "temperature": 0.3
                }
            )

            if response.status_code == 200:
                # The model's reply is expected to be JSON that parses directly
                # into the score dictionary.
                profile_data = json.loads(
                    response.json()["choices"][0]["message"]["content"].strip()
                )
                return profile_data
            else:
                raise Exception(f"API error: {response.status_code}")

        except Exception as e:
            logger.error(f"Profile extraction error: {str(e)}")
            return {
                "industry_score": 0.6,
                "experience_score": 0.6,
                "education_score": 0.6,
                "skills_score": 0.6,
                "location_score": 0.6,
                "current_salary": 85000
            }
|
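    # Full pipeline: extract profile scores -> run the simulation -> render the
    # plots -> build the text report.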
    def generate_analysis(self, api_key: str) -> Dict[str, Any]:
        """Generate complete salary analysis."""
        try:
            profile_data = self.extract_profile(api_key)
            paths, factors = self.simulator.run_simulation(profile_data)
            plots_image = self.simulator.create_plots(paths)
            report = self.simulator.generate_report(
                profile_data,
                paths,
                factors
            )

            return {
                "status": "success",
                "report": report,
                "plots": plots_image
            }

        except Exception as e:
            logger.error(f"Analysis generation error: {str(e)}")
            return {"error": str(e)}
|
|
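# UI layout: an API-key field, a chat column (Chatbot, message box, send button),
# and an analysis column (status, report text, projection image), plus a button
# that triggers the full analysis.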
def create_interface():
    """Create the Gradio interface."""
    advisor = CareerAdvisor()

    with gr.Blocks(title="Monte Carlo Simulation of Salary Prediction") as demo:
        gr.Markdown("""
        # 💰 Monte Carlo Simulation of Salary Prediction

        Chat with me about your career, and I'll generate detailed salary projections
        using Monte Carlo simulation based on our conversation.
        """)

        with gr.Row():
            api_key = gr.Textbox(
                label="OpenAI API Key",
                placeholder="Enter your API key",
                type="password"
            )

        with gr.Row():
            with gr.Column(scale=2):
                chatbot = gr.Chatbot(
                    label="Career Conversation",
                    height=400,
                    show_copy_button=True,
                    type="messages"
                )

                with gr.Row():
                    message = gr.Textbox(
                        label="Your message",
                        placeholder="Tell me about your career...",
                        lines=2,
                        scale=4
                    )
                    send_btn = gr.Button(
                        "Send Message",
                        scale=1
                    )

            with gr.Column(scale=3):
                status = gr.Textbox(label="Status")
                report = gr.TextArea(
                    label="Analysis Report",
                    lines=20,
                    max_lines=30
                )
                plots = gr.Image(
                    label="Salary Projections",
                    show_download_button=True
                )

        analyze_btn = gr.Button(
            "Generate Analysis",
            variant="primary",
            size="lg"
        )

        def handle_message(
            message: str,
            history: List[Dict[str, str]],
            key: str
        ) -> Tuple[str, List[Dict[str, str]], str]:
            """Process chat messages."""
            try:
                result = advisor.process_message(message, key)
                if "error" in result:
                    return "", history, f"Error: {result['error']}"

                new_history = history + [
                    {"role": "user", "content": message},
                    {"role": "assistant", "content": result["response"]}
                ]
                return "", new_history, ""

            except Exception as e:
                return "", history, f"Error: {str(e)}"

        def generate_analysis(key: str) -> Tuple[str, str, str]:
            """Generate salary analysis."""
            try:
                result = advisor.generate_analysis(key)
                if "error" in result:
                    return f"Error: {result['error']}", "", None

                plots_image = f"data:image/png;base64,{result['plots']}"
                return (
                    "Analysis completed successfully!",
                    result["report"],
                    plots_image
                )

            except Exception as e:
                return f"Error: {str(e)}", "", None

        # Wire the message box, send button, and analysis button to their handlers.
        message.submit(
            handle_message,
            inputs=[message, chatbot, api_key],
            outputs=[message, chatbot, status],
            queue=False
        )
        send_btn.click(
            handle_message,
            inputs=[message, chatbot, api_key],
            outputs=[message, chatbot, status],
            queue=False
        )
        analyze_btn.click(
            generate_analysis,
            inputs=[api_key],
            outputs=[status, report, plots]
        )

    return demo
|
|
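# Launches on all interfaces at port 7860 and requests a public Gradio share link.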
def main():
    """Launch the application."""
    demo = create_interface()
    demo.queue()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True
    )


if __name__ == "__main__":
    main()