import json
import time
import asyncio
import uvicorn
from fastapi import FastAPI, Request, HTTPException, Header, Depends
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any, Union
import requests
from datetime import datetime
import logging
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("openai-proxy")

# Create the FastAPI application
app = FastAPI(
    title="OpenAI API Proxy",
    description="Proxies OpenAI API requests to the DeepSider API",
    version="1.0.0"
)
# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Configuration
DEEPSIDER_API_BASE = "https://api.chargpt.ai/api/v2"
TOKEN_INDEX = 0

# Model mapping table
MODEL_MAPPING = {
    "gpt-4o-mini": "openai/gpt-4o-mini",
    "gpt-4o": "openai/gpt-4o",
    "o1": "openai/o1",
    "o3-mini": "openai/o3-mini",
    "claude-3.5-sonnet": "anthropic/claude-3.5-sonnet",
    "claude-3.7-sonnet": "anthropic/claude-3.7-sonnet",
    "grok-3": "x-ai/grok-3",
    "grok-3-reasoner": "x-ai/grok-3-reasoner",
    "deepseek-v3": "deepseek/deepseek-chat",
    "deepseek-r1": "deepseek/deepseek-r1",
    "gemini-2.0-flash": "google/gemini-2.0-flash",
    "gemini-2.0-pro-exp": "google/gemini-2.0-pro-exp-02-05",
    "gemini-2.0-flash-thinking-exp": "google/gemini-2.0-flash-thinking-exp-1219",
    "qwq-32b": "qwen/qwq-32b",
    "qwen-max": "qwen/qwen-max"
}
# Request headers
def get_headers(api_key):
    global TOKEN_INDEX
    # Check whether multiple tokens were provided (comma-separated)
    tokens = api_key.split(',')
    if len(tokens) > 1:
        # Pick the next token in round-robin order
        current_token = tokens[TOKEN_INDEX % len(tokens)]
        TOKEN_INDEX = (TOKEN_INDEX + 1) % len(tokens)
    else:
        current_token = api_key
    return {
        "accept": "*/*",
        "accept-encoding": "gzip, deflate, br, zstd",
        "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
        "content-type": "application/json",
        "origin": "chrome-extension://client",
        "i-lang": "zh-CN",
        "i-version": "1.1.64",
        "sec-ch-ua": '"Chromium";v="134", "Not:A-Brand";v="24"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": "Windows",
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "cross-site",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
        "authorization": f"Bearer {current_token.strip()}"
    }
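
# Illustrative sketch (not part of the service): how the round-robin rotation above
# behaves when the Authorization header carries several comma-separated tokens.
# The token values are hypothetical placeholders.
#
#   get_headers("tok_a,tok_b,tok_c")["authorization"]  # -> "Bearer tok_a"
#   get_headers("tok_a,tok_b,tok_c")["authorization"]  # -> "Bearer tok_b"
#   get_headers("tok_a,tok_b,tok_c")["authorization"]  # -> "Bearer tok_c"
#   get_headers("tok_a,tok_b,tok_c")["authorization"]  # -> "Bearer tok_a" (wraps around)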
# OpenAI API request models
class ChatMessage(BaseModel):
    role: str
    content: str
    name: Optional[str] = None

class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[ChatMessage]
    temperature: Optional[float] = 1.0
    top_p: Optional[float] = 1.0
    n: Optional[int] = 1
    stream: Optional[bool] = False
    stop: Optional[Union[List[str], str]] = None
    max_tokens: Optional[int] = None
    presence_penalty: Optional[float] = 0
    frequency_penalty: Optional[float] = 0
    user: Optional[str] = None
# Account balance lookup
async def check_account_balance(api_key, token_index=None):
    """Check the account quota/balance information."""
    tokens = api_key.split(',')
    # If a valid token_index was provided, use that token
    if token_index is not None and len(tokens) > token_index:
        current_token = tokens[token_index].strip()
    else:
        # Otherwise fall back to the first token
        current_token = tokens[0].strip() if tokens else api_key
    headers = {
        "accept": "*/*",
        "content-type": "application/json",
        "authorization": f"Bearer {current_token}"
    }
    try:
        # Fetch the account quota information
        response = requests.get(
            f"{DEEPSIDER_API_BASE.replace('/v2', '')}/quota/retrieve",
            headers=headers
        )
        if response.status_code == 200:
            data = response.json()
            if data.get('code') == 0:
                quota_list = data.get('data', {}).get('list', [])
                # Parse the quota entries
                quota_info = {}
                for item in quota_list:
                    item_type = item.get('type', '')
                    available = item.get('available', 0)
                    quota_info[item_type] = {
                        "total": item.get('total', 0),
                        "available": available,
                        "title": item.get('title', '')
                    }
                return True, quota_info
        return False, {}
    except Exception as e:
        logger.warning(f"Failed to check account balance: {str(e)}")
        return False, {}
# Utility functions
def verify_api_key(api_key: str = Header(..., alias="Authorization")):
    """Validate the API key from the Authorization header."""
    if not api_key.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid API key format")
    return api_key.replace("Bearer ", "")

def map_openai_to_deepsider_model(model: str) -> str:
    """Map an OpenAI model name to the corresponding DeepSider model name."""
    return MODEL_MAPPING.get(model, "anthropic/claude-3.7-sonnet")
def format_messages_for_deepsider(messages: List[ChatMessage]) -> str:
    """Format the message list into the prompt format expected by the DeepSider API."""
    prompt = ""
    for msg in messages:
        role = msg.role
        # Map OpenAI roles to a format DeepSider understands
        if role == "system":
            # Put system messages at the beginning as guidance
            prompt = f"{msg.content}\n\n" + prompt
        elif role == "user":
            prompt += f"Human: {msg.content}\n\n"
        elif role == "assistant":
            prompt += f"Assistant: {msg.content}\n\n"
        else:
            # Treat any other role as a user message
            prompt += f"Human ({role}): {msg.content}\n\n"
    # If the last message is not from the user, append a "Human:" prefix to prompt the model to answer
    if messages and messages[-1].role != "user":
        prompt += "Human: "
    return prompt.strip()
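
# Illustrative sketch (not part of the service): what the formatter above produces
# for a typical OpenAI-style message list. The messages are hypothetical.
#
#   format_messages_for_deepsider([
#       ChatMessage(role="system", content="You are a helpful assistant."),
#       ChatMessage(role="user", content="Hello"),
#   ])
#   # -> "You are a helpful assistant.\n\nHuman: Hello"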
async def generate_openai_response(full_response: str, request_id: str, model: str) -> Dict:
    """Build a complete response in the OpenAI API response format."""
    timestamp = int(time.time())
    return {
        "id": f"chatcmpl-{request_id}",
        "object": "chat.completion",
        "created": timestamp,
        "model": model,
        "choices": [
            {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": full_response
                },
                "finish_reason": "stop"
            }
        ],
        "usage": {
            "prompt_tokens": 0,      # cannot be computed accurately
            "completion_tokens": 0,  # cannot be computed accurately
            "total_tokens": 0        # cannot be computed accurately
        }
    }
async def stream_openai_response(response, request_id: str, model: str, api_key, token_index):
    """Stream the response back in OpenAI API format."""
    timestamp = int(time.time())
    full_response = ""
    try:
        # Convert the DeepSider response stream into the OpenAI streaming format
        for line in response.iter_lines():
            if not line:
                continue
            if line.startswith(b'data: '):
                try:
                    data = json.loads(line[6:].decode('utf-8'))
                    if data.get('code') == 202 and data.get('data', {}).get('type') == "chat":
                        # Extract the text content
                        content = data.get('data', {}).get('content', '')
                        if content:
                            full_response += content
                            # Emit an OpenAI-format streaming chunk
                            chunk = {
                                "id": f"chatcmpl-{request_id}",
                                "object": "chat.completion.chunk",
                                "created": timestamp,
                                "model": model,
                                "choices": [
                                    {
                                        "index": 0,
                                        "delta": {
                                            "content": content
                                        },
                                        "finish_reason": None
                                    }
                                ]
                            }
                            yield f"data: {json.dumps(chunk)}\n\n"
                    elif data.get('code') == 203:
                        # Emit the completion signal
                        chunk = {
                            "id": f"chatcmpl-{request_id}",
                            "object": "chat.completion.chunk",
                            "created": timestamp,
                            "model": model,
                            "choices": [
                                {
                                    "index": 0,
                                    "delta": {},
                                    "finish_reason": "stop"
                                }
                            ]
                        }
                        yield f"data: {json.dumps(chunk)}\n\n"
                        yield "data: [DONE]\n\n"
                except json.JSONDecodeError:
                    logger.warning(f"Unable to parse response line: {line}")
    except Exception as e:
        logger.error(f"Error while processing the streaming response: {str(e)}")
        # Check whether another token could be used
        tokens = api_key.split(',')
        if len(tokens) > 1:
            logger.info("Retrying with the next token")
            # Automatic retry is not implemented here yet; the error is only logged
        # Return the error to the client
        error_chunk = {
            "id": f"chatcmpl-{request_id}",
            "object": "chat.completion.chunk",
            "created": timestamp,
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "delta": {
                        "content": f"\n\n[Error while processing the response: {str(e)}]"
                    },
                    "finish_reason": "stop"
                }
            ]
        }
        yield f"data: {json.dumps(error_chunk)}\n\n"
        yield "data: [DONE]\n\n"
# Route definitions (paths follow the OpenAI API convention)
@app.get("/")
async def root():
    return {"message": "OpenAI API Proxy service is running, connected to the DeepSider API"}
@app.get("/v1/models")
async def list_models(api_key: str = Depends(verify_api_key)):
    """List the available models."""
    models = []
    for openai_model, _ in MODEL_MAPPING.items():
        models.append({
            "id": openai_model,
            "object": "model",
            "created": int(time.time()),
            "owned_by": "openai-proxy"
        })
    return {
        "object": "list",
        "data": models
    }
@app.post("/v1/chat/completions")
async def create_chat_completion(
    request: Request,
    api_key: str = Depends(verify_api_key)
):
    """Chat completions API - supports both regular and streaming requests."""
    # Parse the request body
    body = await request.json()
    chat_request = ChatCompletionRequest(**body)

    # Generate a unique request ID
    request_id = datetime.now().strftime("%Y%m%d%H%M%S") + str(time.time_ns())[-6:]

    # Map the model name
    deepsider_model = map_openai_to_deepsider_model(chat_request.model)

    # Build the prompt expected by the DeepSider API
    prompt = format_messages_for_deepsider(chat_request.messages)

    # Build the request payload
    payload = {
        "model": deepsider_model,
        "prompt": prompt,
        "webAccess": "close",  # web access disabled by default
        "timezone": "Asia/Shanghai"
    }

    # Get the request headers (with the selected token)
    headers = get_headers(api_key)

    # Determine which token was just used
    tokens = api_key.split(',')
    current_token_index = (TOKEN_INDEX - 1) % len(tokens) if len(tokens) > 0 else 0

    try:
        # Send the request to the DeepSider API
        response = requests.post(
            f"{DEEPSIDER_API_BASE}/chat/conversation",
            headers=headers,
            json=payload,
            stream=True
        )

        # Check the response status
        if response.status_code != 200:
            error_msg = f"DeepSider API request failed: {response.status_code}"
            try:
                error_data = response.json()
                error_msg += f" - {error_data.get('message', '')}"
            except Exception:
                error_msg += f" - {response.text}"
            logger.error(error_msg)
            raise HTTPException(status_code=response.status_code, detail="DeepSider API request failed")

        # Handle streaming vs. non-streaming responses
        if chat_request.stream:
            # Return a streaming response
            return StreamingResponse(
                stream_openai_response(response, request_id, chat_request.model, api_key, current_token_index),
                media_type="text/event-stream"
            )
        else:
            # Collect the full response
            full_response = ""
            for line in response.iter_lines():
                if not line:
                    continue
                if line.startswith(b'data: '):
                    try:
                        data = json.loads(line[6:].decode('utf-8'))
                        if data.get('code') == 202 and data.get('data', {}).get('type') == "chat":
                            content = data.get('data', {}).get('content', '')
                            if content:
                                full_response += content
                    except json.JSONDecodeError:
                        pass

            # Return the complete response in OpenAI format
            return await generate_openai_response(full_response, request_id, chat_request.model)
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error while handling the request")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@app.get("/admin/balance")  # assumed path for the admin balance endpoint
async def get_account_balance(request: Request, admin_key: str = Header(None, alias="X-Admin-Key")):
"""查看账户余额""" | |
# 简单的管理密钥检查 | |
expected_admin_key = os.getenv("ADMIN_KEY", "admin") | |
if not admin_key or admin_key != expected_admin_key: | |
raise HTTPException(status_code=403, detail="Unauthorized") | |
# 从请求头中获取API密钥 | |
auth_header = request.headers.get("Authorization", "") | |
if not auth_header or not auth_header.startswith("Bearer "): | |
raise HTTPException(status_code=401, detail="Missing or invalid Authorization header") | |
api_key = auth_header.replace("Bearer ", "") | |
tokens = api_key.split(',') | |
result = {} | |
# 获取所有token的余额信息 | |
for i, token in enumerate(tokens): | |
token_display = f"token_{i+1}" | |
success, quota_info = await check_account_balance(api_key, i) | |
if success: | |
result[token_display] = { | |
"status": "success", | |
"quota": quota_info | |
} | |
else: | |
result[token_display] = { | |
"status": "error", | |
"message": "无法获取账户余额信息" | |
} | |
return result | |
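
# Illustrative sketch (not part of the service): querying the balance endpoint above,
# assuming the service runs locally on port 7860 and the route path /admin/balance.
# The tokens and admin key are hypothetical placeholders.
#
#   curl http://localhost:7860/admin/balance \
#     -H "X-Admin-Key: admin" \
#     -H "Authorization: Bearer <deepsider-token-1>,<deepsider-token-2>"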
# Error handlers
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
    return JSONResponse(
        status_code=404,
        content={
            "error": {
                "message": f"Resource not found: {request.url.path}",
                "type": "not_found_error",
                "code": "not_found"
            }
        }
    )
# Startup event
@app.on_event("startup")
async def startup_event():
    """Initialize when the service starts."""
    logger.info("OpenAI API proxy service started and ready to accept requests")
    logger.info("Multi-token rotation is supported; separate multiple tokens with commas in the Authorization header")
# Main entry point
if __name__ == "__main__":
    # Start the server
    port = int(os.getenv("PORT", "7860"))
    logger.info(f"Starting OpenAI API proxy service on port: {port}")
    uvicorn.run(app, host="0.0.0.0", port=port)
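
# Illustrative sketch (not part of the service): calling the proxy with the official
# OpenAI Python client, assuming the service is reachable at http://localhost:7860 and
# the chat route is served at /v1/chat/completions as registered above. The token is a
# hypothetical placeholder; pass several comma-separated DeepSider tokens to rotate them.
#
#   from openai import OpenAI
#
#   client = OpenAI(base_url="http://localhost:7860/v1", api_key="<deepsider-token>")
#   resp = client.chat.completions.create(
#       model="claude-3.7-sonnet",
#       messages=[{"role": "user", "content": "Hello"}],
#   )
#   print(resp.choices[0].message.content)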