from fastapi import APIRouter, Query, HTTPException import requests from pydantic import BaseModel # from utils.utils import generate_signature import hmac import hashlib import json import time routerVideo = APIRouter() # 第三方API地址 EXTERNAL_API_URL = "https://api.suyanw.cn/api/Iqiyi_video_search.php" @routerVideo.get("/video-search/") async def video_search( msg: str = Query(..., description="需要搜索的视频内容"), n: int = Query(0, description="选择序号"), size: int = Query(10, description="搜索内容列表数"), hh: str = Query("\n", description="换行符号,默认为\\n") ): """ 视频搜索接口 - 封装第三方 API。 """ # 参数传递给第三方API params = { "msg": msg, "n": n, "size": size, "hh": hh } try: # 发送请求到第三方API response = requests.get(EXTERNAL_API_URL, params=params) response.raise_for_status() # 确保HTTP请求成功 # 返回第三方API的结果 return response.text.split(hh) except requests.exceptions.RequestException as e: # 处理请求错误 raise HTTPException(status_code=500, detail=f"请求第三方API出错: {str(e)}") @routerVideo.get("/video-recommend/") async def video_search( source: str = Query(..., description="资源类型"), search_type: str = Query(..., description="返回格式,默认text可选json") ): api_url = "" params = {} if source == '1': api_url = "https://api.suyanw.cn/api/IQIYI_search_recommendation.php" """ 视频搜索接口 - 封装第三方 API。 """ # 参数传递给第三方API params = { "type": search_type, "hh": "\\n" } try: # 发送请求到第三方API response = requests.get(api_url, params=params) response.raise_for_status() # 确保HTTP请求成功 # 返回第三方API的结果 return response.json() except requests.exceptions.RequestException as e: # 处理请求错误 raise HTTPException(status_code=500, detail=f"请求第三方API出错: {str(e)}") # 定义请求体模型 class SearchVideoRequest(BaseModel): date: int # 时间 videoName: str # 视频名称 sign: str @routerVideo.post("/video/search") def search_videos(request: SearchVideoRequest): """ 搜索视频接口 :param date: 时间 :param videoName: 视频名称 :return: 符合条件的视频列表 """ api_url = "https://api.q168.vip/api/search" params = { "videoName": request.videoName } # 签名 # data_str = json.dumps(params) # sign = hmac.new("OArmXWpMeX8n".encode('utf-8'), data_str.encode('utf-8'), hashlib.sha256).hexdigest() # sign = generate_signature(params) # 设置请求头 headers = { "Sign": request.sign, "Accept": "application/json, text/plain, */*", "Accept-Encoding": "gzip, deflate, br", # 加入 br "origin": "https://wantwatch.me", "Content-Type": "application/json; charset=utf-8", } try: # 发送请求到第三方API response = requests.post(api_url, data=params, headers=headers) response.raise_for_status() # 确保HTTP请求成功 # 返回第三方API的结果 return response.json() except requests.exceptions.RequestException as e: # 处理请求错误 raise HTTPException(status_code=500, detail=f"请求第三方API出错: {str(e)}")