# visualize_gaze_dynamic_audio_transcribe_gifs_app.py
import streamlit as st
import pandas as pd
import plotly.graph_objects as go
import numpy as np
import os
import re # Regular expressions
from io import BytesIO # To handle bytes in memory
import zipfile # To handle ZIP file uploads
import time # For potential delays/status updates
import hashlib # Stable, process-independent hashing for deterministic segment IDs (see seg_uid)
from typing import List, Dict, Tuple, Any, Optional, Set
# --- Find Gaze Segments ---
def get_gaze_segments(df):
peak_segments, valley_segments = [], []
if df is None or df.empty or not all(c in df.columns for c in ['is_peak', 'is_valley', 'log_time_float']): return [], []
try:
# Ensure boolean conversion handles strings 'True'/'False' and actual booleans
df['is_peak'] = df['is_peak'].apply(lambda x: str(x).strip().lower() == 'true' if not pd.isna(x) else False)
df['is_valley'] = df['is_valley'].apply(lambda x: str(x).strip().lower() == 'true' if not pd.isna(x) else False)
except Exception as e: st.error(f"Failed to process boolean columns for segments: {e}"); return [], []
peak_times = sorted(df.loc[df['is_peak'], 'log_time_float'].tolist())
valley_times = sorted(df.loc[df['is_valley'], 'log_time_float'].tolist())
for i in range(len(peak_times) - 1):
start_t, end_t = peak_times[i], peak_times[i+1]
if start_t < end_t: # Ensure valid segment
uid = seg_uid('peak', start_t, end_t)
peak_segments.append({
"index": i,
"start_time": start_t,
"end_time": end_t,
"seg_uid": uid,
"seg_type": "peak" # <<< ADDED HERE
})
for i in range(len(valley_times) - 1):
start_t, end_t = valley_times[i], valley_times[i+1]
if start_t < end_t: # Ensure valid segment
uid = seg_uid('valley', start_t, end_t)
valley_segments.append({
"index": i,
"start_time": start_t,
"end_time": end_t,
"seg_uid": uid,
"seg_type": "valley" # <<< ADDED HERE
})
return peak_segments, valley_segments
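# Shape of one returned segment dict (values illustrative):
#   {"index": 0, "start_time": 10.50, "end_time": 12.75, "seg_uid": 1234567, "seg_type": "peak"}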
# --- MODIFIED: process_screenshot_zip ---
# Now returns a dict with results and status, NO st calls inside
@st.cache_data # Cache based on file content hash
def process_screenshot_zip_cached(uploaded_zip_bytes: bytes) -> Dict[str, Any]:
"""
Processes ZIP bytes containing screenshot GIFs.
Returns a dictionary with results and processing status.
NO Streamlit UI calls inside this function.
"""
gif_lookup = {}
if not uploaded_zip_bytes:
return {"success": False, "fatal_error": "No ZIP data provided.", "gif_lookup": None}
dir_pattern = re.compile(r".*gaze_(peak|valley)_segment_\d+_idx_\d+_to_\d+_time_([\d\.]+)_to_([\d\.]+)", re.IGNORECASE)
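    # Illustrative folder name this pattern is expected to match (assumed from the regex above):
    #   "gaze_peak_segment_3_idx_120_to_180_time_10.50_to_12.75"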
processed_count, skipped_count = 0, 0
error_messages = set()
warnings = [] # Collect warnings separately
try:
with zipfile.ZipFile(BytesIO(uploaded_zip_bytes), 'r') as zip_ref:
# Normalize path separators and check ending
gif_files = [f for f in zip_ref.namelist() if f.lower().replace('\\', '/').endswith('/animation.gif')]
if not gif_files:
warnings.append("No 'animation.gif' files found in expected subfolder structure within the ZIP.")
# Continue processing other things if any, but return empty lookup if truly none found
for gif_path in gif_files:
normalized_path = gif_path.replace('\\', '/')
path_parts = normalized_path.split('/')
if len(path_parts) >= 2:
folder_name = path_parts[-2] # Get the directory containing the gif
match = dir_pattern.match(folder_name)
if match:
seg_type = match.group(1).lower()
try:
# Format times to 2 decimal places for consistent keys
start_t_str = f"{float(match.group(2)):.2f}"
end_t_str = f"{float(match.group(3)):.2f}"
lookup_key = f"{seg_type}_{start_t_str}_{end_t_str}"
gif_bytes = zip_ref.read(gif_path)
if gif_bytes:
gif_lookup[lookup_key] = gif_bytes
processed_count += 1
else:
skipped_count += 1
error_messages.add(f"Empty GIF file skipped: '{gif_path}'")
except ValueError:
skipped_count += 1
error_messages.add(f"Could not parse time floats in folder name: '{folder_name}'")
except Exception as read_e:
skipped_count += 1
error_messages.add(f"Read error for '{gif_path}': {read_e}")
else:
# Log folders that didn't match the pattern if needed for debugging
# error_messages.add(f"Folder name format mismatch: '{folder_name}'")
skipped_count += 1 # Count as skipped if folder name doesn't match expected format
else:
skipped_count += 1 # Count as skipped if path structure is unexpected
error_messages.add(f"Unexpected path structure: '{gif_path}'")
# Return results dictionary
return {
"success": True,
"gif_lookup": gif_lookup,
"processed_count": processed_count,
"skipped_count": skipped_count,
"warnings": warnings,
"error_messages": list(error_messages), # Convert set for potential display
"fatal_error": None
}
except zipfile.BadZipFile:
# st.error("Invalid ZIP file.") # REMOVED
return {"success": False, "fatal_error": "Invalid or corrupted ZIP file.", "gif_lookup": None}
except Exception as e:
# st.error(f"Error processing ZIP: {e}") # REMOVED
return {"success": False, "fatal_error": f"Error processing ZIP: {e}", "gif_lookup": None}
# --- Cloud Service Integrations ---
try:
from openai import OpenAI, AuthenticationError, APIError
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
# st.sidebar.warning("OpenAI library not found. `pip install openai`. Transcription/Chat disabled.", icon="⚠️") # Keep this sidebar one
try:
from pydub import AudioSegment
from pydub.exceptions import CouldntDecodeError
PYDUB_AVAILABLE = True
except ImportError:
PYDUB_AVAILABLE = False
st.error("Pydub library not found. `pip install pydub`. Audio processing disabled.", icon="🚨") # Keep this main area one
# stop execution early if pydub is critical and missing
# st.stop()
try:
from supabase import create_client, Client as SupabaseClient
from supabase.lib.client_options import ClientOptions
from postgrest.exceptions import APIError as SupabaseAPIError # More specific Supabase errors
SUPABASE_AVAILABLE = True
except ImportError:
SUPABASE_AVAILABLE = False
st.sidebar.error("Supabase library not found. `pip install supabase`. Data persistence disabled.", icon="🚨") # Keep this sidebar one
try: # Assuming Pinecone is optional or might fail
from pinecone import Pinecone
PINECONE_AVAILABLE=True
except ImportError:
PINECONE_AVAILABLE=False
st.sidebar.error("Pinecone library not found. `pip install pinecone-client`. Vector search disabled.", icon="🚨") # Keep sidebar one
# --- App Setup ---
st.set_page_config(layout="wide")
st.title("Gaze, Audio, Transcription & GIF Visualizer")
# --- Session-state reset helpers (moved up) ----------------------------------
def reset_all_processing_state():
"""Resets state related to processed data when inputs change."""
st.session_state.gaze_segments = {'peak': [], 'valley': []}
st.session_state.transcripts = {}
st.session_state.embedded_ids = set()
st.session_state.active_segment_key = None
st.session_state.peak_selector_key = None
st.session_state.valley_selector_key= None
st.session_state.chat_history = []
st.session_state.processed_gaze_file_id = None
st.session_state.processed_audio_file_id = None
st.session_state.processed_zip_file_id = None
def reset_audio_dependent_state():
"""Resets everything that depends on the full-audio object."""
st.session_state.transcripts = {}
st.session_state.embedded_ids = set()
st.session_state.chat_history = []
st.session_state.processed_audio_file_id = None
# --- Constants ---
EMBED_MODEL = "text-embedding-3-small"
K_NEIGHBORS = 4
PINECONE_INDEX_NAME = "transcripts" # Pinecone index name
# --- Initialize Cloud Clients (Safely) ---
supabase: Optional[SupabaseClient] = None
pc: Optional[Pinecone] = None
idx: Optional[Any] = None # Pinecone Index object
openai_client: Optional[OpenAI] = None
# Supabase Client Initialization
if SUPABASE_AVAILABLE:
try:
SUPA_URL = st.secrets["SUPABASE_URL"]
SUPA_KEY = st.secrets["SUPABASE_SERVICE_KEY"] # Use service role key for server-side operations
# Increase timeout for potentially long uploads/downloads
options = ClientOptions(postgrest_client_timeout=60, storage_client_timeout=60)
supabase = create_client(SUPA_URL, SUPA_KEY, options=options)
# Test connection (optional but recommended)
supabase.table("sessions").select("id", head=True).execute()
st.sidebar.caption("Supabase client OK.", unsafe_allow_html=True) # Use sidebar for status
except KeyError:
st.sidebar.error("Supabase URL/Key missing in Streamlit secrets.", icon="πŸ”’")
supabase = None
except Exception as e:
st.sidebar.error(f"Supabase connection failed: {e}", icon="❌")
supabase = None
else:
st.sidebar.error("Supabase library not available. Cannot connect.", icon="🚨")
# Pinecone Client Initialization
if PINECONE_AVAILABLE and supabase: # Only init if Supabase is also working (as they are linked)
try:
PINECONE_API_KEY = st.secrets["PINECONE_API_KEY"]
pc = Pinecone(api_key=PINECONE_API_KEY)
idx = pc.Index(PINECONE_INDEX_NAME)
idx.describe_index_stats() # Test connection to index
st.sidebar.caption(f"Pinecone client OK (Index: '{PINECONE_INDEX_NAME}').", unsafe_allow_html=True)
except KeyError:
st.sidebar.error("Pinecone API Key missing in Streamlit secrets.", icon="πŸ”’")
pc = idx = None
except Exception as e:
# Check if the index exists, provide helpful message
        if pc and PINECONE_INDEX_NAME not in pc.list_indexes().names():
st.sidebar.error(f"Pinecone index '{PINECONE_INDEX_NAME}' not found.", icon="❌")
else:
st.sidebar.error(f"Pinecone connection failed: {e}", icon="❌")
pc = idx = None
else:
if not PINECONE_AVAILABLE:
st.sidebar.error("Pinecone library not available.", icon="🚨")
elif not supabase:
st.sidebar.error("Pinecone disabled because Supabase connection failed.", icon="⚠️")
# OpenAI Client Initialization (Moved to Sidebar section for user input)
# --- Session State Initialization ---
def init_session_state(force: bool = False):
default_states = {
# Core Data
'gaze_df': None,
'full_audio': None, # Will store AudioSegment object
'gif_lookup': None, # Dict mapping key -> gif_bytes
# Session Management
'session_id': None, # Confirmed ID from Supabase
'session_mode': "Create new", # Default mode
'session_locked': False, # Is the session ID confirmed and UI locked?
'data_loaded': False, # Has data been loaded from Supabase for this session?
'raw_saved': False, # Have raw files been saved for this session?
# Processed Data / Metadata
'gaze_segments': {'peak': [], 'valley': []},
'transcripts': {}, # {seg_uid: transcript_text | "" (failed/empty) | None (pending)}
'embedded_ids': set(), # Set of seg_uids that have been successfully embedded in Pinecone
# UI State
'active_segment_key': None, # Tuple like ('peak', index) or ('valley', index)
'peak_selector_key': None, # Store key of selected peak item
'valley_selector_key': None, # Store key of selected valley item
# File Upload Tracking (to detect changes)
'processed_gaze_file_id': None,
'processed_audio_file_id': None,
'processed_zip_file_id': None,
# Misc
'audio_load_error': None,
'initialized': True,
'openai_client': None, # Store the initialized client here
'chat_history': [],
}
for key, default_value in default_states.items():
        if force or key not in st.session_state:
st.session_state[key] = default_value
if 'initialized' not in st.session_state:
init_session_state()
# --- Supabase Helper Functions ---
def check_supabase_client():
if not supabase:
st.error("Supabase client not available. Cannot perform database operations.")
return False
return True
def check_pinecone_client():
if not idx:
st.error("Pinecone client or index not available. Cannot perform vector operations.")
return False
return True
def session_exists(sid: int) -> bool:
"""Check if a session ID exists in the Supabase 'sessions' table."""
if not check_supabase_client(): return False
try:
resp = (supabase.table("sessions")
.select("id")
.eq("id", sid)
.limit(1)
.execute())
return bool(resp.data)
except SupabaseAPIError as e:
st.error(f"DB Error checking session {sid}: {e}")
return False
except Exception as e:
st.error(f"Unexpected error checking session {sid}: {e}")
return False
def create_new_session(sid: int) -> bool:
"""Attempt to create a new session ID in Supabase."""
if not check_supabase_client(): return False
try:
resp = supabase.table("sessions").insert({"id": sid}).execute()
# Check if insert succeeded (basic check, might need more robust error handling)
if resp.data and len(resp.data) > 0:
st.success(f"Session {sid} created in database.")
return True
else:
# Attempting to insert duplicate might return empty data without raising error sometimes
# Or there could be another issue. Check if it exists now just in case.
if session_exists(sid):
st.warning(f"Session {sid} already existed (possible race condition or previous error).")
return True # Treat as success if it exists now
else:
st.error(f"Failed to create session {sid}. Response: {resp}")
return False
except SupabaseAPIError as e:
# Handle specific error for duplicate key violation
if "23505" in str(e) or "duplicate key value violates unique constraint" in str(e): # PostgreSQL unique violation code
st.error(f"Session ID {sid} already exists in the database.")
else:
st.error(f"Database error creating session {sid}: {e}")
return False
except Exception as e:
st.error(f"Unexpected error creating session {sid}: {e}")
return False
def upload_blob(bucket: str, path: str, blob: bytes, mime: str) -> bool: # Expect bytes here
if not check_supabase_client():
return False
try:
# Ensure blob is actually bytes, as expected from the caller
if not isinstance(blob, bytes):
# This should ideally not happen based on how save_raw_files_to_supabase calls it
st.error(f"Internal Error: upload_blob expected bytes, received {type(blob)}")
return False
options = {
"contentType": mime,
"cacheControl": "3600", # 1 hour cache
"upsert": "true", # Overwrite if exists
}
# --- CORRECTED LINE: Pass the raw bytes object 'blob' directly ---
res = (
supabase
.storage
.from_(bucket)
.upload(path, blob, file_options=options) # Pass 'blob' (bytes) directly
)
# -----------------------------------------------------------------
# Optional: Add more robust success checking based on 'res' if needed
# print(f"DEBUG Supabase Upload Response ({bucket}/{path}): {res}") # For debugging
# Let's assume if no exception was raised, it succeeded.
# More robust checking might involve inspecting the HTTP status code if accessible via 'res'.
# For now, assume success if no exception.
return True
except SupabaseAPIError as e:
st.error(f"Supabase Storage Error ({bucket}/{path}): {e}")
return False
# Catch the TypeError from the library if it still occurs for some reason
except TypeError as e:
st.error(f"Upload TypeError ({bucket}/{path}): {e}")
return False
except Exception as e:
st.error(f"Unexpected upload error ({bucket}/{path}): {e}")
return False
def download_blob(bucket: str, path: str) -> Optional[bytes]:
"""Download bytes from Supabase Storage."""
if not check_supabase_client(): return None
try:
resp = supabase.storage.from_(bucket).download(path)
return resp # Returns bytes on success, None or raises error on failure
except SupabaseAPIError as e:
# Handle "Not Found" specifically - adjusted error message check
if "NotFound" in str(e) or "specified key does not exist" in str(e).lower():
st.warning(f"File not found in Supabase Storage: {bucket}/{path}")
return None
else:
st.error(f"Supabase Storage Error downloading {bucket}/{path}: {e}")
return None
except Exception as e:
# Catch potential non-API errors during download
st.error(f"Unexpected error downloading {bucket}/{path}: {e}")
return None
def save_raw_files_to_supabase(session_id: int, csv_b: bytes, audio_b: bytes, audio_mime: str, zip_b: bytes) -> bool:
"""Uploads all three raw files to Supabase storage."""
if not check_supabase_client(): return False
st.info(f"Attempting to save raw files for session {session_id} to Supabase Storage...")
success = True
success &= upload_blob("gaze-csv", f"{session_id}/gaze.csv", csv_b, "text/csv")
success &= upload_blob("audio-files", f"{session_id}/audio.wav", audio_b, audio_mime) # Use determined mime type
success &= upload_blob("gif-zips", f"{session_id}/screens.zip", zip_b, "application/zip")
if success:
st.toast(f"Raw files saved to Supabase for session {session_id} βœ…", icon="☁️")
else:
st.error(f"One or more raw files failed to upload for session {session_id}.")
return success
def load_raw_files_from_supabase(session_id: int) -> Tuple[Optional[bytes], Optional[bytes], Optional[bytes], Optional[str]]:
"""Downloads all three raw files from Supabase storage. Returns (csv, audio, zip, audio_mime)."""
if not check_supabase_client(): return None, None, None, None
st.info(f"Attempting to load raw files for session {session_id} from Supabase Storage...")
# Assume standard names used during upload
csv_path = f"{session_id}/gaze.csv"
audio_path = f"{session_id}/audio.wav" # Keep the default assumption for the path
zip_path = f"{session_id}/screens.zip"
csv_b = download_blob("gaze-csv", csv_path)
audio_b = download_blob("audio-files", audio_path)
zip_b = download_blob("gif-zips", zip_path)
# Determine audio mime type (simple guess for now)
# In a real scenario, you might store the original file name or type in Supabase metadata
# For now, just return 'audio/wav' if audio_b is loaded.
audio_mime = "audio/wav" if audio_b else None
# Check if all files were loaded
all_loaded = csv_b is not None and audio_b is not None and zip_b is not None
if not all_loaded:
missing = []
if csv_b is None: missing.append("Gaze CSV")
if audio_b is None: missing.append("Audio")
if zip_b is None: missing.append("Screenshots ZIP")
st.warning(f"Could not load all raw files for session {session_id} from Supabase. Missing: {', '.join(missing)}.")
return csv_b, audio_b, zip_b, audio_mime
def save_segment_to_supabase(session_id: int, uid: int, seg_type: str, start_t: float, end_t: float, transcript: Optional[str]):
if not check_supabase_client(): return False
data_to_upsert = {
"session_id": session_id,
"seg_uid": uid,
"seg_type": seg_type,
"start_t": start_t,
"end_t": end_t,
"transcript": transcript if transcript is not None else "" # Ensure empty string instead of None if DB disallows NULL
}
# print(f"DEBUG: Attempting to save segment to Supabase:", data_to_upsert)
try:
# Use upsert with on_conflict to handle potential re-runs cleanly
response = supabase.table("segments").upsert(
data_to_upsert,
on_conflict='session_id, seg_uid' # Assumes PK is (session_id, seg_uid)
).execute()
# print(f"DEBUG: Supabase response for UID {uid}: Status={response.status_code}, Data={response.data}")
# Check if response indicates success (status code 200 or 201 typically)
# Checking len(response.data) > 0 is a reasonable proxy for success in upsert
if response.data and len(response.data) > 0:
# print(f"DEBUG: Save successful for UID {uid}")
return True
else:
# Log unexpected success status or empty data - maybe it already existed and wasn't updated?
warning_msg = f"Supabase upsert for segment {uid} (Session {session_id}) returned status {response.status_code} with data: {response.data}. It might have already existed."
# st.info(warning_msg) # Less severe than error
print(warning_msg)
# Let's assume it's okay if it existed - return True
return True
except SupabaseAPIError as e:
error_msg = ""
if 'foreign key constraint' in str(e).lower() or ('code' in dir(e) and e.code == '23503'):
error_msg = f"DB Error saving segment {uid}: Session ID {session_id} not found in 'sessions' table (Foreign Key Violation)."
elif 'invalid input syntax' in str(e).lower() or ('code' in dir(e) and e.code == '22P02'):
error_msg = f"DB Error saving segment {uid}: Data type mismatch. Check data sent vs table schema. Details: {e}"
else:
error_msg = f"DB API Error saving segment {uid} (Session {session_id}): {e}"
st.error(error_msg) # Make visible
print(error_msg)
return False
except Exception as e:
error_msg = f"Unexpected Error saving segment {uid} (Session {session_id}): {e}"
st.error(error_msg) # Make visible
print(error_msg)
return False
def save_embedding_to_pinecone(session_id: int, uid: int, seg_type: str, start_t: float, end_t: float, emb: np.ndarray):
"""Write/overwrite the vector for this segment in Pinecone."""
if not check_pinecone_client() or emb is None: return False
vector_id = f"{session_id}:{uid}" # Unique ID across all sessions
metadata = {
"session_id": session_id,
"seg_type": seg_type,
"start_t": round(start_t, 2),
"end_t": round(end_t, 2)
# Add any other metadata needed for filtering
}
try:
idx.upsert(vectors=[{
"id": vector_id,
"values": emb.tolist(), # Convert numpy array to list
"metadata": metadata
}])
# st.toast(f"Embedding {uid} saved to Pinecone.", icon="🧠") # Optional: Can be noisy
return True
except Exception as e:
st.error(f"Pinecone Error saving embedding {vector_id}: {e}")
return False
def hydrate_state_from_supabase(session_id: int):
if not check_supabase_client(): return False
st.info(f"Loading segment data for session {session_id} from Supabase...")
try:
rows = (supabase.table("segments").select("*").eq("session_id", session_id).order("start_t").execute().data)
# Reset relevant state before hydrating
st.session_state.gaze_segments = {'peak': [], 'valley': []}
st.session_state.transcripts = {}
st.session_state.embedded_ids = set()
st.session_state.chat_history = []
if not rows:
st.warning(f"No segment data found in Supabase for session {session_id}.")
return True # Return True indicating hydration attempt finished
# Fetch Pinecone IDs for this session to hydrate embedded_ids
# Note: This can be slow for many vectors. Consider alternative strategies if needed.
# A simple approach is to query Pinecone for IDs with the session_id metadata.
# Caution: Pinecone querying without a vector can be inefficient depending on setup.
# Alternative: Store embedding status in Supabase 'segments' table.
if check_pinecone_client():
try:
# Efficiently fetch *only* IDs for the session
# Note: list() might be paginated, handle large results if necessary
list_response = idx.list(prefix=f"{session_id}:")
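                # Note: depending on the installed pinecone-client version, Index.list() may yield
                # batches of IDs (a generator) rather than an object with an .ids attribute; verify
                # against the SDK in use and adapt the parsing below if needed.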
if list_response and list_response.ids:
st.session_state.embedded_ids = {int(vec_id.split(':')[1]) for vec_id in list_response.ids if vec_id.startswith(f"{session_id}:")}
st.toast(f"Checked Pinecone: Found {len(st.session_state.embedded_ids)} existing embeddings for session {session_id}.", icon="🧠")
except Exception as pinecone_e:
st.warning(f"Could not fetch existing Pinecone embeddings for session {session_id}: {pinecone_e}")
peak_idx_counter = 0
valley_idx_counter = 0
for r in rows:
seg_type = r.get("seg_type") # Use .get for safety
start_t = r.get("start_t")
end_t = r.get("end_t")
db_uid = r.get("seg_uid") # UID from DB
if not all([seg_type, start_t is not None, end_t is not None, db_uid is not None]):
st.warning(f"Skipping incomplete segment row from DB: {r}")
continue
if seg_type not in ['peak', 'valley']:
st.warning(f"Unknown segment type '{seg_type}' found in DB for seg_uid {db_uid}")
continue
# Use the UID from the DB as the primary key
uid = db_uid
# Optional: Recalculate for verification?
# calc_uid = seg_uid(seg_type, start_t, end_t)
# if uid != calc_uid:
# st.warning(f"UID mismatch! DB: {uid}, Calc: {calc_uid} for {start_t}-{end_t}. Using DB UID.")
# Determine index based on order loaded
current_index = 0
if seg_type == 'peak':
current_index = peak_idx_counter
peak_idx_counter += 1
else: # valley
current_index = valley_idx_counter
valley_idx_counter += 1
seg_data = {
"index": current_index, # Assign based on load order
"start_time": start_t,
"end_time": end_t,
"seg_uid": uid,
"seg_type": seg_type
}
st.session_state.gaze_segments[seg_type].append(seg_data)
st.session_state.transcripts[uid] = r.get("transcript") # Store transcript from DB
# Hydrate embedded status (already fetched above)
# if uid in pinecone_session_ids: # Check against fetched IDs
# st.session_state.embedded_ids.add(uid)
st.toast(f"Loaded {len(rows)} segment records from Supabase.", icon="πŸ“₯")
return True
except SupabaseAPIError as e:
st.error(f"DB Error loading segments for session {session_id}: {e}")
return False
except Exception as e:
st.error(f"Unexpected error loading segments for session {session_id}: {e}")
return False
# --- Anti‑duplicate helper ---
def seg_uid(seg_type: str, start_t: float, end_t: float) -> int:
    """Deterministic integer ID for a segment."""
    # Note: Python's built-in hash() is salted per process (PYTHONHASHSEED), so it is NOT stable
    # across app restarts. Use a cryptographic digest so the same segment always maps to the same ID,
    # keeping Supabase/Pinecone upserts idempotent. Ensure consistent float representation.
    start_t_str = f"{start_t:.6f}" # Use sufficient precision
    end_t_str = f"{end_t:.6f}"
    combined = f"{seg_type}_{start_t_str}_{end_t_str}"
    digest = hashlib.sha256(combined.encode("utf-8")).hexdigest()
    return int(digest, 16) % (2**31 - 1) # Keep within a positive standard signed 32-bit int
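# Example (illustrative): seg_uid("peak", 10.5, 12.75) returns the same integer on every run,
# so re-processing a session upserts the same Supabase rows and Pinecone vectors instead of duplicating them.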
# --- Embedding & Pinecone RAG Helpers ---
@st.cache_data # Cache embedding calls based on text content
def get_embedding(_openai_client: Optional[OpenAI], text: str) -> Optional[np.ndarray]:
"""Generates embedding for text using OpenAI."""
if not text or not _openai_client: return None
# Basic check for empty/whitespace text that might waste API call
if not text.strip(): return None
try:
resp = _openai_client.embeddings.create(input=[text], model=EMBED_MODEL) # Pass text as a list
if resp.data and resp.data[0].embedding:
return np.array(resp.data[0].embedding, dtype=np.float32)
else:
st.warning(f"OpenAI returned empty embedding data for text: '{text[:50]}...'")
return None
except AuthenticationError:
st.warning("OpenAI Auth Error during embedding. Check API Key.")
return None
except APIError as e:
st.warning(f"OpenAI API Error during embedding: {e}")
return None
except Exception as e:
st.warning(f"Failed to get embedding: {e}")
return None
def retrieve_relevant_transcripts(query_text: str, k: int = K_NEIGHBORS) -> List[str]:
"""Retrieves relevant transcript segments from Pinecone and formats them."""
if not check_pinecone_client() or not st.session_state.openai_client or not st.session_state.session_id:
return ["_Vector search unavailable._"]
try:
query_vec = get_embedding(st.session_state.openai_client, query_text)
if query_vec is None:
return ["_Could not generate embedding for query._"]
# Query Pinecone, filtering by the current session_id
res = idx.query(
vector=query_vec.tolist(),
top_k=k,
filter={"session_id": st.session_state.session_id},
include_metadata=True # Fetch metadata like time range
)
if not res or not res.matches: # Check if matches exist
return ["_No relevant transcript segments found in this session._"]
# Fetch full transcripts from Supabase using the IDs from Pinecone results
retrieved_texts = []
# Create a set of unique seg_uids to fetch from Supabase efficiently
seg_uids_to_fetch = set()
metadata_map = {} # Store metadata temporarily mapped by seg_uid
for match in res.matches:
vector_id = match.id
try:
retrieved_session_id, retrieved_seg_uid = map(int, vector_id.split(':'))
if retrieved_session_id == st.session_state.session_id:
seg_uids_to_fetch.add(retrieved_seg_uid)
metadata_map[retrieved_seg_uid] = match.metadata # Store metadata (start_t, end_t)
else:
st.warning(f"Retrieved vector {vector_id} from wrong session {retrieved_session_id} (expected {st.session_state.session_id}).")
except (ValueError, IndexError):
st.warning(f"Could not parse vector ID: {vector_id}")
if not seg_uids_to_fetch:
return ["_No matching segments found for this session after filtering._"]
# Fetch transcripts from Supabase in one batch if possible
fetched_transcripts = {}
if check_supabase_client():
try:
fetch_resp = (supabase.table("segments")
.select("seg_uid, transcript")
.eq("session_id", st.session_state.session_id)
.in_("seg_uid", list(seg_uids_to_fetch)) # Use 'in_' filter
.execute())
if fetch_resp.data:
for item in fetch_resp.data:
fetched_transcripts[item['seg_uid']] = item.get('transcript')
# Update session state cache as well
st.session_state.transcripts[item['seg_uid']] = item.get('transcript')
except SupabaseAPIError as db_e:
st.warning(f"DB error fetching batch transcripts: {db_e}")
except Exception as fetch_e:
st.warning(f"Unexpected error fetching batch transcripts: {fetch_e}")
# Format results using fetched transcripts and metadata
        for uid in seg_uids_to_fetch: # Iterate through the UIDs we intended to fetch (avoid shadowing seg_uid())
            transcript = fetched_transcripts.get(uid) # Get from batch result
            metadata = metadata_map.get(uid)
if metadata:
start_t = metadata.get('start_t', '?')
end_t = metadata.get('end_t', '?')
time_range_str = f"[{start_t}–{end_t}s]"
else:
time_range_str = "[?:?s]" # Fallback if metadata somehow missing
if transcript is None:
# If Supabase fetch failed or transcript was null/missing
transcript_text = f"{time_range_str} _[Transcript unavailable or not found]_"
elif not transcript.strip():
transcript_text = f"{time_range_str} _[Segment transcribed as silent]_"
else:
transcript_text = f"{time_range_str} {transcript}"
retrieved_texts.append(transcript_text)
# Sort results based on relevance score from Pinecone (optional, requires storing score)
# For simplicity, returning in the order UIDs were processed.
return retrieved_texts
except Exception as e:
st.error(f"Unexpected error during retrieval: {e}")
return ["_Error during vector search._"]
# --- Sidebar for Session Control & Config ---
with st.sidebar:
st.header("Session Control")
# Disable widgets if session is locked
is_locked = st.session_state.get("session_locked", False)
mode = st.radio(
"Mode",
["Load existing", "Create new"],
index=0 if st.session_state.session_mode == "Load existing" else 1,
horizontal=True,
key="mode_widget",
disabled=is_locked,
help="Choose 'Load existing' to load data previously saved under an ID. Choose 'Create new' to start a fresh session with a new ID."
)
# Update session_state immediately if mode changes (before locking)
if not is_locked:
st.session_state.session_mode = mode
session_id_input = st.number_input(
"Session ID",
step=1,
min_value=1,
format="%d",
key="id_widget",
value=st.session_state.session_id if st.session_state.session_id else 1, # Default to 1 or current ID if locked
disabled=is_locked,
help="Enter a unique positive integer ID for the session."
)
confirm_button = st.button("Confirm Session", disabled=is_locked or not supabase) # Disable if Supabase isn't working
unlock_button = st.button("Change Session", disabled=not is_locked)
if is_locked:
st.success(f"Active Session: {st.session_state.session_id} ({st.session_state.session_mode})")
st.divider()
st.header("Configuration")
# OpenAI API Key Input
openai_api_key_input = st.text_input(
"OpenAI API Key",
type="password",
key="openai_api_key_input",
help="Required for transcription and chat.",
placeholder="sk-...",
value=st.secrets.get("OPENAI_API_KEY", "") # Pre-fill from secrets if available
)
# Initialize OpenAI client when key is provided or changes
current_key = openai_api_key_input or st.secrets.get("OPENAI_API_KEY")
# Check if key exists AND (client is None OR key has changed)
if current_key and (st.session_state.openai_client is None or st.session_state.openai_client.api_key != current_key):
try:
# Clear previous client first if exists
if st.session_state.openai_client: st.session_state.openai_client = None
st.session_state.openai_client = OpenAI(api_key=current_key)
st.session_state.openai_client.models.list() # Test connection/key validity
st.success("OpenAI client OK.", icon="πŸš€")
except AuthenticationError:
st.error("Auth Error: Invalid OpenAI Key.", icon="❌")
st.session_state.openai_client = None
except APIError as e:
st.error(f"OpenAI API Error: {e}.", icon="❌")
st.session_state.openai_client = None
except Exception as e:
st.error(f"Error initializing OpenAI: {e}", icon="❌")
st.session_state.openai_client = None
elif not current_key and st.session_state.openai_client is not None:
st.session_state.openai_client = None # Clear client if key is removed
st.info("OpenAI key removed. Client deactivated.")
# Display Status
st.divider()
st.markdown("#### System Status")
st.caption(f"Supabase Client: {'βœ… Ready' if supabase else '❌ Not Ready'}")
st.caption(f"Pinecone Client: {'βœ… Ready' if idx else '❌ Not Ready'}")
st.caption(f"OpenAI Client: {'βœ… Ready' if st.session_state.openai_client else '❌ Not Ready'}")
st.caption(f"Pydub Library: {'βœ… Found' if PYDUB_AVAILABLE else '❌ Not Found'}")
# --- Session Confirmation/Unlock Logic ---
if confirm_button and not is_locked:
confirmed_id = int(session_id_input) # Cast to int
if st.session_state.session_mode == "Create new":
# 1. Check if it already exists (to prevent accidental overwrite logic)
if session_exists(confirmed_id):
st.warning(f"Session {confirmed_id} already exists. Switch to 'Load existing' mode or choose a different ID.")
# Don't stop, let user correct and press button again
else:
# 2. Attempt to create the new session row in Supabase
if create_new_session(confirmed_id):
# 3. Success: Lock the session state
st.session_state.session_id = confirmed_id
st.session_state.session_locked = True
st.session_state.data_loaded = False # Mark data as not loaded yet
st.session_state.raw_saved = False # Mark raw files as not saved yet
# Clear any data from previous sessions
                init_session_state(force=True) # Force-reset to defaults; new session info is re-applied below
st.session_state.session_id = confirmed_id
st.session_state.session_locked = True
st.session_state.session_mode = "Create new"
st.session_state.initialized = True # Ensure it's marked initialized
st.rerun()
else:
# Error is shown by create_new_session
pass # Don't stop, let user retry
elif st.session_state.session_mode == "Load existing":
# 1. Check if the session exists in Supabase
if session_exists(confirmed_id):
# 2. Success: Lock the session state
st.session_state.session_id = confirmed_id
st.session_state.session_locked = True
            # Clear any data from previous sessions before loading
            init_session_state(force=True) # Force-reset to defaults; session info is re-applied below
            st.session_state.session_id = confirmed_id
            st.session_state.session_locked = True
            st.session_state.session_mode = "Load existing"
            st.session_state.data_loaded = False # Mark data as needing loading
            st.session_state.raw_saved = True # Assume raw files were saved when the session was created
            st.session_state.initialized = True
st.rerun()
else:
st.error(f"Session {confirmed_id} does not exist in the database. Cannot load.")
# Don't stop, let user correct ID
if unlock_button and is_locked:
# Reset session state variables and allow re-confirmation
# Preserve sidebar state like API keys if desired
openai_client_temp = st.session_state.get('openai_client')
    init_session_state(force=True) # Reset everything else to defaults
st.session_state.session_id = None
st.session_state.session_locked = False
st.session_state.session_mode = "Create new" # Default back to create mode
# Restore potentially preserved state
if openai_client_temp: st.session_state.openai_client = openai_client_temp
st.info("Session unlocked. Choose mode and ID, then Confirm.")
st.rerun()
# --- Main App Area ---
# Display message if session is not locked
if not st.session_state.get("session_locked"):
st.info("⬅️ Please configure and confirm a Session ID in the sidebar to begin.")
st.stop() # Don't proceed further until session is locked
# --- Data Loading / Hydration (runs ONCE after session lock) ---
if st.session_state.session_locked and not st.session_state.data_loaded:
session_id_to_use = st.session_state.session_id
st.info(f"Session {session_id_to_use} confirmed ({st.session_state.session_mode}). Loading data...")
if st.session_state.session_mode == "Load existing":
# 1. Load Raw Files from Supabase
csv_bytes, audio_bytes, zip_bytes, audio_mime = load_raw_files_from_supabase(session_id_to_use)
processed_something = False # Flag to check if any data was actually processed
# 2. Process loaded raw files into usable data structures
gaze_loaded = False
if csv_bytes:
try:
# Reset relevant state before processing
reset_all_processing_state() # Clear segments, transcripts etc.
st.session_state.gaze_df = pd.read_csv(BytesIO(csv_bytes))
if st.session_state.gaze_df.empty: raise ValueError("Gaze CSV is empty.")
required_cols = ['log_time_float', 'gaze_change_smoothed', 'is_peak', 'is_valley']
if not all(col in st.session_state.gaze_df.columns for col in required_cols):
raise ValueError(f"Gaze CSV missing required columns: {required_cols}")
st.toast("Gaze data loaded from Supabase.", icon="πŸ“„")
gaze_loaded = True
processed_something = True
# Find segments immediately if gaze data is loaded
p_segs, v_segs = get_gaze_segments(st.session_state.gaze_df)
st.session_state.gaze_segments = {'peak': p_segs, 'valley': v_segs}
st.toast(f"Found {len(p_segs)} peak / {len(v_segs)} valley segments from gaze data.")
except Exception as e:
st.error(f"Error processing gaze CSV loaded from Supabase: {e}")
st.session_state.gaze_df = None
else:
st.warning("Gaze CSV file not found or failed to load from Supabase.")
audio_loaded = False
if audio_bytes and PYDUB_AVAILABLE:
try:
# Reset audio-dependent state
reset_audio_dependent_state()
st.session_state.full_audio = None
st.session_state.audio_load_error = None
# Use filename hint if available, otherwise default format
fmt = audio_mime.split('/')[-1] if audio_mime else "wav" # Guess format from mime type
st.session_state.full_audio = AudioSegment.from_file(BytesIO(audio_bytes), format=fmt)
st.toast(f"Audio data loaded from Supabase (format: {fmt}).", icon="🎧")
audio_loaded = True
processed_something = True
except CouldntDecodeError:
err_msg = f"Could not decode audio file loaded from Supabase (assumed format: {fmt}). Check file/format."
st.error(err_msg)
st.session_state.full_audio = None
st.session_state.audio_load_error = err_msg
except Exception as e:
err_msg = f"Error loading audio from Supabase: {e}"
st.error(err_msg)
st.session_state.full_audio = None
st.session_state.audio_load_error = err_msg
elif not audio_bytes:
st.warning("Audio file not found or failed to load from Supabase.")
elif not PYDUB_AVAILABLE:
st.warning("Pydub not available, cannot process loaded audio bytes.")
gifs_loaded = False
if zip_bytes:
# Call the cached processing function
zip_processing_results = process_screenshot_zip_cached(zip_bytes)
# Handle results OUTSIDE the cached function
if zip_processing_results["success"]:
st.session_state.gif_lookup = zip_processing_results["gif_lookup"]
processed_count = zip_processing_results["processed_count"]
skipped_count = zip_processing_results["skipped_count"]
if processed_count > 0 or skipped_count > 0 or zip_processing_results["warnings"]: # Only toast/warn if something happened
gifs_loaded = True
processed_something = True
st.toast(f"GIFs loaded/processed from Supabase ZIP ({processed_count} found, {skipped_count} skipped).", icon="πŸ–ΌοΈ")
for warning_msg in zip_processing_results.get("warnings", []):
st.warning(warning_msg) # Show warnings from processing
# for error_msg in zip_processing_results.get("error_messages", []):
# st.caption(f"GIF processing note: {error_msg}") # Optional detail
else:
st.error(f"Failed to process screenshots ZIP from Supabase: {zip_processing_results['fatal_error']}")
st.session_state.gif_lookup = None
else:
st.warning("Screenshots ZIP file not found or failed to load from Supabase.")
# 3. Load Segment Metadata (Transcripts/Embeddings) from Supabase table
# This will override segments found from gaze_df if data exists in DB
# It also hydrates transcript/embedding status
segment_data_loaded = hydrate_state_from_supabase(session_id_to_use)
if segment_data_loaded and any(st.session_state.gaze_segments.values()):
processed_something = True # Mark as processed if DB hydration worked
st.session_state.data_loaded = True # Mark loading attempt complete
if processed_something:
st.success(f"Data loading process finished for session {session_id_to_use}.")
st.rerun() # Rerun to update UI with loaded data
else:
st.warning(f"No data could be loaded or processed for session {session_id_to_use}.")
# No rerun needed if nothing changed
elif st.session_state.session_mode == "Create new":
# For new sessions, data comes from uploads, not Supabase initially.
# Mark as "loaded" so this block doesn't run again.
# Actual processing happens based on file uploads below.
st.session_state.data_loaded = True
st.info("New session created. Upload files below to populate data.")
# No rerun needed here, upload section will handle it
# --- File Upload Section (Appears only if session is locked) ---
with st.expander("Upload Files for Current Session", expanded=not st.session_state.data_loaded): # Expand if data hasn't been loaded/processed yet
st.write(f"Upload files for Session ID: **{st.session_state.session_id}**")
st.write("""
Upload:
1. Processed gaze data (CSV).
2. The corresponding **full** audio file (WAV, MP3, etc.).
3. A **ZIP file** containing the `screenshots` folder (with `peaks`/`valleys` subfolders and `animation.gif` files).
""")
# Only allow uploads if in "Create new" mode OR if in "Load existing" but data_loaded is False (initial load failed?)
# disable_uploads = st.session_state.session_mode == "Load existing" and st.session_state.data_loaded
disable_uploads = False # Simpler: always allow re-upload? Or refine logic as needed.
col1_upload, col2_upload, col3_upload = st.columns(3)
with col1_upload:
st.markdown("##### 1. Gaze CSV")
uploaded_gaze_file = st.file_uploader("Choose CSV", type="csv", key="gaze_uploader", label_visibility="collapsed", disabled=disable_uploads)
with col2_upload:
st.markdown("##### 2. Full Audio File")
uploaded_audio_file = st.file_uploader("Choose Audio", type=["wav", "mp3", "ogg", "flac", "m4a"], key="full_audio_uploader", label_visibility="collapsed", disabled=disable_uploads)
with col3_upload:
st.markdown("##### 3. Screenshots ZIP")
uploaded_zip_file = st.file_uploader("Choose ZIP", type="zip", key="zip_uploader", label_visibility="collapsed", disabled=disable_uploads)
# --- Raw File Saving Logic (for NEW sessions) ---
# This triggers *only* when creating a new session, all 3 files are uploaded *for the first time*,
# and they haven't been saved yet.
if (st.session_state.session_mode == "Create new" and
not st.session_state.raw_saved and
uploaded_gaze_file is not None and
uploaded_audio_file is not None and
uploaded_zip_file is not None):
# Read file bytes immediately after upload confirmation
gaze_bytes = uploaded_gaze_file.getvalue()
audio_bytes = uploaded_audio_file.getvalue()
zip_bytes = uploaded_zip_file.getvalue()
# Determine audio mime type from uploaded file name
audio_mime = uploaded_audio_file.type or "audio/wav" # Use detected type or default
if save_raw_files_to_supabase(
st.session_state.session_id,
gaze_bytes,
audio_bytes, # 2nd positional arg = audio file *bytes*
audio_mime, # 3rd positional arg = mime-type (str)
zip_bytes
):
st.session_state.raw_saved = True # Mark as saved to prevent re-saving
# No rerun needed here, processing logic below will handle the data
else:
st.error("Failed to save raw files to Supabase Storage. Processing will use local copies only for this run.")
# Keep raw_saved as False? Or set True anyway to avoid retries? Depends on desired behavior.
# Let's set it to True to avoid repeated failed attempts on reruns.
st.session_state.raw_saved = True
# --- Main Layout Definition ---
plot_expander_placeholder = st.container()
col_chat, col_segments = st.columns([3, 2], gap="large") # Chatbot on left (wider), Segments on right
# --- Plotting Function ---
def create_plotly_gaze_plot(df, title_suffix=""):
# (Your existing create_plotly_gaze_plot function - unchanged)
required_cols = ['log_time_float', 'gaze_change_smoothed', 'is_peak', 'is_valley']
if df is None or df.empty: return None
if not all(col in df.columns for col in required_cols):
st.warning("Plotting requires columns: " + ", ".join(required_cols))
return None
try:
# Ensure boolean conversion handles strings 'True'/'False' and actual booleans
# Make copies to avoid SettingWithCopyWarning if df is a slice
df = df.copy()
df['is_peak'] = df['is_peak'].apply(lambda x: str(x).strip().lower() == 'true' if pd.notna(x) else False)
df['is_valley'] = df['is_valley'].apply(lambda x: str(x).strip().lower() == 'true' if pd.notna(x) else False)
except Exception as e: st.error(f"Error processing peak/valley columns: {e}"); return None
df = df.sort_values('log_time_float').reset_index(drop=True)
peaks_df = df[df['is_peak']].copy()
valleys_df = df[df['is_valley']].copy()
fig = go.Figure()
# Main gaze line
fig.add_trace(go.Scatter(x=df['log_time_float'], y=df['gaze_change_smoothed'], mode='lines', name='Smoothed Gaze Change', line=dict(color='darkgrey', width=1.5), hoverinfo='x+y'))
# Peak markers
fig.add_trace(go.Scatter(x=peaks_df['log_time_float'], y=peaks_df['gaze_change_smoothed'], mode='markers', name=f'Peaks ({len(peaks_df)})', marker=dict(color='mediumblue', size=15, symbol='triangle-up', line=dict(width=1.5, color='black')), hoverinfo='x+y', customdata=peaks_df[['log_time_float']].values, # Add time for click events if needed later
))
# Valley markers
fig.add_trace(go.Scatter(x=valleys_df['log_time_float'], y=valleys_df['gaze_change_smoothed'], mode='markers', name=f'Valleys ({len(valleys_df)})', marker=dict(color='darkgreen', size=15, symbol='triangle-down', line=dict(width=1.5, color='black')), hoverinfo='x+y', customdata=valleys_df[['log_time_float']].values, # Add time for click events if needed later
))
# Optional segment lines (subtle)
if len(peaks_df) > 1: fig.add_trace(go.Scatter(x=peaks_df['log_time_float'], y=peaks_df['gaze_change_smoothed'], mode='lines', name='Peak Segments', line=dict(color='lightblue', width=1, dash='dot'), showlegend=False, hoverinfo='skip'))
if len(valleys_df) > 1: fig.add_trace(go.Scatter(x=valleys_df['log_time_float'], y=valleys_df['gaze_change_smoothed'], mode='lines', name='Valley Segments', line=dict(color='lightgreen', width=1, dash='dot'), showlegend=False, hoverinfo='skip'))
title = f"Smoothed Gaze Change (Session {st.session_state.session_id}){title_suffix}"
fig.update_layout(title=title, xaxis_title="Time (seconds)", yaxis_title="Gaze Change (Smoothed)", hovermode="x unified", legend_title_text='Events', template="plotly_white", height=500)
# Dynamic Axis Range
if not df['log_time_float'].empty: fig.update_xaxes(range=[df['log_time_float'].min(), df['log_time_float'].max()])
gaze_col_numeric = pd.to_numeric(df['gaze_change_smoothed'], errors='coerce').dropna() # Ensure numeric and drop NaN
if not gaze_col_numeric.empty:
min_y, max_y = gaze_col_numeric.min(), gaze_col_numeric.max()
if pd.notna(min_y) and pd.notna(max_y):
padding = (max_y - min_y) * 0.10 if max_y > min_y else 0.1
fig.update_yaxes(range=[min_y - padding if padding > 0 else min_y - 0.1, max_y + padding if padding > 0 else max_y + 0.1])
return fig
# --- Audio Loading and Slicing ---
@st.cache_data # Cache based on file content hash + filename (for format hint)
def load_full_audio(uploaded_file_bytes: bytes, filename: str) -> Tuple[Optional[AudioSegment], Optional[str]]:
"""Loads audio from bytes, requires filename for format hint."""
if not PYDUB_AVAILABLE: return None, "Pydub library not available."
if not uploaded_file_bytes: return None, "No file bytes provided."
try:
# Use filename extension to guess format
fmt = os.path.splitext(filename)[1].lower().strip('.')
if not fmt: fmt = "wav" # Default guess if no extension
# Handle common alternatives
if fmt == "m4a": fmt = "mp4" # pydub often uses mp4 for m4a
# Add more mappings if needed
audio = AudioSegment.from_file(BytesIO(uploaded_file_bytes), format=fmt)
return audio, None # Return AudioSegment object and no error
except CouldntDecodeError: return None, f"Could not decode audio '{filename}' with format hint '{fmt}'. Is FFmpeg/correct lib installed & format supported by Pydub?"
except Exception as e: return None, f"Error loading audio '{filename}': {e}"
@st.cache_data # Cache audio segment extraction
def extract_audio_segment_bytes(_full_audio_segment: AudioSegment, start_sec: float, end_sec: float, format="wav") -> Tuple[Optional[bytes], Optional[str]]:
"""Extracts a slice of audio and returns its bytes."""
if _full_audio_segment is None: return None, "Full audio not loaded."
# Check for valid time range relative to audio length
audio_len_sec = len(_full_audio_segment) / 1000.0
if start_sec < 0 or end_sec < 0: return None, "Start/End times cannot be negative."
if start_sec >= end_sec: return None, "Start time must be less than end time."
if start_sec >= audio_len_sec: return None, f"Start time ({start_sec:.2f}s) is beyond audio length ({audio_len_sec:.2f}s)."
# Clamp end_sec to audio length if it exceeds it
end_sec = min(end_sec, audio_len_sec)
if start_sec >= end_sec: return None, f"Calculated slice is invalid after clamping end time ({start_sec:.2f}s >= {end_sec:.2f}s)." # Should not happen if checks above pass, but good safeguard
start_ms, end_ms = int(start_sec * 1000), int(end_sec * 1000)
# Double check ms slice validity
if start_ms >= end_ms: return None, f"Invalid millisecond slice: {start_ms}ms >= {end_ms}ms (Audio length: {len(_full_audio_segment)}ms)."
try:
audio_slice = _full_audio_segment[start_ms:end_ms]
if len(audio_slice) == 0:
return None, "Extracted audio slice has zero length." # Handle cases where slice results in nothing
buffer = BytesIO(); audio_slice.export(buffer, format=format); buffer.seek(0)
return buffer.getvalue(), None
except Exception as e: return None, f"Error extracting/exporting audio slice: {e}"
# --- Transcription Function ---
def transcribe_audio_segment(
_openai_client: Optional[OpenAI],
_full_audio_segment: Optional[AudioSegment],
segment_info: dict, # Contains start_time, end_time, seg_uid, index, type
export_format="wav", model="whisper-1"
) -> Tuple[Optional[str], Optional[str]]:
"""
Transcribes a specific audio segment. Checks cache first.
Returns (transcript_text, error_message). Updates cache on success/failure.
"""
uid = segment_info['seg_uid']
start_t, end_t = segment_info['start_time'], segment_info['end_time']
# print(f"DEBUG: transcribe_audio_segment called for UID {uid}") # Optional
# 1. Check session state cache first
if uid in st.session_state.transcripts:
cached_result = st.session_state.transcripts[uid]
# If it's not None (pending), return the cached result (str or "")
if cached_result is not None:
# print(f"DEBUG: Using cached transcript for UID {uid}")
return cached_result, None
# 2. Check prerequisites
if _openai_client is None:
# Don't cache failure here, let background process retry if client becomes available
# st.session_state.transcripts[uid] = ""
return None, "OpenAI client not initialized."
if _full_audio_segment is None:
# Don't cache failure here
# st.session_state.transcripts[uid] = ""
return None, "Full audio not loaded."
# 3. Extract audio segment bytes (uses its own cache)
audio_bytes, err_extract = extract_audio_segment_bytes(_full_audio_segment, start_t, end_t, format=export_format)
if err_extract or not audio_bytes:
log_msg = f"Audio extraction failed for UID {uid}: {err_extract or 'No bytes extracted'}"
print(log_msg)
st.session_state.transcripts[uid] = "" # Cache extraction failure as empty transcript
return None, log_msg
# else:
# print(f"DEBUG: Extracted {len(audio_bytes)} bytes for UID {uid}")
# Add a check for very short segments (might be silent/error)
# if len(audio_bytes) < 200: # Approx 0.01s of WAV - likely too short
# log_msg = f"Warning: Audio segment for UID {uid} is very short ({len(audio_bytes)} bytes). May result in empty transcript."
# print(log_msg)
# 4. Call OpenAI Transcription API
try:
audio_file_object = BytesIO(audio_bytes)
dummy_filename = f"segment_{uid}.{export_format}"
# print(f"DEBUG: Calling OpenAI transcription for UID {uid}...")
transcription_response = _openai_client.audio.transcriptions.create(
model=model,
file=(dummy_filename, audio_file_object, f"audio/{export_format}"), # Pass as tuple
response_format="text", # Get plain text directly
)
# print(f"DEBUG: OpenAI response for UID {uid}: '{transcription_response}'")
# Response is directly the text string
transcript_text = transcription_response.strip() if isinstance(transcription_response, str) else ""
st.session_state.transcripts[uid] = transcript_text # Cache result (even empty string)
return transcript_text, None # Success
except AuthenticationError as e:
err_msg = f"Authentication Error with OpenAI for UID {uid}: {e}"
print(err_msg)
st.session_state.transcripts[uid] = "" # Cache failure
return None, err_msg
except APIError as e:
# Handle specific API errors if needed (e.g., rate limits, invalid requests)
err_msg = f"OpenAI API Error for UID {uid}: {e}"
print(err_msg)
st.session_state.transcripts[uid] = "" # Cache failure
return None, err_msg
except Exception as e:
err_msg = f"Transcription error for UID {uid}: {e}"
print(err_msg)
st.session_state.transcripts[uid] = "" # Cache failure
return None, err_msg
# --- GIF Processing function is already defined above with @st.cache_data ---
# --- process_screenshot_zip_cached(...) ---
# --- File Processing Logic (Triggered by Uploads) ---
# Helper to check if a file was newly uploaded
def file_uploaded(uploader_key: str) -> bool:
return st.session_state.get(uploader_key) is not None
# Process Gaze CSV
gaze_file = st.session_state.get("gaze_uploader")
if gaze_file and gaze_file.file_id != st.session_state.processed_gaze_file_id:
st.info(f"Processing uploaded gaze file: {gaze_file.name}...")
try:
# Reset related state BEFORE processing new file
reset_all_processing_state() # Resets segments, transcripts, embeds, file IDs
st.session_state.gaze_df = pd.read_csv(gaze_file) # Read from uploaded file object
if st.session_state.gaze_df.empty: raise ValueError("Gaze CSV is empty.")
required = ['log_time_float', 'gaze_change_smoothed', 'is_peak', 'is_valley']
if not all(col in st.session_state.gaze_df.columns for col in required):
raise ValueError(f"Gaze CSV needs columns: {', '.join(required)}")
# Find segments immediately after loading
p_segs, v_segs = get_gaze_segments(st.session_state.gaze_df)
st.session_state.gaze_segments = {'peak': p_segs, 'valley': v_segs}
st.session_state.processed_gaze_file_id = gaze_file.file_id # Mark as processed
st.toast(f"Loaded gaze: {len(st.session_state.gaze_df)} rows. Found {len(p_segs)} peaks, {len(v_segs)} valleys.", icon="πŸ“„")
st.rerun() # Rerun to update plot and potentially trigger background tasks
except Exception as e:
st.error(f"Error processing gaze CSV '{gaze_file.name}': {e}")
st.session_state.gaze_df = None # Clear invalid data
st.session_state.processed_gaze_file_id = None # Allow re-upload
reset_all_processing_state() # Clear segments etc.
st.rerun() # Rerun to reflect cleared state
# Process Audio File
audio_file = st.session_state.get("full_audio_uploader")
if audio_file and audio_file.file_id != st.session_state.processed_audio_file_id:
st.info(f"Processing uploaded audio file: {audio_file.name}...")
if not PYDUB_AVAILABLE:
st.error("Cannot process audio: Pydub library not installed.")
st.session_state.processed_audio_file_id = audio_file.file_id # Mark as seen, even if not processed
else:
try:
# Reset relevant state before loading new audio
st.session_state.full_audio = None
st.session_state.audio_load_error = None
reset_audio_dependent_state() # Transcripts/embeddings/file ID depend on audio
audio_bytes = audio_file.getvalue()
audio_data, error_msg = load_full_audio(audio_bytes, audio_file.name) # Use cached function
if error_msg:
st.error(f"Audio load failed: {error_msg}")
st.session_state.audio_load_error = error_msg
else:
st.session_state.full_audio = audio_data
st.toast(f"Loaded audio: {audio_file.name}", icon="🎧")
st.session_state.processed_audio_file_id = audio_file.file_id # Mark as processed
st.rerun() # Rerun to potentially trigger background tasks
except Exception as e:
st.error(f"Unexpected error handling audio upload: {e}")
st.session_state.full_audio = None
st.session_state.audio_load_error = "Upload handling error."
st.session_state.processed_audio_file_id = None # Allow re-upload
reset_audio_dependent_state()
st.rerun()
# Process ZIP File
zip_file = st.session_state.get("zip_uploader")
if zip_file and zip_file.file_id != st.session_state.processed_zip_file_id:
st.info(f"Processing uploaded ZIP file: {zip_file.name}...")
try:
# Reset only GIF lookup and its processor ID
st.session_state.gif_lookup = None
st.session_state.processed_zip_file_id = None
zip_bytes = zip_file.getvalue()
# Call the cached function
processing_results = process_screenshot_zip_cached(zip_bytes)
# Handle results OUTSIDE the cached function
if processing_results["success"]:
st.session_state.gif_lookup = processing_results["gif_lookup"]
processed_count = processing_results["processed_count"]
skipped_count = processing_results["skipped_count"]
# Display toasts/messages based on results
if processed_count > 0 or skipped_count > 0 or processing_results.get("warnings"):
st.toast(f"Processed {processed_count} GIFs, Skipped {skipped_count} items in ZIP.", icon="πŸ–ΌοΈ")
for warning_msg in processing_results.get("warnings", []):
st.warning(warning_msg) # Show functional warnings
# Optionally display detailed error messages if needed
# for msg in processing_results.get("error_messages", []):
# st.caption(f"ZIP Processing Note: {msg}") # Less prominent notes
st.session_state.processed_zip_file_id = zip_file.file_id # Mark as processed
st.rerun() # Rerun to update segment display with GIFs
else:
# Handle fatal errors reported by the function
st.error(f"Failed to process ZIP: {processing_results['fatal_error']}")
st.session_state.gif_lookup = None
st.session_state.processed_zip_file_id = None # Allow re-upload
except Exception as e:
# This catches errors *outside* the processing function call itself (e.g., getvalue failed)
st.error(f"Unexpected error handling ZIP upload: {e}")
st.session_state.gif_lookup = None
st.session_state.processed_zip_file_id = None # Allow re-upload
# No rerun needed here, error is displayed
# --- Background Processing Trigger (Transcription & Embedding) ---
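# For every segment still pending work, this function runs: (1) transcription via the
# OpenAI client, (2) saving the transcript to Supabase, (3) embedding the transcript
# text, and (4) upserting the vector to Pinecone. Progress is reported in the sidebar,
# and the _processing_triggered flag (reset at the end) guards against re-entrant runs.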
def trigger_background_processing():
# Prerequisite checks
if not st.session_state.openai_client: st.sidebar.warning("Processing skipped: OpenAI client not ready."); st.session_state._processing_triggered = False; return
if not st.session_state.full_audio: st.sidebar.warning("Processing skipped: Full audio not loaded."); st.session_state._processing_triggered = False; return
if not check_supabase_client(): st.sidebar.warning("Processing skipped: Supabase client not ready."); st.session_state._processing_triggered = False; return
can_embed = check_pinecone_client() # Check if embedding is possible, but don't stop if not
segments_to_process = [seg for seg_type in ['peak', 'valley'] for seg in st.session_state.gaze_segments.get(seg_type, []) if 'seg_uid' in seg]
if not segments_to_process: st.session_state._processing_triggered = False; return # No segments found
# Filter segments that actually need processing (pending transcript or embedding)
segments_needing_work = []
for segment in segments_to_process:
uid = segment['seg_uid']
needs_transcription = st.session_state.transcripts.get(uid) is None
# Needs embedding if transcript exists (not None, not "") AND not already embedded
needs_embedding = can_embed and st.session_state.transcripts.get(uid) and uid not in st.session_state.embedded_ids
if needs_transcription or needs_embedding:
segments_needing_work.append(segment)
if not segments_needing_work:
# st.sidebar.info("All segments processed.") # Optional message
st.session_state._processing_triggered = False
return
st.sidebar.markdown("--- \n**Processing Status**"); progress_bar = st.sidebar.progress(0); status_text = st.sidebar.empty(); total_segments_to_work = len(segments_needing_work)
newly_transcribed = 0; newly_embedded = 0; errors_encountered = 0
start_time = time.time()
st.toast(f"Starting background processing for {total_segments_to_work} segments...", icon="βš™οΈ")
for i, segment in enumerate(segments_needing_work):
uid = segment['seg_uid']; seg_type = segment['seg_type']; start_t = segment['start_time']; end_t = segment['end_time']
progress = (i + 1) / total_segments_to_work
status_text.info(f"[{i+1}/{total_segments_to_work}] Processing {seg_type} segment {uid} ({start_t:.2f}-{end_t:.2f}s)...")
# --- Step 1: Transcription (if needed) ---
transcript_text = st.session_state.transcripts.get(uid)
needs_transcription = transcript_text is None
transcription_succeeded_this_run = False
if needs_transcription:
# print(f"DEBUG: Calling transcribe for UID {uid}")
transcript_text, err_transcribe = transcribe_audio_segment(st.session_state.openai_client, st.session_state.full_audio, segment)
if err_transcribe:
print(f"Transcription FAILED for {uid}: {err_transcribe}") # Error logged inside function too
# State cache (st.session_state.transcripts[uid]) is set to "" inside transcribe_audio_segment on failure
errors_encountered += 1
elif transcript_text is not None: # Success (even if empty string)
# print(f"DEBUG: Transcription SUCCEEDED for UID {uid}. Text: '{transcript_text[:50]}...'")
transcription_succeeded_this_run = True
newly_transcribed += 1
# --- Step 2: Save transcript ---
# print(f"DEBUG: Calling save transcript for UID {uid}")
save_success = save_segment_to_supabase(st.session_state.session_id, uid, seg_type, start_t, end_t, transcript_text)
if not save_success:
# Error logged inside save_segment_to_supabase
status_text.warning(f"Save FAILED for transcript {uid}! Check DB logs.") # Use warning, maybe temporary issue
errors_encountered += 1
# If save fails, should we revert transcript state? Maybe not, keep the text locally.
# transcription_succeeded_this_run = False # Mark overall as failed if save fails?
# else:
# print(f"DEBUG: Save transcript SUCCEEDED for UID {uid}")
# else: Should not happen if err_transcribe is None
# --- Step 3: Embedding (if needed and possible) ---
# Re-fetch state OR use the flag/text from transcription step
# Embed only if transcript exists in state (meaning it was processed/saved or loaded)
# and is not an empty string, and not already embedded.
transcript_text_for_embed = st.session_state.transcripts.get(uid) # Get latest state again
needs_embedding = can_embed and transcript_text_for_embed and uid not in st.session_state.embedded_ids
if needs_embedding:
# print(f"DEBUG: Calling embedding for UID {uid}")
embedding_vector = get_embedding(st.session_state.openai_client, transcript_text_for_embed)
if embedding_vector is not None:
# --- Step 4: Save embedding ---
# print(f"DEBUG: Calling save embedding for UID {uid}")
if save_embedding_to_pinecone(st.session_state.session_id, uid, seg_type, start_t, end_t, embedding_vector):
st.session_state.embedded_ids.add(uid)
newly_embedded += 1
# print(f"DEBUG: Save embedding SUCCEEDED for UID {uid}")
else:
# Error logged inside save_embedding_to_pinecone
status_text.warning(f"Save embedding FAILED for {uid}! Check Pinecone logs.") # Use warning
errors_encountered += 1
else:
# Embedding failed (logged within get_embedding)
# Don't count as error here, could be valid empty text
print(f"Embedding skipped or failed for {uid} (likely empty text or OpenAI error).")
progress_bar.progress(progress)
# Add small delay to allow UI updates and prevent hammering APIs?
# time.sleep(0.05)
# --- Processing Finished ---
end_time = time.time()
duration = end_time - start_time
final_message = (f"Processing finished in {duration:.1f}s. "
f"New transcripts: {newly_transcribed}, New embeddings: {newly_embedded}. "
f"Errors: {errors_encountered}.")
if errors_encountered > 0:
status_text.warning(final_message)
st.toast(f"Processing complete with {errors_encountered} errors.", icon="⚠️")
else:
status_text.success(final_message)
st.toast("Processing complete.", icon="βœ…")
# Clear sidebar status after a delay
time.sleep(5)
status_text.empty()
progress_bar.empty()
st.session_state._processing_triggered = False # Allow triggering again
# Rerun only if something actually changed
if newly_transcribed > 0 or newly_embedded > 0:
st.rerun()
# --- Trigger background processing if conditions are met ---
# Check if gaze data and audio are loaded, OpenAI client ready, Supabase ready
should_process_base = (
st.session_state.gaze_df is not None and
st.session_state.full_audio is not None and
any(st.session_state.gaze_segments.values()) and # Check if peak or valley list is not empty
st.session_state.openai_client is not None and
supabase is not None # Pinecone (idx) is checked inside trigger function
)
# Check if there are actually any segments pending transcription or embedding
pending_transcription = False
pending_embedding = False
if should_process_base:
can_embed_check = idx is not None
for seg_type in ['peak', 'valley']:
for s in st.session_state.gaze_segments.get(seg_type, []):
uid = s.get('seg_uid')
if uid is None: continue
transcript_state = st.session_state.transcripts.get(uid)
if transcript_state is None:
pending_transcription = True
# Check for embedding if transcript exists (not None, not "") and not embedded
if can_embed_check and transcript_state and uid not in st.session_state.embedded_ids:
pending_embedding = True
# Break early if both found
if pending_transcription and pending_embedding: break
if pending_transcription and pending_embedding: break
# Combine checks
should_trigger = should_process_base and (pending_transcription or pending_embedding)
if should_trigger:
# Use a flag to prevent triggering multiple times during quick reruns
if not st.session_state.get("_processing_triggered", False):
st.session_state._processing_triggered = True
# Use st.spinner for immediate feedback while triggering
with st.spinner("Queueing background processing..."):
trigger_background_processing()
# Flag is reset inside trigger_background_processing after completion/error
# else: Process already running or just finished/failed and flag not reset yet
# else:
# Optional: Reset the trigger flag if conditions are no longer met or processing isn't needed
# This can prevent the flag getting stuck if e.g. audio is removed
# if not should_trigger and "_processing_triggered" in st.session_state:
# del st.session_state._processing_triggered
# --- Display Components ---
# Plot Panel
with plot_expander_placeholder:
with st.expander("Gaze Analysis Plot", expanded=True):
if st.session_state.gaze_df is not None and not st.session_state.gaze_df.empty:
fig = create_plotly_gaze_plot(st.session_state.gaze_df)
if fig:
st.plotly_chart(fig, use_container_width=True)
if st.checkbox("Show Gaze Data Table", key="show_gaze_table_plot", value=False):
st.dataframe(st.session_state.gaze_df, use_container_width=True, height=200)
else: st.warning("Could not generate gaze plot (check data).") # Changed to warning
elif st.session_state.get("data_loaded"): # Check if loading finished
st.info("No gaze data loaded or processed for this session.")
else:
st.info("Upload gaze CSV data or load a session with gaze data to view the plot.")
# Chat Column
with col_chat:
st.subheader("Chat with the transcript πŸ€–")
CHAT_HISTORY_MAX_HEIGHT = 450 # Adjust as needed
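# Pixel height for the scrollable st.container holding the chat history below.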
# Determine chat readiness
can_chat_infra = (st.session_state.openai_client is not None and
idx is not None and # Pinecone needed for retrieval
st.session_state.session_id is not None)
has_embeddings = bool(st.session_state.get("embedded_ids"))
has_segments = any(bool(s_list) for s_list in st.session_state.gaze_segments.values()) # Check if any segments exist
# Display status messages if not ready to chat
if not can_chat_infra:
missing_infra = []
if not st.session_state.session_id: missing_infra.append("Confirmed Session")
if not st.session_state.openai_client: missing_infra.append("OpenAI Key/Client")
if not idx: missing_infra.append("Pinecone Client")
st.info(f"Chat requires: {', '.join(missing_infra)} (See Sidebar Status).")
elif not has_segments:
st.info("Load gaze data to find segments for transcription and chat.")
elif not has_embeddings:
st.info("Segments found, but none embedded yet for chat context. Processing runs automatically if gaze/audio loaded & keys valid.")
# Show processing progress here
total_segs = sum(len(v) for v in st.session_state.gaze_segments.values())
transcribed_count = len([t for t in st.session_state.transcripts.values() if t is not None]) # Count non-pending (includes "" for failed/silent)
embedded_count = len(st.session_state.embedded_ids)
if total_segs > 0:
st.caption(f"{transcribed_count}/{total_segs} segments processed for transcription. {embedded_count}/{total_segs} embedded.")
if st.session_state.get("_processing_triggered"):
st.caption("⏳ Processing is currently running...")
# --- Chat Interface ---
chat_enabled = can_chat_infra and has_embeddings # Enable only if infra OK AND embeddings exist
if chat_enabled:
# Chat history container
chat_container = st.container(height=CHAT_HISTORY_MAX_HEIGHT)
with chat_container:
for role, msg in st.session_state.chat_history:
with st.chat_message(role):
st.markdown(msg)
# Chat input below the history
user_q = st.chat_input("Ask about transcribed segments...", disabled=not chat_enabled) # Disable if not ready
if user_q:
# Add user message to history and display immediately
st.session_state.chat_history.append(("user", user_q))
with chat_container: # Re-draw user message inside container
with st.chat_message("user"):
st.markdown(user_q)
# Process the query
with st.spinner("Thinking..."):
# 1. Retrieve relevant context from Pinecone
context_chunks = retrieve_relevant_transcripts(user_q, k=K_NEIGHBORS)
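# retrieve_relevant_transcripts signals retrieval problems by placing sentinel strings
# (e.g. "_Vector search unavailable._") in the first list element; the check below
# detects those markers and falls back to an explicit "no context" note.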
retrieval_error_markers = ("_Vector search unavailable._", "_No relevant transcript segments found_", "_Could not generate embedding_", "_Error during vector search_")
if not context_chunks or any(marker in context_chunks[0] for marker in retrieval_error_markers):
context_str = "\n\n_(Could not retrieve relevant context. Answering based on general knowledge if possible, or state inability to answer.)_"
st.warning(f"Chat Context Retrieval Note: {context_chunks[0] if context_chunks else 'No context found.'}")
else:
context_str = "\n\n---\n\n".join(context_chunks)
# 2. Prepare prompt for LLM
sys_msg = ("You are an assistant analyzing transcripts from gaze/audio data segments. "
"Answer the user's question based *primarily* on the following provided transcript excerpts. "
"When using information from an excerpt, cite its time range (e.g., [10.50–15.20s]). "
"If the answer cannot be found in the excerpts, state that clearly. Do not invent details about the segments. "
"Be concise.")
user_msg_with_context = f"User Question: {user_q}\n\nRelevant Excerpts:\n{context_str}"
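# Only the raw question and the final answer are appended to chat_history; the
# retrieved context is injected into this single API call and is not persisted.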
# 3. Call OpenAI Chat API
answer = "Sorry, I encountered an error trying to generate a response." # Default error
try:
response = st.session_state.openai_client.chat.completions.create(
model="gpt-4o-mini", # Use a cost-effective model
messages=[
{"role": "system", "content": sys_msg},
{"role": "user", "content": user_msg_with_context}
],
temperature=0.1, # More factual
max_tokens=250, # Limit response length
)
if response.choices and response.choices[0].message:
answer = response.choices[0].message.content.strip()
else:
answer = "_Received an empty response from the AI._"
except AuthenticationError:
answer = "_OpenAI Authentication Error. Check your API key._"
except APIError as e:
answer = f"_OpenAI API Error: {e}_"
except Exception as e:
answer = f"_An unexpected error occurred during chat generation: {e}_"
# Add assistant response to history
st.session_state.chat_history.append(("assistant", answer))
# Rerun to display the assistant's response in the chat container
st.rerun()
# Segment Details Column
with col_segments:
st.subheader("Explore Segments")
# --- Segment Selection Logic ---
peak_segments = st.session_state.gaze_segments.get('peak', [])
valley_segments = st.session_state.gaze_segments.get('valley', [])
has_peaks = bool(peak_segments)
has_valleys = bool(valley_segments)
if not has_peaks and not has_valleys:
if st.session_state.get("data_loaded"): # Check if loading finished
st.info("No gaze segments found or loaded for this session.")
else:
st.info("Load Gaze CSV or a session with gaze data to find segments.")
else:
# Create options for dropdowns: {Label: KeyTuple}
# Ensure sorting by index for consistent order
peak_segments_sorted = sorted(peak_segments, key=lambda s: s['index'])
valley_segments_sorted = sorted(valley_segments, key=lambda s: s['index'])
peak_options = {"-- Select Peak --": None}
peak_options.update({
f"P{s['index']}: {s['start_time']:.2f} – {s['end_time']:.2f}s": ('peak', s['index'])
for s in peak_segments_sorted
})
valley_options = {"-- Select Valley --": None}
valley_options.update({
f"V{s['index']}: {s['start_time']:.2f} – {s['end_time']:.2f}s": ('valley', s['index'])
for s in valley_segments_sorted
})
# Reverse lookup: {KeyTuple: Label} - needed for setting selectbox index
peak_keys_to_labels = {v: k for k, v in peak_options.items()}
valley_keys_to_labels = {v: k for k, v in valley_options.items()}
# Callback to handle selection changes and clear the *other* selector
def update_active_segment(selector_type):
if selector_type == 'peak':
label = st.session_state.peak_selector_widget # Get selected label
selected_key = peak_options.get(label)
st.session_state.active_segment_key = selected_key
# If a peak is selected, reset the valley selectbox to its placeholder label.
# Writing to the widget's own key makes the reset visible on the next rerun.
if selected_key is not None:
st.session_state.valley_selector_widget = "-- Select Valley --"
elif selector_type == 'valley':
label = st.session_state.valley_selector_widget
selected_key = valley_options.get(label)
st.session_state.active_segment_key = selected_key
# If a valley is selected, reset the peak selectbox to its placeholder label
if selected_key is not None:
st.session_state.peak_selector_widget = "-- Select Peak --"
# Get current active segment key to find the label
active_key = st.session_state.active_segment_key
active_label = ""
active_type = None
if active_key:
active_type = active_key[0]
if active_type == 'peak':
active_label = peak_keys_to_labels.get(active_key, "")
elif active_type == 'valley':
active_label = valley_keys_to_labels.get(active_key, "")
# Determine default index for each selectbox
try:
peak_default_index = list(peak_options.keys()).index(active_label) if active_type == 'peak' else 0
except ValueError: peak_default_index = 0
try:
valley_default_index = list(valley_options.keys()).index(active_label) if active_type == 'valley' else 0
except ValueError: valley_default_index = 0
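# Peaks and valleys get separate selector tabs, but a single active_segment_key
# tracks whichever segment is currently selected across both.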
tab_peak, tab_valley = st.tabs(["Peaks", "Valleys"])
with tab_peak:
st.selectbox(
"Select Peak Segment:",
options=list(peak_options.keys()), # Pass labels as options
index=peak_default_index,
key="peak_selector_widget", # Use unique key for widget state
on_change=update_active_segment,
args=('peak',), # Pass selector type to callback
disabled=not has_peaks,
label_visibility="collapsed" # Hide redundant label
)
with tab_valley:
st.selectbox(
"Select Valley Segment:",
options=list(valley_options.keys()),
index=valley_default_index,
key="valley_selector_widget",
on_change=update_active_segment,
args=('valley',),
disabled=not has_valleys,
label_visibility="collapsed"
)
# --- Display Details of Active Segment ---
st.divider()
active_key = st.session_state.active_segment_key # Re-read after potential callback change
if active_key:
sel_type, sel_index = active_key
# Find the segment data using the type and index
segment_list = st.session_state.gaze_segments.get(sel_type, [])
# Find the segment based on the index stored within the segment dict
sel_seg = next((s for s in segment_list if s.get('index') == sel_index), None)
if sel_seg:
start_t, end_t = sel_seg['start_time'], sel_seg['end_time']
uid = sel_seg.get('seg_uid', 'N/A') # Use .get for safety
export_format = "wav"; mime_type = f"audio/{export_format}"
st.markdown(f"#### Details: {sel_type.capitalize()} {sel_index}")
st.markdown(f"**Time:** `{start_t:.2f}s – {end_t:.2f}s` | **UID:** `{uid}`")
# 1. Audio Player
if st.session_state.full_audio:
# Use the cached extraction function
seg_bytes, err_audio = extract_audio_segment_bytes(st.session_state.full_audio, start_t, end_t, format=export_format)
if seg_bytes:
st.audio(seg_bytes, format=mime_type)
elif err_audio:
st.caption(f"πŸ”‡ Audio Error: {err_audio}")
# else: No bytes extracted, likely valid short segment or error handled by err_audio
elif st.session_state.audio_load_error:
st.caption(f"πŸ”‡ Audio Error: {st.session_state.audio_load_error}")
else: st.caption("πŸ”‡ Full audio not loaded.")
# 2. GIF Viewer
with st.expander("Screen Recording (GIF)", expanded=False):
if st.session_state.gif_lookup is not None:
# Construct the key used in process_screenshot_zip_cached
lookup_key = f"{sel_type}_{start_t:.2f}_{end_t:.2f}"
gif_bytes = st.session_state.gif_lookup.get(lookup_key)
if gif_bytes:
st.image(gif_bytes, caption=f"Animation: {lookup_key}", use_container_width=True)
else: st.caption("πŸ–ΌοΈ _No matching GIF found for this time range/key._")
else: st.caption("πŸ–ΌοΈ Screenshots ZIP not loaded or processed.")
# 3. Transcription Display
st.markdown("**Transcript:**")
transcript_status = st.empty() # Placeholder for status/text
transcript_text = st.session_state.transcripts.get(uid) # None (pending), "" (failed/empty), str (success)
if transcript_text is None:
# If processing is running, show generic pending
if st.session_state.get("_processing_triggered"):
transcript_status.caption("⏳ _Processing in progress..._")
# If not running, check prereqs
elif not st.session_state.openai_client:
transcript_status.caption("🚫 _Transcription requires OpenAI key._")
elif not st.session_state.full_audio:
transcript_status.caption("🚫 _Transcription requires audio file._")
elif should_process_base: # Base conditions met, but not triggered yet
transcript_status.caption("⏳ _Transcription pending... (will run automatically)_")
else: # Some other base condition not met
transcript_status.caption("🚫 _Prerequisites for transcription not met._")
elif transcript_text == "":
transcript_status.caption("πŸ”‡ _Transcription resulted in empty text (segment likely silent or error during processing)._")
else:
transcript_status.markdown(f"```\n{transcript_text}\n```") # Newlines around the fence so it renders as a code block
# Display embedding status below transcript
if uid != 'N/A': # Only show if UID is valid
if uid in st.session_state.embedded_ids:
st.caption("βœ… Embedded")
elif transcript_text: # Only show pending if transcript exists
if st.session_state.get("_processing_triggered"):
st.caption("⏳ _Embedding in progress..._")
elif st.session_state.openai_client and idx:
st.caption("⏳ _Embedding pending... (will run automatically)_")
elif not idx:
st.caption("🚫 _Embedding requires Pinecone client._")
else: # OpenAI client missing but idx present
st.caption("🚫 _Embedding requires OpenAI client._")
# else: No transcript text, so no embedding status needed
else:
st.warning(f"Could not find details for the selected segment ({sel_type}, index {sel_index}). State might be inconsistent.")
else:
st.info("Select a peak or valley segment from the tabs above to view its details.")
# --- Footer / Debug ---
# st.divider()
# with st.expander("Debug Info (Session State)"):
# st.json(st.session_state.to_dict(), expanded=False)