narendrasinghd's picture
Upload app.py
8a47c44 verified
import os, json, random, requests
from livekit import api
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Request
from dotenv import load_dotenv
load_dotenv()
LIVEKIT_API_KEY_LOCAL = os.getenv('LIVEKIT_API_KEY_LOCAL')
LIVEKIT_API_SECRET_LOCAL = os.getenv('LIVEKIT_API_SECRET_LOCAL')
LIVEKIT_WEBSCOKET_URL_LOCAL = os.getenv('LIVEKIT_WEBSCOKET_URL_LOCAL')
LIVEKIT_API_KEY_STAGE = os.getenv('LIVEKIT_API_KEY_STAGE')
LIVEKIT_API_SECRET_STAGE = os.getenv('LIVEKIT_API_SECRET_STAGE')
LIVEKIT_WEBSCOKET_URL_STAGE = os.getenv('LIVEKIT_WEBSCOKET_URL_STAGE')
LIVEKIT_API_KEY_PROD = os.getenv('LIVEKIT_API_KEY_PROD')
LIVEKIT_API_SECRET_PROD = os.getenv('LIVEKIT_API_SECRET_PROD')
LIVEKIT_WEBSCOKET_URL_PROD = os.getenv('LIVEKIT_WEBSCOKET_URL_PROD')
LIVEKIT_API_KEY_SUWARNA = os.getenv('LIVEKIT_API_KEY_SUWARNA')
LIVEKIT_API_SECRET_SUWARNA = os.getenv('LIVEKIT_API_SECRET_SUWARNA')
LIVEKIT_WEBSCOKET_URL_SUWARNA = os.getenv('LIVEKIT_URL_SUWARNA')
LIVEKIT_API_SECRET_NUNNA = os.getenv('LIVEKIT_API_SECRET_NUNNA')
LIVEKIT_API_KEY_NUNNA = os.getenv('LIVEKIT_API_KEY_NUNNA')
LIVEKIT_WEBSCOKET_URL_NUNNA = os.getenv('LIVEKIT_WEBSCOKET_URL_NUNNA')
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get('/generate_token_local')
async def generate_token(request: Request):
agent_token = request.query_params.get("agent_token")
web_uuid = request.query_params.get("web_uuid")
room_name = 'local-'+''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=5))
if not LIVEKIT_API_KEY_LOCAL or not LIVEKIT_API_SECRET_LOCAL:
raise ValueError("LIVEKIT_API_KEY_LOCAL and LIVEKIT_API_SECRET_LOCAL must be set")
metadata = json.dumps({
"agent_token": agent_token,
"web_uuid": web_uuid
})
identity = "human_local"
name = "kickcall_local_name"
at = api.AccessToken(LIVEKIT_API_KEY_LOCAL, LIVEKIT_API_SECRET_LOCAL).with_identity(identity).with_name(name).with_metadata(metadata)
at.with_grants(api.VideoGrants(room=room_name, room_join=True, can_publish=True,
can_publish_data=True, can_subscribe=True, can_update_own_metadata=True))
access_token = at.to_jwt()
return JSONResponse(content={
"accessToken": access_token,
"url": LIVEKIT_WEBSCOKET_URL_LOCAL,
})
@app.post('/sms_interactions/reply_message')
async def sms_interactions(request: Request):
body = await request.body()
body_data = json.loads(body.decode("utf-8"))
print(f"body_data: {body_data}")
return { "body": "DOne"}
@app.get('/generate_token_stage')
async def generate_token(request: Request):
agent_token = request.query_params.get("agent_token")
web_uuid = request.query_params.get("web_uuid")
room_name = 'stage-'+''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=7))
if not LIVEKIT_API_KEY_STAGE or not LIVEKIT_API_SECRET_STAGE:
raise ValueError("LIVEKIT_API_KEY_STAGE and LIVEKIT_API_SECRET_STAGE must be set")
metadata = json.dumps({
"agent_token": agent_token,
"web_uuid": web_uuid
})
identity = "human_stage"
name = "kickcall_stage_name"
at = api.AccessToken(LIVEKIT_API_KEY_STAGE, LIVEKIT_API_SECRET_STAGE).with_identity(identity).with_name(name).with_metadata(metadata)
at.with_grants(api.VideoGrants(room=room_name, room_join=True, can_publish=True,
can_publish_data=True, can_subscribe=True, can_update_own_metadata=True))
access_token = at.to_jwt()
return JSONResponse(content={
"accessToken": access_token,
"url": LIVEKIT_WEBSCOKET_URL_STAGE,
})
@app.get('/generate_token_prod')
async def generate_token(request: Request):
agent_token = request.query_params.get("agent_token")
web_uuid = request.query_params.get("web_uuid")
room_name = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=7))
if not LIVEKIT_API_KEY_PROD or not LIVEKIT_API_SECRET_PROD:
raise ValueError("LIVEKIT_API_KEY_PROD and LIVEKIT_API_SECRET_PROD must be set")
metadata = json.dumps({
"agent_token": agent_token,
"web_uuid": web_uuid
})
identity = "human_prod"
name = "kickcall_prod_name"
at = api.AccessToken(LIVEKIT_API_KEY_PROD, LIVEKIT_API_SECRET_PROD).with_identity(identity).with_name(name).with_metadata(metadata)
at.with_grants(api.VideoGrants(room=room_name, room_join=True, can_publish=True,
can_publish_data=True, can_subscribe=True, can_update_own_metadata=True))
access_token = at.to_jwt()
return JSONResponse(content={
"accessToken": access_token,
"url": LIVEKIT_WEBSCOKET_URL_PROD,
})
@app.get('/generate_token_suwarna')
async def generate_token(request: Request):
agent_token = request.query_params.get("agent_token")
web_uuid = request.query_params.get("web_uuid")
room_name = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=7))
if not LIVEKIT_API_KEY_SUWARNA or not LIVEKIT_API_SECRET_SUWARNA:
raise ValueError("LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set")
metadata = json.dumps({
"agent_token": agent_token,
"web_uuid": web_uuid
})
identity = "human_suwarna"
name = "suwarna"
at = api.AccessToken(LIVEKIT_API_KEY_SUWARNA, LIVEKIT_API_SECRET_SUWARNA).with_identity(identity).with_name(name).with_metadata(metadata)
at.with_grants(api.VideoGrants(room=room_name, room_join=True, can_publish=True,
can_publish_data=True, can_subscribe=True, can_update_own_metadata=True))
access_token = at.to_jwt()
return JSONResponse(content={
"accessToken": access_token,
"url": LIVEKIT_WEBSCOKET_URL_SUWARNA,
})
@app.get('/generate_token_nunna')
async def generate_token(request: Request):
agent_token = request.query_params.get("agent_token")
web_uuid = request.query_params.get("web_uuid")
room_name = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=7))
if not LIVEKIT_API_KEY_NUNNA or not LIVEKIT_API_SECRET_NUNNA:
raise ValueError("LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set")
metadata = json.dumps({
"agent_token": agent_token,
"web_uuid": web_uuid
})
identity = "human_lakshmi"
name = "laksmi"
at = api.AccessToken(LIVEKIT_API_KEY_NUNNA, LIVEKIT_API_SECRET_NUNNA).with_identity(identity).with_name(name).with_metadata(metadata)
at.with_grants(api.VideoGrants(room=room_name, room_join=True, can_publish=True,
can_publish_data=True, can_subscribe=True, can_update_own_metadata=True))
access_token = at.to_jwt()
return JSONResponse(content={
"accessToken": access_token,
"url": LIVEKIT_WEBSCOKET_URL_NUNNA,
})