Spaces:
				
			
			
	
			
			
					
		Running
		
			on 
			
			CPU Upgrade
	
	
	
			
			
	
	
	
	
		
		
					
		Running
		
			on 
			
			CPU Upgrade
	| from flask import Blueprint, render_template, current_app, jsonify, request, redirect, url_for, flash | |
| from models import ( | |
| db, User, Model, Vote, EloHistory, ModelType, | |
| CoordinatedVotingCampaign, CampaignParticipant, UserTimeout, | |
| get_user_timeouts, get_coordinated_campaigns, resolve_campaign, | |
| create_user_timeout, cancel_user_timeout, check_user_timeout | |
| ) | |
| from auth import admin_required | |
| from security import check_user_security_score | |
| from sqlalchemy import func, desc, extract, text | |
| from datetime import datetime, timedelta | |
| import json | |
| import os | |
| from sqlalchemy import or_ | |
| admin = Blueprint("admin", __name__, url_prefix="/admin") | |
| def index(): | |
| """Admin dashboard homepage""" | |
| # Get count statistics | |
| stats = { | |
| "total_users": User.query.count(), | |
| "total_votes": Vote.query.count(), | |
| "tts_votes": Vote.query.filter_by(model_type=ModelType.TTS).count(), | |
| "conversational_votes": Vote.query.filter_by(model_type=ModelType.CONVERSATIONAL).count(), | |
| "tts_models": Model.query.filter_by(model_type=ModelType.TTS).count(), | |
| "conversational_models": Model.query.filter_by(model_type=ModelType.CONVERSATIONAL).count(), | |
| } | |
| # Get recent votes | |
| recent_votes = Vote.query.order_by(Vote.vote_date.desc()).limit(10).all() | |
| # Get recent users | |
| recent_users = User.query.order_by(User.join_date.desc()).limit(10).all() | |
| # Get daily votes for the past 30 days | |
| thirty_days_ago = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=30) | |
| daily_votes = db.session.query( | |
| func.date(Vote.vote_date).label('date'), | |
| func.count().label('count') | |
| ).filter(Vote.vote_date >= thirty_days_ago).group_by( | |
| func.date(Vote.vote_date) | |
| ).order_by(func.date(Vote.vote_date)).all() | |
| # Generate a complete list of dates for the past 30 days | |
| date_list = [] | |
| current_date = datetime.utcnow() | |
| for i in range(30, -1, -1): | |
| date_list.append((current_date - timedelta(days=i)).date()) | |
| # Create a dictionary with actual vote counts | |
| vote_counts = {day.date: day.count for day in daily_votes} | |
| # Build complete datasets including days with zero votes | |
| formatted_dates = [date.strftime("%Y-%m-%d") for date in date_list] | |
| vote_counts_list = [vote_counts.get(date, 0) for date in date_list] | |
| daily_votes_data = { | |
| "labels": formatted_dates, | |
| "counts": vote_counts_list | |
| } | |
| # Get top models | |
| top_tts_models = Model.query.filter_by( | |
| model_type=ModelType.TTS | |
| ).order_by(Model.current_elo.desc()).limit(5).all() | |
| top_conversational_models = Model.query.filter_by( | |
| model_type=ModelType.CONVERSATIONAL | |
| ).order_by(Model.current_elo.desc()).limit(5).all() | |
| return render_template( | |
| "admin/index.html", | |
| stats=stats, | |
| recent_votes=recent_votes, | |
| recent_users=recent_users, | |
| daily_votes_data=json.dumps(daily_votes_data), | |
| top_tts_models=top_tts_models, | |
| top_conversational_models=top_conversational_models | |
| ) | |
| def models(): | |
| """Manage models""" | |
| tts_models = Model.query.filter_by(model_type=ModelType.TTS).order_by(Model.name).all() | |
| conversational_models = Model.query.filter_by(model_type=ModelType.CONVERSATIONAL).order_by(Model.name).all() | |
| return render_template( | |
| "admin/models.html", | |
| tts_models=tts_models, | |
| conversational_models=conversational_models | |
| ) | |
| def edit_model(model_id): | |
| """Edit a model""" | |
| model = Model.query.get_or_404(model_id) | |
| if request.method == "POST": | |
| model.name = request.form.get("name") | |
| model.is_active = "is_active" in request.form | |
| model.is_open = "is_open" in request.form | |
| model.model_url = request.form.get("model_url") | |
| db.session.commit() | |
| flash(f"Model '{model.name}' updated successfully", "success") | |
| return redirect(url_for("admin.models")) | |
| return render_template("admin/edit_model.html", model=model) | |
| def users(): | |
| """Manage users""" | |
| users = User.query.order_by(User.username).all() | |
| admin_users = os.getenv("ADMIN_USERS", "").split(",") | |
| admin_users = [username.strip() for username in admin_users] | |
| # Calculate security scores for all users | |
| users_with_scores = [] | |
| for user in users: | |
| score, factors = check_user_security_score(user.id) | |
| users_with_scores.append({ | |
| 'user': user, | |
| 'security_score': score, | |
| 'security_factors': factors | |
| }) | |
| # Sort by security score (lowest first to highlight problematic users) | |
| users_with_scores.sort(key=lambda x: x['security_score']) | |
| return render_template("admin/users.html", users_with_scores=users_with_scores, admin_users=admin_users) | |
| def user_detail(user_id): | |
| """View user details""" | |
| user = User.query.get_or_404(user_id) | |
| # Get security score and factors | |
| security_score, security_factors = check_user_security_score(user_id) | |
| # Get user votes | |
| recent_votes = Vote.query.filter_by(user_id=user_id).order_by(Vote.vote_date.desc()).limit(20).all() | |
| # Get vote statistics | |
| tts_votes = Vote.query.filter_by(user_id=user_id, model_type=ModelType.TTS).count() | |
| conversational_votes = Vote.query.filter_by(user_id=user_id, model_type=ModelType.CONVERSATIONAL).count() | |
| # Get comprehensive model bias analysis | |
| # This counts how often each model was chosen vs how often it appeared | |
| model_bias_analysis = [] | |
| # Get all votes by this user | |
| user_votes = Vote.query.filter_by(user_id=user_id).all() | |
| if user_votes: | |
| model_stats = {} | |
| for vote in user_votes: | |
| # Track model_chosen | |
| chosen_id = vote.model_chosen | |
| rejected_id = vote.model_rejected | |
| # Initialize model stats if not exists | |
| if chosen_id not in model_stats: | |
| model_stats[chosen_id] = {'chosen': 0, 'appeared': 0, 'name': None} | |
| if rejected_id not in model_stats: | |
| model_stats[rejected_id] = {'chosen': 0, 'appeared': 0, 'name': None} | |
| # Count appearances and choices | |
| model_stats[chosen_id]['chosen'] += 1 | |
| model_stats[chosen_id]['appeared'] += 1 | |
| model_stats[rejected_id]['appeared'] += 1 | |
| # Get model names and calculate bias ratios | |
| for model_id, stats in model_stats.items(): | |
| model = Model.query.get(model_id) | |
| if model: | |
| stats['name'] = model.name | |
| stats['bias_ratio'] = stats['chosen'] / stats['appeared'] if stats['appeared'] > 0 else 0 | |
| stats['model_id'] = model_id | |
| # Sort by bias ratio (highest bias first) and take top 5 | |
| model_bias_analysis = sorted( | |
| [stats for stats in model_stats.values() if stats['name'] is not None], | |
| key=lambda x: x['bias_ratio'], | |
| reverse=True | |
| )[:5] | |
| return render_template( | |
| "admin/user_detail.html", | |
| user=user, | |
| security_score=security_score, | |
| security_factors=security_factors, | |
| recent_votes=recent_votes, | |
| tts_votes=tts_votes, | |
| conversational_votes=conversational_votes, | |
| model_bias_analysis=model_bias_analysis, | |
| total_votes=tts_votes + conversational_votes | |
| ) | |
| def votes(): | |
| """View recent votes""" | |
| page = request.args.get('page', 1, type=int) | |
| per_page = 50 | |
| # Get votes with pagination | |
| votes_pagination = Vote.query.order_by( | |
| Vote.vote_date.desc() | |
| ).paginate(page=page, per_page=per_page) | |
| return render_template( | |
| "admin/votes.html", | |
| votes=votes_pagination.items, | |
| pagination=votes_pagination | |
| ) | |
| def statistics(): | |
| """View detailed statistics""" | |
| # Get daily votes for the past 30 days by model type | |
| thirty_days_ago = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=30) | |
| tts_daily_votes = db.session.query( | |
| func.date(Vote.vote_date).label('date'), | |
| func.count().label('count') | |
| ).filter( | |
| Vote.vote_date >= thirty_days_ago, | |
| Vote.model_type == ModelType.TTS | |
| ).group_by( | |
| func.date(Vote.vote_date) | |
| ).order_by(func.date(Vote.vote_date)).all() | |
| conv_daily_votes = db.session.query( | |
| func.date(Vote.vote_date).label('date'), | |
| func.count().label('count') | |
| ).filter( | |
| Vote.vote_date >= thirty_days_ago, | |
| Vote.model_type == ModelType.CONVERSATIONAL | |
| ).group_by( | |
| func.date(Vote.vote_date) | |
| ).order_by(func.date(Vote.vote_date)).all() | |
| # Monthly new users | |
| monthly_users = db.session.query( | |
| extract('year', User.join_date).label('year'), | |
| extract('month', User.join_date).label('month'), | |
| func.count().label('count') | |
| ).group_by( | |
| 'year', 'month' | |
| ).order_by('year', 'month').all() | |
| # Generate a complete list of dates for the past 30 days | |
| date_list = [] | |
| current_date = datetime.utcnow() | |
| for i in range(30, -1, -1): | |
| date_list.append((current_date - timedelta(days=i)).date()) | |
| # Create dictionaries with actual vote counts | |
| tts_vote_counts = {day.date: day.count for day in tts_daily_votes} | |
| conv_vote_counts = {day.date: day.count for day in conv_daily_votes} | |
| # Format dates consistently for charts | |
| formatted_dates = [date.strftime("%Y-%m-%d") for date in date_list] | |
| # Build complete datasets including days with zero votes | |
| tts_counts = [tts_vote_counts.get(date, 0) for date in date_list] | |
| conv_counts = [conv_vote_counts.get(date, 0) for date in date_list] | |
| # Generate all month/year combinations for the past 12 months | |
| current_date = datetime.utcnow() | |
| month_list = [] | |
| for i in range(11, -1, -1): | |
| past_date = current_date - timedelta(days=i*30) # Approximate | |
| month_list.append((past_date.year, past_date.month)) | |
| # Create a dictionary with actual user counts | |
| user_counts = {(record.year, record.month): record.count for record in monthly_users} | |
| # Build complete monthly datasets including months with zero new users | |
| monthly_labels = [f"{month}/{year}" for year, month in month_list] | |
| monthly_counts = [user_counts.get((year, month), 0) for year, month in month_list] | |
| # Model performance over time | |
| top_models = Model.query.order_by(Model.match_count.desc()).limit(5).all() | |
| # Get first and last timestamp to create a consistent timeline | |
| earliest = datetime.utcnow() - timedelta(days=30) # Default to 30 days ago | |
| latest = datetime.utcnow() # Default to now | |
| # Find actual earliest and latest timestamps across all models | |
| has_elo_history = False | |
| for model in top_models: | |
| first = EloHistory.query.filter_by(model_id=model.id).order_by(EloHistory.timestamp).first() | |
| last = EloHistory.query.filter_by(model_id=model.id).order_by(EloHistory.timestamp.desc()).first() | |
| if first and last: | |
| has_elo_history = True | |
| if first.timestamp < earliest: | |
| earliest = first.timestamp | |
| if last.timestamp > latest: | |
| latest = last.timestamp | |
| # If no history was found, use a default range of the last 30 days | |
| if not has_elo_history: | |
| earliest = datetime.utcnow() - timedelta(days=30) | |
| latest = datetime.utcnow() | |
| # Make sure the date range is valid (earliest before latest) | |
| if earliest > latest: | |
| earliest = latest - timedelta(days=30) | |
| # Generate a list of dates for the ELO history timeline | |
| # Using 1-day intervals for a smoother chart | |
| elo_dates = [] | |
| current = earliest | |
| while current <= latest: | |
| elo_dates.append(current.date()) | |
| current += timedelta(days=1) | |
| # Format dates consistently | |
| formatted_elo_dates = [date.strftime("%Y-%m-%d") for date in elo_dates] | |
| model_history = {} | |
| # Initialize empty data for all top models | |
| for model in top_models: | |
| model_history[model.name] = { | |
| "timestamps": formatted_elo_dates, | |
| "scores": [None] * len(formatted_elo_dates) # Initialize with None values | |
| } | |
| history = EloHistory.query.filter_by( | |
| model_id=model.id | |
| ).order_by(EloHistory.timestamp).all() | |
| if history: | |
| # Create a dictionary mapping dates to scores | |
| history_dict = {} | |
| for h in history: | |
| date_key = h.timestamp.date().strftime("%Y-%m-%d") | |
| history_dict[date_key] = h.elo_score | |
| # Fill in missing dates with the previous score | |
| last_score = model.current_elo # Default to current ELO if no history | |
| scores = [] | |
| for date in formatted_elo_dates: | |
| if date in history_dict: | |
| last_score = history_dict[date] | |
| scores.append(last_score) | |
| model_history[model.name]["scores"] = scores | |
| else: | |
| # If no history, use the current Elo for all dates | |
| model_history[model.name]["scores"] = [model.current_elo] * len(formatted_elo_dates) | |
| chart_data = { | |
| "dailyVotes": { | |
| "labels": formatted_dates, | |
| "ttsCounts": tts_counts, | |
| "convCounts": conv_counts | |
| }, | |
| "monthlyUsers": { | |
| "labels": monthly_labels, | |
| "counts": monthly_counts | |
| }, | |
| "modelHistory": model_history | |
| } | |
| return render_template( | |
| "admin/statistics.html", | |
| chart_data=json.dumps(chart_data) | |
| ) | |
| def activity(): | |
| """View recent text generations""" | |
| # Check if we have any active sessions from app.py | |
| tts_session_count = 0 | |
| conversational_session_count = 0 | |
| # Access global variables from app.py through current_app | |
| if hasattr(current_app, 'tts_sessions'): | |
| tts_session_count = len(current_app.tts_sessions) | |
| else: # Try to access through app module | |
| from app import tts_sessions | |
| tts_session_count = len(tts_sessions) | |
| if hasattr(current_app, 'conversational_sessions'): | |
| conversational_session_count = len(current_app.conversational_sessions) | |
| else: # Try to access through app module | |
| from app import conversational_sessions | |
| conversational_session_count = len(conversational_sessions) | |
| # Get recent votes which represent completed generations | |
| recent_tts_votes = Vote.query.filter_by( | |
| model_type=ModelType.TTS | |
| ).order_by(Vote.vote_date.desc()).limit(20).all() | |
| recent_conv_votes = Vote.query.filter_by( | |
| model_type=ModelType.CONVERSATIONAL | |
| ).order_by(Vote.vote_date.desc()).limit(20).all() | |
| # Get votes per hour for the last 24 hours | |
| current_time = datetime.utcnow() | |
| last_24h = current_time.replace(minute=0, second=0, microsecond=0) - timedelta(hours=24) | |
| # Use SQLite-compatible date formatting | |
| hourly_votes = db.session.query( | |
| func.strftime('%Y-%m-%d %H:00', Vote.vote_date).label('hour'), | |
| func.count().label('count') | |
| ).filter( | |
| Vote.vote_date >= last_24h | |
| ).group_by('hour').order_by('hour').all() | |
| # Generate all hours for the past 24 hours with correct hour formatting | |
| hour_list = [] | |
| for i in range(24, -1, -1): | |
| # Calculate the hour time and truncate to hour | |
| hour_time = current_time - timedelta(hours=i) | |
| hour_time = hour_time.replace(minute=0, second=0, microsecond=0) | |
| hour_list.append(hour_time.strftime('%Y-%m-%d %H:00')) | |
| # Create a dictionary with actual vote counts | |
| vote_counts = {hour.hour: hour.count for hour in hourly_votes} | |
| # Build complete hourly datasets including hours with zero votes | |
| hourly_data = { | |
| "labels": hour_list, | |
| "counts": [vote_counts.get(hour, 0) for hour in hour_list] | |
| } | |
| return render_template( | |
| "admin/activity.html", | |
| tts_session_count=tts_session_count, | |
| conversational_session_count=conversational_session_count, | |
| recent_tts_votes=recent_tts_votes, | |
| recent_conv_votes=recent_conv_votes, | |
| hourly_data=json.dumps(hourly_data) | |
| ) | |
| def analytics(): | |
| """View analytics data including session duration, IP addresses, etc.""" | |
| # Get analytics statistics | |
| analytics_stats = {} | |
| try: | |
| # Session duration statistics | |
| duration_stats = db.session.execute(text(""" | |
| SELECT | |
| AVG(session_duration_seconds) as avg_duration, | |
| MIN(session_duration_seconds) as min_duration, | |
| MAX(session_duration_seconds) as max_duration, | |
| COUNT(session_duration_seconds) as total_with_duration | |
| FROM vote | |
| WHERE session_duration_seconds IS NOT NULL | |
| """)).fetchone() | |
| analytics_stats['duration'] = { | |
| 'avg': round(duration_stats.avg_duration, 2) if duration_stats.avg_duration else 0, | |
| 'min': round(duration_stats.min_duration, 2) if duration_stats.min_duration else 0, | |
| 'max': round(duration_stats.max_duration, 2) if duration_stats.max_duration else 0, | |
| 'total': duration_stats.total_with_duration or 0 | |
| } | |
| # Cache hit statistics | |
| cache_stats = db.session.execute(text(""" | |
| SELECT | |
| cache_hit, | |
| COUNT(*) as count | |
| FROM vote | |
| WHERE cache_hit IS NOT NULL | |
| GROUP BY cache_hit | |
| """)).fetchall() | |
| analytics_stats['cache'] = { | |
| 'hits': 0, | |
| 'misses': 0, | |
| 'total': 0 | |
| } | |
| for stat in cache_stats: | |
| if stat.cache_hit: | |
| analytics_stats['cache']['hits'] = stat.count | |
| else: | |
| analytics_stats['cache']['misses'] = stat.count | |
| analytics_stats['cache']['total'] += stat.count | |
| # Top IP address regions (anonymized) | |
| ip_stats = db.session.execute(text(""" | |
| SELECT | |
| ip_address_partial, | |
| COUNT(*) as count | |
| FROM vote | |
| WHERE ip_address_partial IS NOT NULL | |
| GROUP BY ip_address_partial | |
| ORDER BY count DESC | |
| LIMIT 10 | |
| """)).fetchall() | |
| analytics_stats['top_ips'] = [ | |
| {'ip': stat.ip_address_partial, 'count': stat.count} | |
| for stat in ip_stats | |
| ] | |
| # User agent statistics (top browsers/devices) | |
| ua_stats = db.session.execute(text(""" | |
| SELECT | |
| CASE | |
| WHEN user_agent LIKE '%Chrome%' THEN 'Chrome' | |
| WHEN user_agent LIKE '%Firefox%' THEN 'Firefox' | |
| WHEN user_agent LIKE '%Safari%' AND user_agent NOT LIKE '%Chrome%' THEN 'Safari' | |
| WHEN user_agent LIKE '%Edge%' THEN 'Edge' | |
| WHEN user_agent LIKE '%Mobile%' OR user_agent LIKE '%Android%' THEN 'Mobile' | |
| ELSE 'Other' | |
| END as browser, | |
| COUNT(*) as count | |
| FROM vote | |
| WHERE user_agent IS NOT NULL | |
| GROUP BY browser | |
| ORDER BY count DESC | |
| """)).fetchall() | |
| analytics_stats['browsers'] = [ | |
| {'browser': stat.browser, 'count': stat.count} | |
| for stat in ua_stats | |
| ] | |
| # Recent votes with analytics data | |
| recent_analytics = db.session.execute(text(""" | |
| SELECT | |
| v.id, | |
| v.vote_date, | |
| v.session_duration_seconds, | |
| v.ip_address_partial, | |
| v.cache_hit, | |
| v.model_type, | |
| u.username, | |
| m1.name as chosen_model, | |
| m2.name as rejected_model | |
| FROM vote v | |
| LEFT JOIN user u ON v.user_id = u.id | |
| LEFT JOIN model m1 ON v.model_chosen = m1.id | |
| LEFT JOIN model m2 ON v.model_rejected = m2.id | |
| WHERE v.session_duration_seconds IS NOT NULL | |
| ORDER BY v.vote_date DESC | |
| LIMIT 20 | |
| """)).fetchall() | |
| analytics_stats['recent_votes'] = [ | |
| { | |
| 'id': vote.id, | |
| 'vote_date': vote.vote_date, | |
| 'duration': round(vote.session_duration_seconds, 2) if vote.session_duration_seconds else None, | |
| 'ip': vote.ip_address_partial, | |
| 'cache_hit': vote.cache_hit, | |
| 'model_type': vote.model_type, | |
| 'username': vote.username, | |
| 'chosen_model': vote.chosen_model, | |
| 'rejected_model': vote.rejected_model | |
| } | |
| for vote in recent_analytics | |
| ] | |
| except Exception as e: | |
| flash(f"Error retrieving analytics data: {str(e)}", "error") | |
| analytics_stats = {} | |
| return render_template( | |
| "admin/analytics.html", | |
| analytics_stats=analytics_stats | |
| ) | |
| def security(): | |
| """View security monitoring data and suspicious activity.""" | |
| try: | |
| from security import ( | |
| detect_suspicious_voting_patterns, | |
| detect_coordinated_voting, | |
| check_user_security_score, | |
| detect_model_bias | |
| ) | |
| # Get recent suspicious users | |
| recent_users = User.query.order_by(User.join_date.desc()).limit(50).all() | |
| suspicious_users = [] | |
| for user in recent_users: | |
| score, factors = check_user_security_score(user.id) | |
| if score < 50: # Flag users with low security scores | |
| suspicious_users.append({ | |
| 'user': user, | |
| 'score': score, | |
| 'factors': factors | |
| }) | |
| # Sort by lowest score first | |
| suspicious_users.sort(key=lambda x: x['score']) | |
| # Check for coordinated voting on top models | |
| top_models = Model.query.order_by(Model.current_elo.desc()).limit(10).all() | |
| coordinated_campaigns = [] | |
| for model in top_models: | |
| is_coordinated, user_count, vote_count, suspicious_users_list = detect_coordinated_voting(model.id) | |
| if is_coordinated: | |
| coordinated_campaigns.append({ | |
| 'model': model, | |
| 'user_count': user_count, | |
| 'vote_count': vote_count, | |
| 'suspicious_users': suspicious_users_list | |
| }) | |
| # Get users with high model bias | |
| biased_users = [] | |
| for model in top_models: | |
| # Check recent voters for this model | |
| recent_voters = db.session.query(Vote.user_id).filter( | |
| Vote.model_chosen == model.id | |
| ).distinct().limit(20).all() | |
| for voter in recent_voters: | |
| if voter.user_id: | |
| is_biased, bias_ratio, votes_for_model, total_votes = detect_model_bias( | |
| voter.user_id, model.id | |
| ) | |
| if is_biased and total_votes >= 5: | |
| user = User.query.get(voter.user_id) | |
| if user: | |
| biased_users.append({ | |
| 'user': user, | |
| 'model': model, | |
| 'bias_ratio': bias_ratio, | |
| 'votes_for_model': votes_for_model, | |
| 'total_votes': total_votes | |
| }) | |
| # Remove duplicates and sort by bias ratio | |
| seen_users = set() | |
| unique_biased_users = [] | |
| for item in biased_users: | |
| user_model_key = (item['user'].id, item['model'].id) | |
| if user_model_key not in seen_users: | |
| seen_users.add(user_model_key) | |
| unique_biased_users.append(item) | |
| unique_biased_users.sort(key=lambda x: x['bias_ratio'], reverse=True) | |
| # Get recent security blocks from logs (if available) | |
| security_blocks = [] | |
| try: | |
| # This would require parsing application logs | |
| # For now, we'll show a placeholder | |
| pass | |
| except Exception: | |
| pass | |
| return render_template( | |
| "admin/security.html", | |
| suspicious_users=suspicious_users[:20], # Limit to top 20 | |
| coordinated_campaigns=coordinated_campaigns, | |
| biased_users=unique_biased_users[:20], # Limit to top 20 | |
| security_blocks=security_blocks | |
| ) | |
| except ImportError: | |
| flash("Security module not available", "error") | |
| return redirect(url_for("admin.index")) | |
| except Exception as e: | |
| flash(f"Error loading security data: {str(e)}", "error") | |
| return redirect(url_for("admin.index")) | |
| def timeouts(): | |
| """Manage user timeouts""" | |
| # Get active timeouts | |
| active_timeouts = get_user_timeouts(active_only=True, limit=100) | |
| # Get recent expired/cancelled timeouts | |
| recent_inactive = UserTimeout.query.filter( | |
| or_( | |
| UserTimeout.is_active == False, | |
| UserTimeout.expires_at <= datetime.utcnow() | |
| ) | |
| ).order_by(UserTimeout.created_at.desc()).limit(50).all() | |
| # Get coordinated campaigns for context | |
| recent_campaigns = get_coordinated_campaigns(limit=20) | |
| return render_template( | |
| "admin/timeouts.html", | |
| active_timeouts=active_timeouts, | |
| recent_inactive=recent_inactive, | |
| recent_campaigns=recent_campaigns | |
| ) | |
| def create_timeout(): | |
| """Create a new user timeout""" | |
| try: | |
| user_id = request.form.get("user_id", type=int) | |
| reason = request.form.get("reason", "").strip() | |
| timeout_type = request.form.get("timeout_type", "manual") | |
| duration_days = request.form.get("duration_days", type=int) | |
| if not all([user_id, reason, duration_days]): | |
| flash("All fields are required", "error") | |
| return redirect(url_for("admin.timeouts")) | |
| if duration_days < 1 or duration_days > 365: | |
| flash("Duration must be between 1 and 365 days", "error") | |
| return redirect(url_for("admin.timeouts")) | |
| # Check if user exists | |
| user = User.query.get(user_id) | |
| if not user: | |
| flash("User not found", "error") | |
| return redirect(url_for("admin.timeouts")) | |
| # Check if user already has an active timeout | |
| is_timed_out, existing_timeout = check_user_timeout(user_id) | |
| if is_timed_out: | |
| flash(f"User {user.username} already has an active timeout until {existing_timeout.expires_at}", "error") | |
| return redirect(url_for("admin.timeouts")) | |
| # Create timeout | |
| from flask_login import current_user | |
| timeout = create_user_timeout( | |
| user_id=user_id, | |
| reason=reason, | |
| timeout_type=timeout_type, | |
| duration_days=duration_days, | |
| created_by=current_user.id if current_user.is_authenticated else None | |
| ) | |
| flash(f"Timeout created for {user.username} (expires: {timeout.expires_at})", "success") | |
| except Exception as e: | |
| flash(f"Error creating timeout: {str(e)}", "error") | |
| return redirect(url_for("admin.timeouts")) | |
| def cancel_timeout(timeout_id): | |
| """Cancel an active timeout""" | |
| try: | |
| cancel_reason = request.form.get("cancel_reason", "").strip() | |
| if not cancel_reason: | |
| flash("Cancel reason is required", "error") | |
| return redirect(url_for("admin.timeouts")) | |
| from flask_login import current_user | |
| success, message = cancel_user_timeout( | |
| timeout_id=timeout_id, | |
| cancelled_by=current_user.id if current_user.is_authenticated else None, | |
| cancel_reason=cancel_reason | |
| ) | |
| if success: | |
| flash(message, "success") | |
| else: | |
| flash(message, "error") | |
| except Exception as e: | |
| flash(f"Error cancelling timeout: {str(e)}", "error") | |
| return redirect(url_for("admin.timeouts")) | |
| def campaigns(): | |
| """View and manage coordinated voting campaigns""" | |
| status_filter = request.args.get("status", "all") | |
| if status_filter == "all": | |
| campaigns = get_coordinated_campaigns(limit=100) | |
| else: | |
| campaigns = get_coordinated_campaigns(status=status_filter, limit=100) | |
| # Get campaign statistics | |
| stats = { | |
| "total": CoordinatedVotingCampaign.query.count(), | |
| "active": CoordinatedVotingCampaign.query.filter_by(status="active").count(), | |
| "resolved": CoordinatedVotingCampaign.query.filter_by(status="resolved").count(), | |
| "false_positive": CoordinatedVotingCampaign.query.filter_by(status="false_positive").count(), | |
| } | |
| return render_template( | |
| "admin/campaigns.html", | |
| campaigns=campaigns, | |
| stats=stats, | |
| current_filter=status_filter | |
| ) | |
| def campaign_detail(campaign_id): | |
| """View detailed information about a coordinated voting campaign""" | |
| campaign = CoordinatedVotingCampaign.query.get_or_404(campaign_id) | |
| # Get participants with user details | |
| participants = db.session.query(CampaignParticipant, User).join( | |
| User, CampaignParticipant.user_id == User.id | |
| ).filter(CampaignParticipant.campaign_id == campaign_id).all() | |
| # Get related timeouts | |
| related_timeouts = UserTimeout.query.filter_by( | |
| related_campaign_id=campaign_id | |
| ).all() | |
| return render_template( | |
| "admin/campaign_detail.html", | |
| campaign=campaign, | |
| participants=participants, | |
| related_timeouts=related_timeouts | |
| ) | |
| def resolve_campaign_route(campaign_id): | |
| """Mark a campaign as resolved""" | |
| try: | |
| status = request.form.get("status") | |
| admin_notes = request.form.get("admin_notes", "").strip() | |
| if status not in ["resolved", "false_positive"]: | |
| flash("Invalid status", "error") | |
| return redirect(url_for("admin.campaign_detail", campaign_id=campaign_id)) | |
| from flask_login import current_user | |
| success, message = resolve_campaign( | |
| campaign_id=campaign_id, | |
| resolved_by=current_user.id if current_user.is_authenticated else None, | |
| status=status, | |
| admin_notes=admin_notes | |
| ) | |
| if success: | |
| flash(f"Campaign marked as {status}", "success") | |
| else: | |
| flash(message, "error") | |
| except Exception as e: | |
| flash(f"Error resolving campaign: {str(e)}", "error") | |
| return redirect(url_for("admin.campaign_detail", campaign_id=campaign_id)) | |
| def user_search(): | |
| """Search for users by username (for timeout creation)""" | |
| query = request.args.get("q", "").strip() | |
| if len(query) < 2: | |
| return jsonify([]) | |
| users = User.query.filter( | |
| User.username.ilike(f"%{query}%") | |
| ).limit(10).all() | |
| return jsonify([{ | |
| "id": user.id, | |
| "username": user.username, | |
| "join_date": user.join_date.strftime("%Y-%m-%d") if user.join_date else "N/A" | |
| } for user in users]) | 
