sahsan's picture
Added buttons linking project files
7d42f70
import streamlit as st
from chains import rag_chain
from history import get_session_history
from langchain_core.runnables.history import RunnableWithMessageHistory
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
import numpy as np
import time
import os
from dotenv import load_dotenv
import json
from openai import OpenAI
import sqlite3
import hashlib
import secrets
from datetime import datetime, timedelta
# Load environment variables
load_dotenv()
# Database setup
database_path='./docs/users.db'
def init_database():
"""Initialize SQLite database with users table"""
try:
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
# Create users table
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
salt TEXT NOT NULL,
api_key TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_login TIMESTAMP,
is_active BOOLEAN DEFAULT 1
)
''')
# Create indexes for better performance
cursor.execute('CREATE INDEX IF NOT EXISTS idx_username ON users(username)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_email ON users(email)')
conn.commit()
conn.close()
print("Database initialized successfully")
return True
except Exception as e:
print(f"Error initializing database: {e}")
if 'st' in globals():
st.error(f"Database initialization failed: {e}")
return False
def hash_password(password, salt=None):
"""Hash password with salt using SHA-256"""
if salt is None:
salt = secrets.token_hex(16)
# Combine password and salt
password_salt = password + salt
# Hash using SHA-256
password_hash = hashlib.sha256(password_salt.encode()).hexdigest()
return password_hash, salt
def verify_password(password, stored_hash, salt):
"""Verify password against stored hash"""
password_hash, _ = hash_password(password, salt)
return password_hash == stored_hash
def validate_password_strength(password):
"""Validate password strength"""
if len(password) < 8:
return False, "Password must be at least 8 characters long"
if not any(c.isupper() for c in password):
return False, "Password must contain at least one uppercase letter"
if not any(c.islower() for c in password):
return False, "Password must contain at least one lowercase letter"
if not any(c.isdigit() for c in password):
return False, "Password must contain at least one number"
return True, "Password is strong"
def register_user(username, email, password):
"""Register a new user"""
try:
# Validate password strength
is_strong, strength_message = validate_password_strength(password)
if not is_strong:
return False, strength_message
# Validate username and email format
if len(username) < 3:
return False, "Username must be at least 3 characters long"
if '@' not in email or '.' not in email:
return False, "Please enter a valid email address"
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
# Check if username or email already exists
cursor.execute('SELECT id FROM users WHERE username = ? OR email = ?', (username, email))
if cursor.fetchone():
conn.close()
return False, "Username or email already exists"
# Hash password
password_hash, salt = hash_password(password)
# Insert new user
cursor.execute('''
INSERT INTO users (username, email, password_hash, salt)
VALUES (?, ?, ?, ?)
''', (username, email, password_hash, salt))
conn.commit()
conn.close()
return True, "User registered successfully"
except sqlite3.IntegrityError as e:
return False, "Username or email already exists"
except sqlite3.Error as e:
return False, f"Database error: {str(e)}"
except Exception as e:
return False, f"Registration failed: {str(e)}"
def authenticate_user(username, password):
"""Authenticate user login"""
try:
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
# Get user by username
cursor.execute('SELECT id, username, password_hash, salt, api_key FROM users WHERE username = ? AND is_active = 1', (username,))
user = cursor.fetchone()
if user and verify_password(password, user[2], user[3]):
# Update last login
cursor.execute('UPDATE users SET last_login = CURRENT_TIMESTAMP WHERE id = ?', (user[0],))
conn.commit()
user_data = {
'id': user[0],
'username': user[1],
'api_key': user[4]
}
conn.close()
return True, user_data
else:
conn.close()
return False, "Invalid username or password"
except sqlite3.Error as e:
return False, f"Database error: {str(e)}"
except Exception as e:
return False, f"Authentication failed: {str(e)}"
def change_password(user_id, current_password, new_password):
"""Change user password"""
try:
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
# Get current user data
cursor.execute('SELECT password_hash, salt FROM users WHERE id = ?', (user_id,))
user = cursor.fetchone()
if not user:
conn.close()
return False, "User not found"
# Verify current password
if not verify_password(current_password, user[0], user[1]):
conn.close()
return False, "Current password is incorrect"
# Validate new password strength
is_strong, strength_message = validate_password_strength(new_password)
if not is_strong:
conn.close()
return False, strength_message
# Hash new password
new_password_hash, new_salt = hash_password(new_password)
# Update password
cursor.execute('UPDATE users SET password_hash = ?, salt = ? WHERE id = ?',
(new_password_hash, new_salt, user_id))
conn.commit()
conn.close()
return True, "Password changed successfully"
except Exception as e:
return False, f"Failed to change password: {str(e)}"
def update_user_api_key(user_id, api_key):
"""Update user's API key"""
try:
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
cursor.execute('UPDATE users SET api_key = ? WHERE id = ?', (api_key, user_id))
conn.commit()
conn.close()
return True, "API key updated successfully"
except Exception as e:
return False, f"Failed to update API key: {str(e)}"
def deactivate_user_account(user_id):
"""Deactivate user account"""
try:
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
cursor.execute('UPDATE users SET is_active = 0 WHERE id = ?', (user_id,))
conn.commit()
conn.close()
return True, "Account deactivated successfully"
except Exception as e:
return False, f"Failed to deactivate account: {str(e)}"
def get_user_profile(user_id):
"""Get user profile information"""
try:
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
cursor.execute('''
SELECT username, email, created_at, last_login, api_key
FROM users WHERE id = ? AND is_active = 1
''', (user_id,))
user = cursor.fetchone()
conn.close()
if user:
return {
'username': user[0],
'email': user[1],
'created_at': user[2],
'last_login': user[3],
'has_api_key': bool(user[4])
}
return None
except Exception as e:
return None
def get_user_by_email(email):
"""Get user by email address"""
try:
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
cursor.execute('SELECT id, username FROM users WHERE email = ? AND is_active = 1', (email,))
user = cursor.fetchone()
conn.close()
if user:
return {'id': user[0], 'username': user[1]}
return None
except Exception as e:
return None
def reset_password_request(email):
"""Request password reset (placeholder for future email functionality)"""
user = get_user_by_email(email)
if user:
# In a real application, this would send an email with a reset link
# For now, we'll just return success
return True, f"Password reset instructions sent to {email}"
else:
return False, "Email address not found"
def get_user_api_key(user_id):
"""Get user's API key"""
try:
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
cursor.execute('SELECT api_key FROM users WHERE id = ?', (user_id,))
result = cursor.fetchone()
conn.close()
return result[0] if result else None
except Exception as e:
return None
def create_default_admin():
"""Create a default admin user if no users exist"""
try:
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
# Check if any users exist
cursor.execute('SELECT COUNT(*) FROM users')
user_count = cursor.fetchone()[0]
if user_count == 0:
# Create default admin user with strong password
admin_username = "admin"
admin_email = "[email protected]"
admin_password = "Admin123!" # Strong password
password_hash, salt = hash_password(admin_password)
cursor.execute('''
INSERT INTO users (username, email, password_hash, salt)
VALUES (?, ?, ?, ?)
''', (admin_username, admin_email, password_hash, salt))
conn.commit()
print("Default admin user created: username='admin', password='Admin123!'")
print("IMPORTANT: Change this password after first login!")
conn.close()
except Exception as e:
print(f"Error creating default admin: {e}")
# Initialize database and create default admin user
if __name__ == "__main__":
# Only initialize once when the script is run directly
init_database()
create_default_admin()
else:
# When imported as a module, just initialize database
init_database()
def validate_api_key(api_key):
"""Validate the API key format and test it"""
if not api_key:
return False, "API key cannot be empty"
# Check if it starts with 'sk-' and has appropriate length
if not api_key.startswith('sk-') or len(api_key) < 20:
return False, "Invalid API key format. OpenAI API keys start with 'sk-' and are at least 20 characters long."
try:
# Test the API key with a simple request
client = OpenAI(api_key=api_key)
client.models.list() # This will fail if the API key is invalid
return True, "API key is valid"
except Exception as e:
return False, f"Invalid API key: {str(e)}"
def get_api_key_for_use():
"""Get the API key to use for API calls"""
# Always prioritize the current user's stored API key
if st.session_state.authenticated and st.session_state.current_user:
user_api_key = get_user_api_key(st.session_state.current_user['id'])
if user_api_key:
return user_api_key
# If no user API key, return None (don't fall back to env)
return None
def set_api_key_for_request(api_key):
"""Set API key for a single request and return a cleanup function"""
if api_key:
os.environ["OPENAI_API_KEY"] = api_key
return lambda: os.environ.pop("OPENAI_API_KEY", None)
return lambda: None
# Initialize API key in session state (but don't use env var as default)
if "api_key" not in st.session_state:
st.session_state.api_key = ""
def update_api_key(new_api_key, user_id=None):
"""Update the API key in session state and database after validation"""
is_valid, message = validate_api_key(new_api_key)
if is_valid:
# Update user's API key in database if user_id is provided
if user_id:
success, db_message = update_user_api_key(user_id, new_api_key)
if not success:
return False, f"API key validated but failed to save: {db_message}"
# Update session state
st.session_state.api_key = new_api_key
return True, message
return False, message
def get_current_user_api_key():
"""Get the current user's API key from database"""
if st.session_state.authenticated and st.session_state.current_user:
return get_user_api_key(st.session_state.current_user['id'])
return None
def check_api_key_validity(api_key):
"""Check if the stored API key is still valid"""
if not api_key:
return False, "No API key provided"
try:
client = OpenAI(api_key=api_key)
client.models.list()
return True, "API key is valid"
except Exception as e:
return False, f"API key validation failed: {str(e)}"
def get_valid_api_key():
"""Get a valid API key for the current user"""
user_api_key = get_current_user_api_key()
if user_api_key:
is_valid, message = check_api_key_validity(user_api_key)
if is_valid:
return user_api_key
else:
# API key is invalid, remove it from database
if st.session_state.current_user:
update_user_api_key(st.session_state.current_user['id'], None)
return None
return None
def login_user(username, password):
"""Authenticate and login user"""
try:
success, result = authenticate_user(username, password)
if success:
st.session_state.authenticated = True
st.session_state.current_user = result
st.session_state.show_login = False
st.session_state.show_register = False
st.session_state.login_time = datetime.now()
# Set user's API key if available
if result and 'api_key' in result and result['api_key']:
st.session_state.api_key = result['api_key']
return True, "Login successful"
else:
return False, result
except Exception as e:
return False, f"Login error: {str(e)}"
def check_session_timeout():
"""Check if the current session has timed out"""
if not st.session_state.authenticated or not st.session_state.login_time:
return False
current_time = datetime.now()
time_diff = current_time - st.session_state.login_time
timeout_seconds = st.session_state.session_timeout
if time_diff.total_seconds() > timeout_seconds:
logout_user()
return True
return False
def logout_user():
"""Logout current user"""
st.session_state.authenticated = False
st.session_state.current_user = None
st.session_state.show_login = True
st.session_state.show_register = False
st.session_state.api_key = ""
st.session_state.messages = []
st.session_state.session_id = "default_session"
def switch_to_register():
"""Switch to registration form"""
st.session_state.show_login = False
st.session_state.show_register = True
def switch_to_login():
"""Switch to login form"""
st.session_state.show_login = True
st.session_state.show_register = False
def show_auth_forms():
"""Display authentication forms (login/register)"""
st.markdown("""
<div style="display: flex; justify-content: center; width: 100%; margin: 0 auto;">
<div style="display: inline-block; text-align: center; padding: 4px 20px;
background: linear-gradient(135deg, #4f46e5, #3b82f6);
border-radius: 6px; margin: 4px auto;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.15);
border: 1px solid rgba(255, 255, 255, 0.1);
backdrop-filter: blur(10px);
max-width: fit-content;">
<div style="display: flex; flex-direction: column; gap: 2px;">
<h1 style="color: white; font-size: 18px; font-weight: 600; margin: 0; padding: 0;
font-family: 'Arial', sans-serif; line-height: 1.2;
text-shadow: 0 1px 2px rgba(0, 0, 0, 0.1);">
Academic Advisement Bot
</h1>
<p style="color: rgba(255, 255, 255, 0.95); font-size: 11px; font-weight: 400;
margin: 0; padding: 0; line-height: 1.2;
letter-spacing: 0.3px;">
Please login or register to continue
</p>
</div>
</div>
</div>
""", unsafe_allow_html=True)
# Create two columns for login and register forms
col1, col2 = st.columns(2)
with col1:
st.subheader("Login")
with st.form("login_form"):
login_username = st.text_input("Username", key="login_username")
login_password = st.text_input("Password", type="password", key="login_password")
login_submitted = st.form_submit_button("Login")
if login_submitted:
if login_username and login_password:
success, message = login_user(login_username, login_password)
if success:
st.success(message)
st.rerun()
else:
st.error(message)
else:
st.error("Please fill in all fields")
if st.button("Don't have an account? Register"):
switch_to_register()
st.markdown(
"""
<div style="margin-top: 20px; text-align: left;">
<a href="https://huggingface.co/spaces/sahsan/cet_advisement_bot/tree/main" target="_blank"
style="display: inline-block; padding: 10px 20px; background-color: #4CAF50; color: white;
text-align: center; text-decoration: none; border-radius: 5px; font-size: 16px;">
View Project Files
</a>
</div>
""",
unsafe_allow_html=True
)
with col2:
st.subheader("Register")
with st.form("register_form"):
reg_username = st.text_input("Username", key="reg_username")
reg_email = st.text_input("Email", key="reg_email")
reg_password = st.text_input("Password", type="password", key="reg_password")
reg_confirm_password = st.text_input("Confirm Password", type="password", key="reg_confirm_password")
register_submitted = st.form_submit_button("Register")
if register_submitted:
if reg_username and reg_email and reg_password and reg_confirm_password:
if reg_password == reg_confirm_password:
success, message = register_user(reg_username, reg_email, reg_password)
if success:
st.success(message)
st.info("Please login with your new account")
switch_to_login()
st.rerun()
else:
st.error(message)
else:
st.error("Passwords do not match")
else:
st.error("Please fill in all fields")
if st.button("Already have an account? Login"):
switch_to_login()
# Original RAG chain setup
conversational_rag_chain = RunnableWithMessageHistory(
rag_chain,
get_session_history,
input_messages_key="input",
history_messages_key="chat_history",
output_messages_key="answer",
)
# List of predefined Q&A pairs for evaluation
questions_answers = [
# Easy
("Which courses can I take first semester?",
"You can take CET 1100, CET 1111, CET 1120, CET 1150, ENG 1101, MAT 1275."),
("Can I take CET1100 after completing ENG1101?",
"Yes, you can take CET 1100 after completing ENG 1101 because CET 1100 requires no prerequisite."),
("What courses should I take after CET1111??",
"CET 1120 (1 credit), CET 1150 (3 credits), CET 1211 (2 credits), MAT 1375 (4 credits), ENG 1121 (3 credits)."),
("Can I overload credits this semester?",
" It's best to consult with your academic advisor or the registrar's office at your institution to understand the specific requirements and process for requesting a credit overload."),
("I just completed CET1111 and MAT1275. What courses should I register for next semester?",
"Based on the courses you've completed (CET 1111 and MAT 1275), you can consider registering for the following courses next semester: CET 1120 (1 credit) if you haven't taken it yet, CET 1150 (3 credits), CET 1211 (2 credits), MAT 1375 (4 credits), ENG 1101 (3 credits)."),
("Can I take CET2305 if I haven’t finished CET1150 yet?",
"No you can not take CET2350 before taing CET1150 because CET1150 is prerequisite of CET1250, and CET1250 is prerequisite of CET2350. Since you havent completed CET1150, I assume you have not finished CET2350 as well, therefore you cant not take CET2350."),
("I registered late and some CET classes are full. What can I do?",
"Talk to your advisor."),
("I failed CET2450. Can I still register for upper-level CET classes?",
"CET2450 is not prerequired for any higher level CET classes so even if you failed this class you can still take upper level CET classes."),
# Medium
("The courses of the CET department have been changed to EMT to CET. I have failed in EMT1255, which class I should take as an equivalent class to EMT 1255.",
"If you failed EMT 1255, you should take CET 2350, as EMT 1255 is equivalent to CET 2350."),
("I have completed CET1111, CET1150, and ENG1101. What courses can I take next?",
"You can take CET 1100, CET 1120, MAT 1275, CET 1211, CET 1250 in the next semester."),
("Can you list all prerequisites and corequisites for CET 3615?",
"Prerequisites of CET 3615 are MAT 1575, CET 3525, PHY 1434 or PHY 1442."),
("Which general education courses are required for graduation?",
"""For degree in Computer Engineering Technology (CET), the General Education requirements typically include courses in English, Mathematics, and Flex Core electives. Here are the general education courses you need to complete:
ENG 1101 (3 credits), ENG 1121 (3 credits), MAT 1275 (4 credits), Flex Core 1 (3 credits), Flex Core 2 (3 credits), Flex core 2, Flex core 4, MAT 1375 (4 credits), MAT 1475 (4 credits), PHY 1433(4 credits), MAT 2680, MAT 2580, ID Course."""
),
("How many credits can I take if I want to overload?",
"Generally, students are allowed to take a standard full-time course load, which is often around 12-18 credits per semester. To find out the specific number of credits you can take when overloading, and the process to request an overload, you should Consult with your Academic Advisor."),
("Can I take an internship while I'm still completing my last two CET courses?",
"Yes you can take an internship even if you havent completed last two CET courses since the internship lasses has no prerequisite."),
("What are the General Education requirements for my AAS degree in CET?",
"For an AAS degree in Computer Engineering Technology (CET), the general education requirements are ENG 1101 (3 credits) - English Composition 1, ENG 1121 (3 credits), MAT 1275 (4 credits), MAT 1375 (4 credits) - The next math course after MAT 1275, PHY 1433 (4 credits), Flex Core 1 (3 credits), Flex Core 2 (3 credits), ID Course."),
("I transferred from another college and completed Calculus I. Do I need to retake it here?",
"Calculus I is equivalent to MAT 1475 at your current institute. If you have completed Calculus I at another college, you do not need to retake it."),
("I'm interested in switching from AAS to BTech after finishing my AAS. What are the requirements?",
"Talk to your advisor."),
("I need help choosing between CET3525 and CET3625 next semester. What should I consider?",
"You can not take CET3625 before taking CET3525, so first cosider taking CET3525 then in the next semester take CET3625."),
# Hard
("Given my completed courses (CET1111, CET1150, ENG1101), provide a custom-made step-by-step plan for the remaining semesters.",
"""Based on the courses you've completed (CET 1111, CET 1150, ENG 1101), here's a step-by-step plan for the remaining semesters:
You've already completed first semester.
Second Semester: CET 1120 (2 credits) - No prerequisites.
CET 1100 (2 credits) - No prerequisites.
MAT 1275 (4 credits), ENG 1121 (3 credits), CET 1250(3 credits), CET 1211
Third Semester: CET 2312 (4 credits) - Prerequisite: CET 1120; Corequisite: PHY 1433.
CET 2350 (4 credits) - Prerequisite/Corequisite: CET 1250 and MAT 1375.
CET 2370 (2 credits) - Prerequisite: CET 1250.
CET 2390 (1 credit) - Prerequisite/Corequisite: CET 2370.
PHY 1433 (4 credits) - Prerequisite: MAT 1275.
MAT 1375(4 credits)
Fourth Semester: CET 2450 (3 credits) - Prerequisite: CET 2350.
CET 2455 (2 credits) - Prerequisite: CET 2370.
CET 2461 (2 credits) - Prerequisite/Corequisite: CET 2455, MAT 1475.
Technical elective, MAT 1475 (4 credits), Flex core 1 (3 credits)
Fifth Semester: CET 3510 (4 credits) - Prerequisite/Corequisite: CET 2411 and MAT 1575.
CET 3525 (4 credits) - Prerequisite/Corequisite: MAT 1575.
MAT 1575 (4 credits) - Prerequisite: MAT 1475.
PHY 1434 (3 credits), Flex core 2
Sixth Semester: MAT 2680 (3 credits) - Prerequisite: MAT 1575.
CET 3615 (4 credits) - Prerequisites: MAT 1575, CET 3525, and PHYS 1434 or PHYS 1442.
CET 3625 (1 credit) - Prerequisite: CET 3525; Corequisite: MAT 2680.
CET 3640 (3 credits) - Prerequisites: CET 2411 and CET 3510.
FLEX CORE 3, COM 1330 (3 credits)
Seventh Semester: CET 4711 (2 credits) - Prerequisite: CET 3640 and CET 4705.
MAT 2580 (3 credits) - Prerequisite/Corequisite: MAT 1575.
CET 4705 (2 credits) - Prerequisite: CET 3625 with a grade of C or better.
CET 4773 (4 credits) - Prerequisite: CET 3510.
Technical Elective 1 (4 credits), FLEX CORE 4
Eighth Semester: Technical Elective 2 (3 credits), CET 4811 (2 credits) - Prerequisites: CET 3640, CET 4711.
CET 4805 (2 credits) - Prerequisite: CET 4705.
CET 4864 (4 credits) - Prerequisites: CET 3625, MAT 2580.
ID Course."""
),
("I want to know the courses I can take in the 2nd semester. I've completed CET1120, CET 1150 but I haven't completed all the first-semester courses yet. Can you recommend some courses?",
"For the 2nd semester, you can take CET 1100, CET 1111, MAT 1275, CET 1250, ENG 1100, Flex Core 1."),
("If I want to graduate in six/seven semesters instead of eight, how should I plan my courses?",
"""1st semester: CET 1100, CET 1111, CET 1120, CET 1150, ENG 1100, MAT 1275
2nd semester: CET 1211, CET 1250, CET 2312, CET 2350, MAT 1375, PHY 1433
3rd semester: MAT 1475, PHY 1434, CET 2370, CET 2390, CET 2450, Technical Elective
4th semester: CET 2455, CET 2461, CET 3510, CET 3525, Flex Core 2, MAT 1575
5th semester: CET 3615, CET 3625, MAT 2680, CET 3640, Flex Core 3, CET 4773
6th semester: CET 4705, CET 4711, Technical Elective 1, Flex Core 4, ID, MAT 2580, COM 1330
7th semester: CET 4805, CET 4811, CET 4864, Technical Elective 2, Flex Core 1, ENG 1121"""
),
# Long Answer
("My catalog year is 2023, but I took a break. Should I follow the new 2025 curriculum now?",
"Yes you have to follow 2025 curriculum."),
("List all courses till the eighth semester.",
"""1st semester: CET 1100, CET 1111, CET 1120, CET 1150, ENG 1100, MAT 1275
2nd semester: CET 1211, MAT 1375, CET 1250, ENG 1121, PHY 1433
3rd semester: CET 2312, MAT 1475, PHY 1434, CET 2350, CET 2370, CET 2390
4th semester: CET 2450, CET 2455, CET 2461, Technical Elective, MAT 1575
5th semester: Flex Core 1, CET 3510, CET 3525, MAT 2680, ID
6th semester: Flex Core 2, CET 3615, CET 3625, CET 3640, Technical Elective 1
7th semester: CET 4705, CET 4711, CET 4773, Flex Core 3, Technical Elective 2
8th semester: CET 4811, CET 4864, CET 4805, COM 1330, Flex Core 4"""
)
]
def calculate_cosine_similarity(text1, text2):
"""Calculate cosine similarity between two text strings"""
if isinstance(text1, list):
text1 = " ".join(text1)
if isinstance(text2, list):
text2 = " ".join(text2)
# Create vectorizer and transform texts
vectorizer = CountVectorizer().fit_transform([str(text1), str(text2)]) # Ensure inputs are strings
vectors = vectorizer.toarray()
# Calculate cosine similarity
cosine_sim = cosine_similarity(vectors)[0, 1]
return cosine_sim
def init_session_state():
"""Initialize session state variables"""
if "messages" not in st.session_state:
st.session_state.messages = []
if "session_id" not in st.session_state:
st.session_state.session_id = "default_session"
if "last_input" not in st.session_state:
st.session_state.last_input = None
if "evaluation_results" not in st.session_state:
st.session_state.evaluation_results = []
if "evaluation_complete" not in st.session_state:
st.session_state.evaluation_complete = False
if "current_question_index" not in st.session_state:
st.session_state.current_question_index = 0
if "in_evaluation_mode" not in st.session_state:
st.session_state.in_evaluation_mode = False
if "api_key" not in st.session_state:
st.session_state.api_key = ""
if "authenticated" not in st.session_state:
st.session_state.authenticated = False
if "current_user" not in st.session_state:
st.session_state.current_user = None
if "show_login" not in st.session_state:
st.session_state.show_login = True
if "show_register" not in st.session_state:
st.session_state.show_register = False
if "login_time" not in st.session_state:
st.session_state.login_time = None
if "session_timeout" not in st.session_state:
st.session_state.session_timeout = 3600 # 1 hour in seconds
def format_message(text, is_user=False):
"""Format chat messages with styling"""
if is_user:
align = "right"
bg_color = "linear-gradient(135deg, #6366f1, #4f46e5)"
border_radius = "20px 20px 5px 20px"
else:
align = "left"
bg_color = "linear-gradient(135deg, #1e1e38, #242447)"
border_radius = "20px 20px 20px 5px"
return f"""
<div style="display: flex; justify-content: {align}; margin: 15px 0;">
<div style="background: {bg_color};
padding: 15px 20px;
border-radius: {border_radius};
max-width: 80%;
font-size: 16px;
line-height: 1.6;
color: white;
box-shadow: 0 4px 15px rgba(0, 0, 0, 0.1);
animation: fadeIn 0.5s ease-out;
border: 1px solid rgba(255, 255, 255, 0.1);">
{text}
</div>
</div>
"""
def custom_css():
"""Define custom CSS for the app"""
return """
<style>
@keyframes fadeIn {
from { opacity: 0; transform: translateY(10px); }
to { opacity: 1; transform: translateY(0); }
}
.stTextInput > div > div > input {
background-color: #f8f9fa;
border: 2px solid #e9ecef;
border-radius: 15px;
padding: 15px 20px;
font-size: 16px;
transition: all 0.3s ease;
color: black; /* Ensure input text is black */
}
.stButton > button {
background: linear-gradient(135deg, #6366f1, #4f46e5);
color: white;
border: none;
border-radius: 15px;
padding: 15px 30px;
font-weight: 600;
transition: all 0.3s ease;
box-shadow: 0 4px 10px rgba(79, 70, 229, 0.2);
}
.stButton > button:hover {
transform: translateY(-2px);
box-shadow: 0 6px 15px rgba(79, 70, 229, 0.3);
}
.main {
background: linear-gradient(135deg, #f8fafc, #eef2ff);
}
</style>
"""
def handle_submit():
"""Handle user input submission"""
if st.session_state.user_input and len(st.session_state.user_input.strip()) > 0:
user_message = st.session_state.user_input.strip()
# Prevent duplicate messages
if st.session_state.last_input != user_message:
st.session_state.last_input = user_message
# Add user message to chat
st.session_state.messages.append({
"role": "user",
"content": user_message
})
# Get response from the chain
try:
# Get the user's API key for this request
user_api_key = get_api_key_for_use()
if not user_api_key:
result = "Error: No API key set. Please set your OpenAI API key in the User Panel."
else:
# Set the API key for this request and get cleanup function
cleanup = set_api_key_for_request(user_api_key)
try:
result = conversational_rag_chain.invoke(
{"input": user_message},
config={"configurable": {"session_id": st.session_state.session_id}}
)["answer"]
finally:
# Always cleanup the environment variable
cleanup()
except Exception as e:
result = f"Error communicating with the model: {str(e)}. Please check your API key and network."
st.error(result)
# Add assistant response to chat
st.session_state.messages.append({
"role": "assistant",
"content": result
})
# Reset input box
st.session_state.user_input = ""
def run_automated_evaluation():
"""Run the automated evaluation process"""
st.session_state.in_evaluation_mode = True
st.session_state.evaluation_results = []
st.session_state.messages = [] # Clear messages for evaluation display
# Check if user has API key set
user_api_key = get_api_key_for_use()
if not user_api_key:
st.error("OpenAI API Key is not set. Please set it in the User Panel.")
st.session_state.in_evaluation_mode = False # Exit evaluation mode
return
progress_bar = st.progress(0)
status_text = st.empty()
for i, (question, expected_answer) in enumerate(questions_answers):
progress = (i) / len(questions_answers)
progress_bar.progress(progress)
status_text.text(f"Processing question {i+1}/{len(questions_answers)}")
st.session_state.session_id = f"eval_session_{i}" # Unique session for each question
actual_answer = "Error: Could not get response." # Default error message
try:
# Get the user's API key for this request
user_api_key = get_api_key_for_use()
if not user_api_key:
actual_answer = "Error: No API key set. Please set your OpenAI API key in the User Panel."
else:
# Set the API key for this request and get cleanup function
cleanup = set_api_key_for_request(user_api_key)
try:
actual_answer = conversational_rag_chain.invoke(
{"input": question},
config={"configurable": {"session_id": st.session_state.session_id}}
)["answer"]
finally:
# Always cleanup the environment variable
cleanup()
except Exception as e:
actual_answer = f"Error invoking RAG chain: {str(e)}"
st.warning(f"Error processing question '{question[:50]}...': {str(e)}")
similarity = calculate_cosine_similarity(str(actual_answer), str(expected_answer))
st.session_state.evaluation_results.append({
"question": question,
"expected_answer": str(expected_answer), # Ensure it's a string
"actual_answer": str(actual_answer), # Ensure it's a string
"similarity": similarity
})
time.sleep(0.5) # Small delay
progress_bar.progress(1.0)
status_text.text("Evaluation completed!")
save_results_to_csv()
st.session_state.evaluation_complete = True
def save_results_to_csv():
"""Save evaluation results to a CSV file"""
if st.session_state.evaluation_results:
df = pd.DataFrame(st.session_state.evaluation_results)
df.to_csv("qa_evaluation_results.csv", index=False)
return df
return pd.DataFrame() # Return empty DataFrame if no results
def display_evaluation_results(location="main"):
"""Display the evaluation results in the app"""
if not st.session_state.evaluation_results:
if location == "main": # Only show this prominent message in the main area
st.info("No evaluation results to display. Run the evaluation from the Admin Panel.")
return
df = pd.DataFrame(st.session_state.evaluation_results)
avg_similarity = df['similarity'].mean() if not df.empty else 0
st.metric("Average Cosine Similarity", f"{avg_similarity:.4f}")
st.write("Detailed Results:")
tab1, tab2 = st.tabs(["Detailed View", "Summary Chart"])
with tab1:
for idx, row in df.iterrows():
with st.expander(f"Question {idx + 1}: {row['question'][:100]}..."):
st.write("**Question:**")
st.write(row['question'])
st.write("**LLM Answer:**")
st.write(row['actual_answer'])
st.write("**Expected Answer:**")
st.write(row['expected_answer'])
st.write("**Cosine Similarity:**")
st.write(f"{row['similarity']:.4f}")
if row['similarity'] >= 0.8:
st.success(f"High similarity: {row['similarity']:.4f}")
elif row['similarity'] >= 0.6:
st.warning(f"Medium similarity: {row['similarity']:.4f}")
else:
st.error(f"Low similarity: {row['similarity']:.4f}")
with tab2:
st.write("Similarity Scores Chart")
if not df.empty:
chart_data = pd.DataFrame({
'Question': [f"Q{i+1}" for i in range(len(df))],
'Similarity': df['similarity']
}).set_index('Question')
st.bar_chart(chart_data)
else:
st.write("No data for chart.")
if not df.empty:
csv = df.to_csv(index=False)
st.download_button(
label="Download Results CSV",
data=csv,
file_name="qa_evaluation_results.csv",
mime="text/csv",
key=f"download_results_csv_{location}"
)
def main():
st.set_page_config(page_title="Academic Advisement Bot", layout="wide")
# Initialize database first
if not init_database():
st.error("Failed to initialize database. Please check the console for errors.")
st.stop()
init_session_state()
st.markdown(custom_css(), unsafe_allow_html=True)
# Check authentication status
if not st.session_state.authenticated:
show_auth_forms()
return
# Check session timeout
if check_session_timeout():
st.warning("Session expired. Please login again.")
st.rerun()
# Additional safety check for current_user
if not st.session_state.current_user:
st.error("User session data is missing. Please login again.")
logout_user()
st.rerun()
# User is authenticated, show main app
welcome_message = f"Welcome, {st.session_state.current_user['username']}! Ask questions about your course material!"
st.markdown(f"""
<div style="display: flex; justify-content: center; width: 100%; margin: 0 auto;">
<div style="display: inline-block; text-align: center; padding: 4px 20px;
background: linear-gradient(135deg, #4f46e5, #3b82f6);
border-radius: 6px; margin: 4px auto;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.15);
border: 1px solid rgba(255, 255, 255, 0.1);
backdrop-filter: blur(10px);
max-width: fit-content;">
<div style="display: flex; flex-direction: column; gap: 2px;">
<h1 style="color: white; font-size: 18px; font-weight: 600; margin: 0; padding: 0;
font-family: 'Arial', sans-serif; line-height: 1.2;
text-shadow: 0 1px 2px rgba(0, 0, 0, 0.1);">
Academic Advisement Bot
</h1>
<p style="color: rgba(255, 255, 255, 0.95); font-size: 11px; font-weight: 400;
margin: 0; padding: 0; line-height: 1.2;
letter-spacing: 0.3px;">
{welcome_message}
</p>
</div>
</div>
</div>
<style>
div[data-testid="stVerticalBlock"] > div:first-child {{
text-align: center;
}}
</style>
""", unsafe_allow_html=True)
with st.sidebar:
st.header("User Panel")
# Additional safety check for current_user in sidebar
if not st.session_state.current_user:
st.error("User session data is missing. Please login again.")
if st.button("Return to Login"):
logout_user()
st.rerun()
return
# User info and logout
st.subheader(f"Welcome, {st.session_state.current_user['username']}!")
# User profile information
try:
user_profile = get_user_profile(st.session_state.current_user['id'])
if user_profile:
with st.expander("Profile Information"):
st.write(f"**Username:** {user_profile['username']}")
st.write(f"**Email:** {user_profile['email']}")
st.write(f"**Member since:** {user_profile['created_at']}")
if user_profile['last_login']:
st.write(f"**Last login:** {user_profile['last_login']}")
st.write(f"**API Key:** {'Set' if user_profile['has_api_key'] else 'Not set'}")
except Exception as e:
st.error(f"Error loading profile: {str(e)}")
if st.button("Logout", type="secondary"):
logout_user()
st.rerun()
# Account management section
with st.expander("Account Management"):
# Password change
st.subheader("Change Password")
with st.form("change_password_form"):
current_password = st.text_input("Current Password", type="password", key="current_password")
new_password = st.text_input("New Password", type="password", key="new_password")
confirm_new_password = st.text_input("Confirm New Password", type="password", key="confirm_new_password")
change_submitted = st.form_submit_button("Change Password")
if change_submitted:
if current_password and new_password and confirm_new_password:
if new_password == confirm_new_password:
success, message = change_password(st.session_state.current_user['id'], current_password, new_password)
if success:
st.success(message)
st.rerun()
else:
st.error(message)
else:
st.error("New passwords do not match")
else:
st.error("Please fill in all fields")
st.divider()
# Account deactivation
st.subheader("Deactivate Account")
st.warning("This action cannot be undone. You will need to contact support to reactivate your account.")
if st.button("Deactivate Account", type="secondary"):
success, message = deactivate_user_account(st.session_state.current_user['id'])
if success:
st.success(message)
st.info("You will be logged out in 5 seconds...")
time.sleep(5)
logout_user()
st.rerun()
else:
st.error(message)
st.subheader("API Key Management")
try:
# Get the current user's API key from database
current_user_api_key = get_api_key_for_use()
if current_user_api_key:
masked_api_key = current_user_api_key[:4] + "..." + current_user_api_key[-4:] if len(current_user_api_key) > 8 else "Invalid"
st.text(f"Current API Key: {masked_api_key}")
st.success("βœ… API key is set and ready to use")
else:
st.text("Current API Key: Not set")
st.warning("⚠️ Please set your OpenAI API key to use the chatbot")
st.divider()
new_api_key_input = st.text_input("Set/Update API Key", type="password", key="new_api_key_input_field")
col1, col2 = st.columns(2)
with col1:
if st.button("Update API Key"):
if new_api_key_input:
success, message = update_api_key(new_api_key_input, st.session_state.current_user['id'])
if success:
st.success(message)
st.rerun()
else:
st.error(message)
else:
st.warning("Please enter a new API key.")
with col2:
if st.button("Test Current API Key"):
if current_user_api_key:
is_valid, message = check_api_key_validity(current_user_api_key)
if is_valid:
st.success("βœ… API key is valid and working!")
else:
st.error(f"❌ API key validation failed: {message}")
st.warning("Please update your API key")
else:
st.warning("No API key to test")
# Add option to clear API key
if current_user_api_key:
if st.button("Clear API Key", type="secondary"):
success, message = update_user_api_key(st.session_state.current_user['id'], None)
if success:
st.success("API key cleared successfully")
st.rerun()
else:
st.error(f"Failed to clear API key: {message}")
except Exception as e:
st.error(f"Error in API key management: {str(e)}")
st.divider()
try:
if st.button("Run Automated Evaluation"):
run_automated_evaluation()
except Exception as e:
st.error(f"Error running evaluation: {str(e)}")
try:
if st.session_state.evaluation_complete and st.session_state.in_evaluation_mode: # Show in sidebar only during/after eval
st.success("Evaluation completed! Results saved to qa_evaluation_results.csv")
display_evaluation_results(location="sidebar")
except Exception as e:
st.error(f"Error displaying evaluation results: {str(e)}")
st.divider()
st.markdown(
"""
<div style="margin-top: 20px; text-align: center;">
<a href="https://huggingface.co/spaces/sahsan/cet_advisement_bot/tree/main" target="_blank"
style="display: inline-block; padding: 10px 20px; background-color: #4CAF50; color: white;
text-align: center; text-decoration: none; border-radius: 5px; font-size: 16px;">
View Project Files
</a>
</div>
""",
unsafe_allow_html=True
)
if not st.session_state.in_evaluation_mode:
# Check if user has API key set
user_api_key = get_api_key_for_use()
if not user_api_key:
st.warning("⚠️ **No API Key Set** - Please set your OpenAI API key in the User Panel to start chatting!")
st.info("Go to the sidebar β†’ API Key Management β†’ Set your API key")
else:
st.success("βœ… **API Key Ready** - You can now ask questions!")
chat_container = st.container()
with chat_container:
for message in st.session_state.messages:
st.markdown(
format_message(
message["content"],
message["role"] == "user"
),
unsafe_allow_html=True
)
st.markdown("<div style='position: fixed; bottom: 0; left: 0; right: 0; padding: 20px; background: white; box-shadow: 0 -4px 15px rgba(0, 0, 0, 0.1); z-index: 999;'>", unsafe_allow_html=True)
cols = st.columns([7, 1])
with cols[0]:
st.text_input(
"Message",
key="user_input",
placeholder="Ask about courses..." if user_api_key else "Set API key first...",
on_change=handle_submit if user_api_key else None,
label_visibility="collapsed",
disabled=not user_api_key
)
with cols[1]:
st.button("Send", on_click=handle_submit, use_container_width=True, disabled=not user_api_key)
st.markdown("</div>", unsafe_allow_html=True)
else: # Evaluation mode display in main area
if st.session_state.evaluation_complete:
st.subheader("Evaluation Results (Main View)")
display_evaluation_results(location="main_eval_results") # Unique key for download button
if st.button("Return to Chat Mode"):
st.session_state.in_evaluation_mode = False
st.session_state.messages = [] # Clear eval messages
st.rerun()
else:
st.info("Evaluation in progress... Please wait. Results will appear here and in the sidebar once complete.")
st.markdown("""
<div style="text-align: center; padding: 20px 0; color: #6b7280; font-size: 14px; padding-bottom: 70px;"> Built with ❀️ to help students succeed
</div>
""", unsafe_allow_html=True)
if __name__ == "__main__":
main()