Spaces:
Sleeping
Sleeping
import gradio as gr | |
import torch | |
from transformers import AutoModelForCausalLM, AutoTokenizer | |
from simple_salesforce import Salesforce | |
import os | |
import base64 | |
import datetime | |
from dotenv import load_dotenv | |
from fpdf import FPDF | |
import shutil | |
import html | |
import io | |
import matplotlib.pyplot as plt | |
import numpy as np | |
# Load environment variables | |
load_dotenv() | |
# Required env vars check | |
required_env_vars = ['SF_USERNAME', 'SF_PASSWORD', 'SF_SECURITY_TOKEN'] | |
missing_vars = [var for var in required_env_vars if not os.getenv(var)] | |
if missing_vars: | |
raise EnvironmentError(f"Missing required environment variables: {missing_vars}") | |
# Load model and tokenizer | |
model_name = "distilgpt2" | |
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True) | |
model = AutoModelForCausalLM.from_pretrained(model_name, low_cpu_mem_usage=True) | |
if tokenizer.pad_token is None: | |
tokenizer.pad_token = tokenizer.eos_token | |
tokenizer.pad_token_id = tokenizer.convert_tokens_to_ids(tokenizer.pad_token) | |
model.config.pad_token_id = tokenizer.pad_token_id | |
# Function to generate progress chart | |
def show_dashboard_chart(start_date, end_date, tasks_completed): | |
completed_tasks = list(tasks_completed.values()) | |
labels = list(tasks_completed.keys()) | |
remaining_tasks = [5 - task for task in completed_tasks] # Assuming 5 tasks per date | |
# Create the bar chart with completed and remaining tasks | |
fig, ax = plt.subplots(figsize=(10, 6)) | |
ax.bar(labels, completed_tasks, color='green', label="Completed") | |
ax.bar(labels, remaining_tasks, bottom=completed_tasks, color='gray', label="Remaining") | |
ax.set_xlabel("Dates") | |
ax.set_ylabel("Task Progress") | |
ax.set_title(f"Task Completion Progress from {start_date} to {end_date}") | |
ax.legend() | |
plt.xticks(rotation=45) | |
chart_image = io.BytesIO() | |
plt.savefig(chart_image, format='png') | |
chart_image.seek(0) | |
return chart_image | |
# Function to get the data from Salesforce | |
def get_dashboard_data_from_salesforce(supervisor_name, project_id): | |
try: | |
sf = Salesforce( | |
username=os.getenv('SF_USERNAME'), | |
password=os.getenv('SF_PASSWORD'), | |
security_token=os.getenv('SF_SECURITY_TOKEN'), | |
domain=os.getenv('SF_DOMAIN', 'login') | |
) | |
# Get the start and end date from Salesforce | |
query = sf.query(f"SELECT Start_Date__c, End_Date__c FROM Project__c WHERE Name = '{project_id}' LIMIT 1") | |
if query['totalSize'] == 0: | |
return "", "", None, "Project not found" | |
start_date_str = query['records'][0]['Start_Date__c'] | |
end_date_str = query['records'][0]['End_Date__c'] | |
# Convert the string dates to datetime objects | |
start_date = datetime.datetime.strptime(start_date_str, "%Y-%m-%d") | |
end_date = datetime.datetime.strptime(end_date_str, "%Y-%m-%d") | |
# Generate task dates and simulated completion data | |
task_dates = [start_date + datetime.timedelta(days=i) for i in range((end_date - start_date).days + 1)] | |
tasks_completed = {str(task_dates[i].date()): np.random.randint(1, 6) for i in range(len(task_dates))} | |
chart_image = show_dashboard_chart(start_date, end_date, tasks_completed) | |
return start_date, end_date, chart_image, f"Project {project_id} Task Progress" | |
except Exception as e: | |
return "", "", None, f"Error: {str(e)}" | |
# Clean text for PDF generation | |
def clean_text_for_pdf(text): | |
return html.unescape(text).encode('latin-1', 'replace').decode('latin-1') | |
# Function to save report as PDF | |
def save_report_as_pdf(role, supervisor_name, project_id, checklist, suggestions): | |
now = datetime.datetime.now().strftime("%Y%m%d%H%M%S") | |
filename = f"report_{supervisor_name}_{project_id}_{now}.pdf" | |
file_path = f"./reports/{filename}" | |
os.makedirs("reports", exist_ok=True) | |
pdf = FPDF() | |
pdf.add_page() | |
pdf.set_font("Arial", 'B', 14) | |
pdf.cell(200, 10, txt="Supervisor Daily Report", ln=True, align="C") | |
pdf.set_font("Arial", size=12) | |
pdf.cell(200, 10, txt=clean_text_for_pdf(f"Role: {role}"), ln=True) | |
pdf.cell(200, 10, txt=clean_text_for_pdf(f"Supervisor: {supervisor_name}"), ln=True) | |
pdf.cell(200, 10, txt=clean_text_for_pdf(f"Project ID: {project_id}"), ln=True) | |
pdf.ln(5) | |
pdf.set_font("Arial", 'B', 12) | |
pdf.cell(200, 10, txt="Daily Checklist", ln=True) | |
pdf.set_font("Arial", size=12) | |
for line in checklist.split("\n"): | |
pdf.multi_cell(0, 10, clean_text_for_pdf(line)) | |
pdf.ln(5) | |
pdf.set_font("Arial", 'B', 12) | |
pdf.cell(200, 10, txt="Focus Suggestions", ln=True) | |
pdf.set_font("Arial", size=12) | |
for line in suggestions.split("\n"): | |
pdf.multi_cell(0, 10, clean_text_for_pdf(line)) | |
pdf.output(file_path) | |
temp_pdf_path = "/tmp/" + os.path.basename(file_path) | |
shutil.copy(file_path, temp_pdf_path) | |
return temp_pdf_path, filename | |
# Function to get roles from Salesforce | |
def get_roles_from_salesforce(): | |
try: | |
sf = Salesforce( | |
username=os.getenv('SF_USERNAME'), | |
password=os.getenv('SF_PASSWORD'), | |
security_token=os.getenv('SF_SECURITY_TOKEN'), | |
domain=os.getenv('SF_DOMAIN', 'login') | |
) | |
result = sf.query("SELECT Role__c FROM Supervisor__c WHERE Role__c != NULL") | |
return list(set(record['Role__c'] for record in result['records'])) | |
except Exception as e: | |
return [] | |
# Function to get supervisor names based on role | |
def get_supervisor_name_by_role(role): | |
try: | |
sf = Salesforce( | |
username=os.getenv('SF_USERNAME'), | |
password=os.getenv('SF_PASSWORD'), | |
security_token=os.getenv('SF_SECURITY_TOKEN'), | |
domain=os.getenv('SF_DOMAIN', 'login') | |
) | |
result = sf.query(f"SELECT Name FROM Supervisor__c WHERE Role__c = '{role}'") | |
return [record['Name'] for record in result['records']] | |
except Exception as e: | |
return [] | |
# Function to get the project name for a supervisor | |
def get_projects_for_supervisor(supervisor_name): | |
try: | |
sf = Salesforce( | |
username=os.getenv('SF_USERNAME'), | |
password=os.getenv('SF_PASSWORD'), | |
security_token=os.getenv('SF_SECURITY_TOKEN'), | |
domain=os.getenv('SF_DOMAIN', 'login') | |
) | |
result = sf.query(f"SELECT Id FROM Supervisor__c WHERE Name = '{supervisor_name}' LIMIT 1") | |
if result['totalSize'] == 0: | |
return "" | |
supervisor_id = result['records'][0]['Id'] | |
project_result = sf.query(f"SELECT Name FROM Project__c WHERE Supervisor_ID__c = '{supervisor_id}' LIMIT 1") | |
return project_result['records'][0]['Name'] if project_result['totalSize'] > 0 else "" | |
except Exception as e: | |
return "" | |
# Gradio interface | |
def create_interface(): | |
roles = get_roles_from_salesforce() # Get roles from Salesforce dynamically | |
with gr.Blocks(theme="soft", css=".footer { display: none; }") as demo: | |
gr.Markdown("## π§ AI-Powered Supervisor Assistant") | |
with gr.Row(): | |
role = gr.Dropdown(choices=roles, label="Role") | |
supervisor_name = gr.Dropdown(choices=[], label="Supervisor Name") | |
project_id = gr.Textbox(label="Project ID", interactive=False) | |
milestones = gr.Textbox(label="Milestones (comma-separated KPIs)") | |
reflection = gr.Textbox(label="Reflection Log", lines=4) | |
with gr.Row(): | |
generate = gr.Button("Generate") | |
clear = gr.Button("Clear") | |
refresh = gr.Button("π Refresh Roles") | |
checklist_output = gr.Textbox(label="β Daily Checklist") | |
suggestions_output = gr.Textbox(label="π‘ Focus Suggestions") | |
download_button = gr.File(label="β¬ Download Report") | |
pdf_link = gr.HTML() | |
role.change(fn=lambda r: gr.update(choices=get_supervisor_name_by_role(r)), inputs=role, outputs=supervisor_name) | |
supervisor_name.change(fn=get_projects_for_supervisor, inputs=supervisor_name, outputs=project_id) | |
def handle_generate(role, supervisor_name, project_id, milestones, reflection): | |
checklist, suggestions, pdf_path, pdf_name = generate_outputs(role, supervisor_name, project_id, milestones, reflection) | |
return checklist, suggestions, pdf_path, pdf_name | |
generate.click(fn=handle_generate, | |
inputs=[role, supervisor_name, project_id, milestones, reflection], | |
outputs=[checklist_output, suggestions_output, download_button, pdf_link]) | |
clear.click(fn=lambda: ("", "", "", "", ""), | |
inputs=None, | |
outputs=[role, supervisor_name, project_id, milestones, reflection]) | |
refresh.click(fn=lambda: gr.update(choices=get_roles_from_salesforce()), outputs=role) | |
# Supervisor Dashboard Tab | |
with gr.Tab("π Supervisor Dashboard"): | |
dash_supervisor = gr.Textbox(label="Supervisor Name", placeholder="e.g., SUP-056") | |
dash_project = gr.Textbox(label="Project ID", placeholder="e.g., PROJ-078") | |
load_dash = gr.Button("π₯ Load Dashboard") | |
dash_output = gr.HTML() | |
def show_dashboard_html(sup_name, proj_id): | |
start_date, end_date, chart_image, chart_title = get_dashboard_data_from_salesforce(sup_name, proj_id) | |
if chart_image: | |
chart_html = f"<img src='data:image/png;base64,{base64.b64encode(chart_image.read()).decode()}' />" | |
return f"<h3>{chart_title}</h3><p>From {start_date} to {end_date}</p>{chart_html}" | |
else: | |
return f"Error: {chart_title}" | |
load_dash.click(fn=show_dashboard_html, inputs=[dash_supervisor, dash_project], outputs=dash_output) | |
return demo | |
if __name__ == "__main__": | |
app = create_interface() | |
app.launch() | |