Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| # logging_utils.py | |
| # 日志相关工具 | |
| import os | |
| import json | |
| from datetime import datetime,timedelta | |
| from collections import defaultdict | |
| LOG_DIR = "/opt/nav-fronted/logs" | |
| ACCESS_LOG = os.path.join(LOG_DIR, "access.log") | |
| SUBMISSION_LOG = os.path.join(LOG_DIR, "submissions.log") | |
| os.makedirs(LOG_DIR, exist_ok=True) | |
| IP_REQUEST_RECORDS = defaultdict(list) | |
| IP_LIMIT = 5 | |
| def is_request_allowed(ip: str) -> bool: | |
| now = datetime.now() | |
| IP_REQUEST_RECORDS[ip] = [t for t in IP_REQUEST_RECORDS[ip] if now - t < timedelta(minutes=1)] | |
| if len(IP_REQUEST_RECORDS[ip]) < IP_LIMIT: | |
| IP_REQUEST_RECORDS[ip].append(now) | |
| return True | |
| return False | |
| def log_access(user_ip: str = None, user_agent: str = None): | |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| log_entry = { | |
| "timestamp": timestamp, | |
| "type": "access", | |
| "user_ip": user_ip or "unknown", | |
| "user_agent": user_agent or "unknown" | |
| } | |
| with open(ACCESS_LOG, "a") as f: | |
| f.write(json.dumps(log_entry) + "\n") | |
| def log_submission(scene: str, prompt: str, model: str, user: str = "anonymous", res: str = "unknown"): | |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| log_entry = { | |
| "timestamp": timestamp, | |
| "type": "submission", | |
| "user": user, | |
| "scene": scene, | |
| "prompt": prompt, | |
| "model": model, | |
| "res": res | |
| } | |
| with open(SUBMISSION_LOG, "a") as f: | |
| f.write(json.dumps(log_entry) + "\n") | |
| def read_logs(log_type: str = "all", max_entries: int = 50) -> list: | |
| logs = [] | |
| if log_type in ["all", "access"]: | |
| try: | |
| with open(ACCESS_LOG, "r") as f: | |
| for line in f: | |
| logs.append(json.loads(line.strip())) | |
| except FileNotFoundError: | |
| pass | |
| if log_type in ["all", "submission"]: | |
| try: | |
| with open(SUBMISSION_LOG, "r") as f: | |
| for line in f: | |
| logs.append(json.loads(line.strip())) | |
| except FileNotFoundError: | |
| pass | |
| logs.sort(key=lambda x: x["timestamp"], reverse=True) | |
| return logs[:max_entries] | |
| def format_logs_for_display(logs: list) -> str: | |
| if not logs: | |
| return "No log record" | |
| markdown = "### System Access Log\n\n" | |
| markdown += "| Time | Type | User/IP | Details |\n" | |
| markdown += "|------|------|---------|----------|\n" | |
| for log in logs: | |
| timestamp = log.get("timestamp", "unknown") | |
| log_type = "Access" if log.get("type") == "access" else "Submission" | |
| if log_type == "Access": | |
| user = log.get("user_ip", "unknown") | |
| details = f"User-Agent: {log.get('user_agent', 'unknown')}" | |
| else: | |
| user = log.get("user", "anonymous") | |
| result = log.get('res', 'unknown') | |
| if result != "success": | |
| if len(result) > 40: | |
| result = f"{result[:20]}...{result[-20:]}" | |
| details = f"Scene: {log.get('scene', 'unknown')}, Prompt: {log.get('prompt', '')}, Model: {log.get('model', 'unknown')}, result: {result}" | |
| markdown += f"| {timestamp} | {log_type} | {user} | {details} |\n" | |
| return markdown | |