ymx / video /iqiyi.py
xulh
代码初始化
9471b22
from fastapi import APIRouter, Query, HTTPException
import requests
from pydantic import BaseModel
# from utils.utils import generate_signature
import hmac
import hashlib
import json
import time
routerVideo = APIRouter()
# 第三方API地址
EXTERNAL_API_URL = "https://api.suyanw.cn/api/Iqiyi_video_search.php"
@routerVideo.get("/video-search/")
async def video_search(
msg: str = Query(..., description="需要搜索的视频内容"),
n: int = Query(0, description="选择序号"),
size: int = Query(10, description="搜索内容列表数"),
hh: str = Query("\n", description="换行符号,默认为\\n")
):
"""
视频搜索接口 - 封装第三方 API。
"""
# 参数传递给第三方API
params = {
"msg": msg,
"n": n,
"size": size,
"hh": hh
}
try:
# 发送请求到第三方API
response = requests.get(EXTERNAL_API_URL, params=params)
response.raise_for_status() # 确保HTTP请求成功
# 返回第三方API的结果
return response.text.split(hh)
except requests.exceptions.RequestException as e:
# 处理请求错误
raise HTTPException(status_code=500, detail=f"请求第三方API出错: {str(e)}")
@routerVideo.get("/video-recommend/")
async def video_search(
source: str = Query(..., description="资源类型"),
search_type: str = Query(..., description="返回格式,默认text可选json")
):
api_url = ""
params = {}
if source == '1':
api_url = "https://api.suyanw.cn/api/IQIYI_search_recommendation.php"
"""
视频搜索接口 - 封装第三方 API。
"""
# 参数传递给第三方API
params = {
"type": search_type,
"hh": "\\n"
}
try:
# 发送请求到第三方API
response = requests.get(api_url, params=params)
response.raise_for_status() # 确保HTTP请求成功
# 返回第三方API的结果
return response.json()
except requests.exceptions.RequestException as e:
# 处理请求错误
raise HTTPException(status_code=500, detail=f"请求第三方API出错: {str(e)}")
# 定义请求体模型
class SearchVideoRequest(BaseModel):
date: int # 时间
videoName: str # 视频名称
sign: str
@routerVideo.post("/video/search")
def search_videos(request: SearchVideoRequest):
"""
搜索视频接口
:param date: 时间
:param videoName: 视频名称
:return: 符合条件的视频列表
"""
api_url = "https://api.q168.vip/api/search"
params = {
"videoName": request.videoName
}
# 签名
# data_str = json.dumps(params)
# sign = hmac.new("OArmXWpMeX8n".encode('utf-8'), data_str.encode('utf-8'), hashlib.sha256).hexdigest()
# sign = generate_signature(params)
# 设置请求头
headers = {
"Sign": request.sign,
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate, br", # 加入 br
"origin": "https://wantwatch.me",
"Content-Type": "application/json; charset=utf-8",
}
try:
# 发送请求到第三方API
response = requests.post(api_url, data=params, headers=headers)
response.raise_for_status() # 确保HTTP请求成功
# 返回第三方API的结果
return response.json()
except requests.exceptions.RequestException as e:
# 处理请求错误
raise HTTPException(status_code=500, detail=f"请求第三方API出错: {str(e)}")