Spaces:
Runtime error
Runtime error
import os, json, requests, runpod | |
import torch | |
import torchaudio | |
from einops import rearrange | |
from stable_audio_tools import get_pretrained_model | |
from stable_audio_tools.inference.generation import generate_diffusion_cond | |
# Service credentials and endpoints come from the environment; each value is
# None when the corresponding variable is not set.
discord_token = os.environ.get('com_camenduru_discord_token')
web_uri = os.environ.get('com_camenduru_web_uri')
web_token = os.environ.get('com_camenduru_web_token')
# Load the pretrained model once at import time; the module-level names bound
# here (device, model, sample_rate, sample_size) are read by the job handler.
with torch.inference_mode():
    # Use the GPU when one is visible, otherwise fall back to the CPU.
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"

    model, model_config = get_pretrained_model("audo/stable-audio-open-1.0")

    # Values taken from the model's own configuration.
    sample_rate = model_config["sample_rate"]
    sample_size = model_config["sample_size"]

    model = model.to(device)
# (connect, read) timeouts in seconds so a stalled endpoint cannot hang the
# worker forever; the upload gets a longer read window than a notification.
_UPLOAD_TIMEOUT = (10, 300)
_NOTIFY_TIMEOUT = (10, 30)

# Literal value of ``notify_uri`` that means "no caller-specific callback".
_NOTIFY_URI_PLACEHOLDER = "notify_uri"


def _render_audio(values):
    """Run the diffusion model for one request and return int16 audio.

    Args:
        values: Request dict. Reads ``prompt``, ``seconds_start``,
            ``seconds_total``, ``steps``, ``cfg_scale``, ``sigma_min``,
            ``sigma_max`` and ``sampler_type``.

    Returns:
        A 2-D ``torch.int16`` CPU tensor, peak-normalised and cut to at most
        ``int(sample_rate * seconds_total)`` entries along its second axis.

    Raises:
        KeyError: If one of the keys listed above is missing.
    """
    seconds_total = values['seconds_total']
    conditioning = [{
        "prompt": values['prompt'],
        "seconds_start": values['seconds_start'],
        "seconds_total": seconds_total,
    }]
    output = generate_diffusion_cond(
        model,
        steps=values['steps'],
        cfg_scale=values['cfg_scale'],
        conditioning=conditioning,
        sample_size=sample_size,
        sigma_min=values['sigma_min'],
        sigma_max=values['sigma_max'],
        sampler_type=values['sampler_type'],
        device=device,
    )

    # Fold the batch axis into the last axis so the result is 2-D
    # (presumably channels x samples -- confirm against the model output).
    output = rearrange(output, "b d n -> d (b n)")

    # Peak-normalise to [-1, 1] and convert to 16-bit integers. The peak is
    # floored at a tiny positive value so all-zero output yields silence
    # instead of NaN from a 0 / 0 division.
    audio = output.to(torch.float32)
    peak = audio.abs().max().clamp(min=1e-8)
    audio = audio.div(peak).clamp(-1, 1).mul(32767).to(torch.int16).cpu()

    # seconds_total may arrive as a float; a slice bound must be an int.
    max_length = int(sample_rate * seconds_total)
    if audio.shape[1] > max_length:
        audio = audio[:, :max_length]
    return audio


def _post_notification(uri, token, payload):
    """POST ``payload`` as JSON to ``uri`` using ``token`` as Authorization."""
    requests.post(
        uri,
        data=json.dumps(payload),
        headers={'Content-Type': 'application/json', "Authorization": token},
        timeout=_NOTIFY_TIMEOUT,
    )


def _notify_best_effort(targets, payload):
    """Try to deliver ``payload`` to every target, logging instead of raising."""
    import logging  # local import: the file's import header does not provide it

    for uri, token in targets:
        try:
            _post_notification(uri, token, payload)
        except Exception:
            # One unreachable endpoint must not stop the others from hearing
            # about the failure, and must not mask the original error.
            logging.getLogger(__name__).exception(
                "Could not deliver %s notification to %s",
                payload.get("status"), uri,
            )


def generate(input):
    """RunPod handler: generate audio, upload it and notify listeners.

    The parameter name ``input`` shadows the builtin but is kept because it is
    part of the handler's existing signature.

    Args:
        input: Job dict whose ``"input"`` entry holds the request. The
            generation keys read by ``_render_audio`` are required. The keys
            ``job_id``, ``notify_uri``, ``notify_token``, ``discord_id``,
            ``discord_channel`` and ``discord_token`` are optional and are
            removed from the request dict.

    Returns:
        ``{"jobId", "result", "status"}``. On success ``status`` is ``"DONE"``
        and ``result`` is the body of the upload response. If uploading or
        notifying fails, ``status`` is ``"FAILED"`` and ``result`` is
        ``"FAILED: <error>"``.

    Raises:
        Errors raised while generating or saving the audio are not caught
        here and propagate to the caller.
    """
    values = input["input"]

    audio = _render_audio(values)
    torchaudio.save("/content/output.wav", audio, sample_rate)
    result = "/content/output.wav"

    # Resolve everything the failure path needs BEFORE entering the try block,
    # so the except handler can never hit an unbound local.
    job_id = values.pop('job_id', None)
    notify_uri = values.pop('notify_uri', _NOTIFY_URI_PLACEHOLDER)
    notify_token = values.pop('notify_token', None)
    # NOTE: Discord delivery is not used; its request fields are only discarded.
    for unused_key in ('discord_id', 'discord_channel', 'discord_token'):
        values.pop(unused_key, None)
    web_notify_uri = os.getenv('com_camenduru_web_notify_uri')
    web_notify_token = os.getenv('com_camenduru_web_notify_token')

    # The web endpoint is always notified; a caller-specific endpoint is added
    # only when a real URI (not the placeholder) was supplied.
    targets = [(web_notify_uri, web_notify_token)]
    if notify_uri != _NOTIFY_URI_PLACEHOLDER:
        targets.append((notify_uri, notify_token))

    try:
        with open(result, 'rb') as file:
            response = requests.post(
                "https://upload.tost.ai/api/v1",
                files={'file': file},
                timeout=_UPLOAD_TIMEOUT,
            )
        response.raise_for_status()
        result_url = response.text

        notify_payload = {"jobId": job_id, "result": result_url, "status": "DONE"}
        for uri, token in targets:
            _post_notification(uri, token, notify_payload)
        return {"jobId": job_id, "result": result_url, "status": "DONE"}
    except Exception as e:
        error_payload = {"jobId": job_id, "status": "FAILED"}
        _notify_best_effort(targets, error_payload)
        return {"jobId": job_id, "result": f"FAILED: {str(e)}", "status": "FAILED"}
    finally:
        # The WAV file is only a staging artefact for the upload.
        if os.path.exists(result):
            os.remove(result)
# Register `generate` as the job handler and start the RunPod serverless worker.
runpod.serverless.start(
    {"handler": generate},
)