orpheus-3b-0.1-ft_gguf
GGUF version of canopylabs/orpheus-3b-0.1-ft, an English TTS model.
This model is designed to closely mimic the behavior of the original model.
We do not recommend replacing this model with a quantized model created by someone else.
Use it with the sample script below.
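The script depends on a few third-party packages (httpx, torch, numpy, scipy, pyaudio, and snac, per its imports). Assuming standard PyPI package names, installation looks roughly like:
pip install httpx torch numpy scipy pyaudio snac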
Sample script
Sample llama.cpp server command
llama-server -m orpheus-3b-Q4_K_L.gguf --prio 3 -c 2048 -n -2 --port 8080 --host 127.0.0.1 --no-webui
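Once the server is up, you can optionally verify it responds before running the full script. This is a minimal sketch using httpx against the same /completion endpoint and prompt format the script below uses:
import httpx

payload = {
    "prompt": "<custom_token_3><|begin_of_text|>tara: Hello.<|eot_id|><custom_token_4><custom_token_5><custom_token_1>",
    "n_predict": 32,
}
resp = httpx.post("http://127.0.0.1:8080/completion", json=payload, timeout=120.0)
resp.raise_for_status()
print(resp.json()["content"])  # should contain <custom_token_NNN> audio tokens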
Sample real-time playback script
import asyncio
import json
import sys
import httpx
import re
import torch
import scipy.io.wavfile as wavfile
from snac import SNAC
import locale
import argparse
from typing import List, Optional, Dict, Any, Generator
import numpy as np
import pyaudio
import time
import os
from io import BytesIO
# Set UTF-8 as the default encoding
locale.getpreferredencoding = lambda: "UTF-8"
class LlamaAudioConverter:
def __init__(self, server_url="http://127.0.0.1:8080", output_file="output.wav",
verbose=False, realtime_playback=True):
"""
A class to handle audio conversion from Llama token stream
Args:
server_url: URL of the Llama server
output_file: Path to save the output WAV file
verbose: Enable detailed logging
realtime_playback: Enable real-time audio playback
"""
self.server_url = server_url
self.output_file = output_file
self.verbose = verbose
self.collected_tokens = []
self.realtime_playback = realtime_playback
self.audio_chunks = []
        self.sample_rate = 24000  # native output rate of the snac_24khz decoder
# Performance measurement variables
self.start_time = None
self.ttfb = None
self.audio_start_time = None
self.token_count = 0
# Initialize PyAudio (only for real-time playback)
if self.realtime_playback:
self.p = pyaudio.PyAudio()
self.stream = self.p.open(
format=pyaudio.paFloat32,
channels=1,
rate=self.sample_rate,
output=True
)
else:
self.p = None
self.stream = None
# Load SNAC model
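        # SNAC ("Multi-Scale Neural Audio Codec", hubertsiuzdak/snac_24khz) decodes
        # the 7-code frames produced by the model back into a 24 kHz mono waveform.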
print("Loading SNAC model...")
self.snac_model = SNAC.from_pretrained("hubertsiuzdak/snac_24khz")
self.snac_model.to("cpu") # Move to CPU for processing
async def process_stream(self, response_stream):
"""Process the streaming response and collect tokens"""
buffer = ""
first_byte_received = False
# Start stream processing
async for chunk in response_stream:
# Record time when first byte is received
if not first_byte_received:
self.ttfb = time.time() - self.start_time
print(f"TTFB: {self.ttfb * 1000:.2f}ms")
first_byte_received = True
if chunk:
try:
if isinstance(chunk, bytes):
chunk_text = chunk.decode('utf-8', errors='replace')
else:
chunk_text = chunk
# Check data format (whether it's SSE format)
for line in chunk_text.strip().split('\n'):
if not line.strip():
continue
# Check for SSE format
if line.startswith("data: "):
line = line[6:] # Remove "data: " prefix
data = json.loads(line)
if "content" in data and data["content"]:
content = data["content"]
buffer += content
# Extract tokens
pattern = r'<custom_token_(\d+)>'
matches = re.findall(pattern, content)
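                            # The model emits audio codes as <custom_token_N> text;
                            # N maps to vocabulary ID 128256 + N.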
for match in matches:
token_id = 128256 + int(match)
self.collected_tokens.append(token_id)
self.token_count += 1
if self.verbose:
print(f"Token detected: <custom_token_{int(match)}> -> ID: {token_id}")
# Process in chunks of 7 tokens only in real-time mode
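                            # (7 tokens = one complete SNAC frame; see redistribute_codes)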
if self.realtime_playback and len(self.collected_tokens) >= 7 and len(self.collected_tokens) % 7 == 0:
await self.process_collected_tokens()
# Record time of first audio chunk generation
if self.audio_start_time is None and self.audio_chunks:
self.audio_start_time = time.time()
print(f"Time to first audio chunk: {(self.audio_start_time - self.start_time) * 1000:.2f}ms")
except json.JSONDecodeError as e:
if self.verbose:
print(f"Warning: Invalid JSON: {chunk_text}, Error: {e}")
except Exception as e:
if self.verbose:
print(f"Error during chunk processing: {str(e)}")
# Process all tokens after receiving the complete stream
if not self.realtime_playback:
# In no-playback mode, process all tokens at once
print(f"Received all tokens. Generating audio in batch...")
await self.process_all_tokens()
else:
# Process remaining tokens (for real-time playback mode)
if self.collected_tokens:
await self.process_collected_tokens(final=True)
# Display performance metrics
total_time = time.time() - self.start_time
print(f"\nProcessing completed:")
print(f"Total processing time: {total_time:.2f} seconds")
print(f"TTFB: {self.ttfb * 1000:.2f}ms")
if self.audio_start_time:
print(f"Time to first audio chunk: {(self.audio_start_time - self.start_time) * 1000:.2f}ms")
print(f"Tokens processed: {self.token_count}")
print(f"Tokens per second: {self.token_count / total_time:.2f}")
# Close the audio stream (only for real-time mode)
if self.realtime_playback and self.stream:
self.stream.stop_stream()
self.stream.close()
self.p.terminate()
# Save response to file (for debugging)
try:
with open("response.txt", 'w', encoding='utf-8') as f:
f.write(buffer)
print(f"\nResponse saved to response.txt.")
except Exception as e:
print(f"Failed to write to file: {e}")
# Combine and save all audio chunks
if self.audio_chunks and self.output_file:
try:
# Combine audio chunks
combined_audio = np.concatenate(self.audio_chunks)
# Save as WAV file
print(f"Saving audio to WAV file: {self.output_file}")
wavfile.write(self.output_file, self.sample_rate, combined_audio)
except Exception as e:
print(f"Error while saving audio file: {e}")
async def process_all_tokens(self):
"""Process all tokens at once to generate audio (for non-real-time mode)"""
if not self.collected_tokens:
return
# Adjust to a multiple of 7
tokens_count = (len(self.collected_tokens) // 7) * 7
if tokens_count == 0:
print("Warning: No tokens in multiples of 7. No audio will be generated.")
return
tokens_to_process = self.collected_tokens[:tokens_count]
try:
print(f"Batch processing {len(tokens_to_process)} tokens...")
# Redistribute codes
codes = self.redistribute_codes(tokens_to_process)
# Generate audio
with torch.inference_mode():
audio_hat = self.snac_model.decode(codes)
# Convert to NumPy array
audio_np = audio_hat.detach().squeeze().to("cpu").numpy()
# Save audio chunk
self.audio_chunks.append(audio_np)
# Record time of audio generation
if self.audio_start_time is None:
self.audio_start_time = time.time()
print(f"Time to audio generation completion: {(self.audio_start_time - self.start_time) * 1000:.2f}ms")
except Exception as e:
print(f"Error during audio processing: {e}")
async def process_collected_tokens(self, final=False):
"""Process collected tokens to convert to audio (for real-time mode)"""
if not self.collected_tokens or not self.realtime_playback:
return
# Determine how many tokens to process
if final:
# Final processing - adjust to a multiple of 7
remaining = len(self.collected_tokens) % 7
if remaining > 0:
# Truncate to a multiple of 7
tokens_to_process = self.collected_tokens[:-remaining] if remaining > 0 else self.collected_tokens
else:
tokens_to_process = self.collected_tokens
self.collected_tokens = []
else:
# Process tokens in multiples of 7 (appropriate size for audio chunks)
tokens_count = (len(self.collected_tokens) // 7) * 7
tokens_to_process = self.collected_tokens[:tokens_count]
self.collected_tokens = self.collected_tokens[tokens_count:]
if not tokens_to_process:
return
try:
# Redistribute codes
codes = self.redistribute_codes(tokens_to_process)
# Generate audio
with torch.inference_mode():
audio_hat = self.snac_model.decode(codes)
# Convert to NumPy array
audio_np = audio_hat.detach().squeeze().to("cpu").numpy()
# Save audio chunk
self.audio_chunks.append(audio_np)
# Real-time playback
if self.stream:
try:
# Play audio data as Float32
self.stream.write(audio_np.astype(np.float32).tobytes())
if self.verbose:
print(f"Playing audio chunk ({len(audio_np)} samples)...")
except Exception as e:
print(f"Error during audio playback: {e}")
if self.verbose:
print(f"Processed {len(tokens_to_process)} tokens. Remaining: {len(self.collected_tokens)}")
except Exception as e:
print(f"Error during audio processing: {e}")
def redistribute_codes(self, tokens: List[int]) -> List[torch.Tensor]:
"""Redistribute token IDs to SNAC codes"""
# Adjust to a multiple of 7
code_length = (len(tokens) // 7) * 7
if len(tokens) > code_length:
tokens = tokens[:code_length]
if self.verbose:
print(f"Warning: Truncated token list to multiple of 7 ({code_length})")
        # Shift token IDs down by 128266 (128256 + 10) so audio codes start at 0;
        # <custom_token_10> maps to code 0
code_list = [t - 128266 for t in tokens]
layer_1 = []
layer_2 = []
layer_3 = []
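        # Each group of 7 codes is one SNAC frame: code 0 goes to layer 1,
        # codes 1 and 4 to layer 2, and codes 2, 3, 5, 6 to layer 3. The n*4096
        # offsets subtracted below undo the packing that gives each of the 7
        # frame positions its own 4096-wide slice of the token range.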
for i in range(len(code_list) // 7):
layer_1.append(code_list[7*i])
layer_2.append(code_list[7*i+1]-4096)
layer_3.append(code_list[7*i+2]-(2*4096))
layer_3.append(code_list[7*i+3]-(3*4096))
layer_2.append(code_list[7*i+4]-(4*4096))
layer_3.append(code_list[7*i+5]-(5*4096))
layer_3.append(code_list[7*i+6]-(6*4096))
codes = [
torch.tensor(layer_1).unsqueeze(0),
torch.tensor(layer_2).unsqueeze(0),
torch.tensor(layer_3).unsqueeze(0)
]
return codes
async def send_prompt_to_llama(self, prompt: str) -> None:
"""Send prompt to the specified Llama server and process streaming response"""
# Record start time
self.start_time = time.time()
self.token_count = 0
self.audio_start_time = None
# Request payload
payload = {
"prompt": prompt,
"temperature": 0.8,
"top_p": 0.95,
"top_k": 40,
"min_p": 0.05,
"n_predict": 2048,
"stream": True # Enable streaming mode
}
print(f"Sending prompt to server {self.server_url}...")
if self.verbose:
print(f"Prompt: {prompt}")
# Display operation mode information
if self.realtime_playback:
print("Real-time playback mode: Enabled (processing and playing tokens as they arrive)")
else:
print("Real-time playback mode: Disabled (batch processing all tokens after completion)")
# Use async HTTP client
async with httpx.AsyncClient(timeout=None) as client:
try:
# Streaming request
async with client.stream(
"POST",
f"{self.server_url}/completion",
json=payload,
headers={"Accept": "application/x-ndjson"}
) as response:
response.raise_for_status()
# Process stream
await self.process_stream(response.aiter_bytes())
except httpx.HTTPError as e:
print(f"HTTP Error: {e}")
except Exception as e:
print(f"Error: {e}")
async def main():
parser = argparse.ArgumentParser(description='Send prompts to Llama server and generate/play audio')
parser.add_argument('--text', '-t', default="Hello, My name is tara.",
help='Text to synthesize into speech')
parser.add_argument('--output', '-o', default='output.wav',
help='Output WAV file path (default: output.wav)')
parser.add_argument('--server', '-s', default='http://127.0.0.1:8080',
help='Llama server address (default: http://127.0.0.1:8080)')
parser.add_argument('--verbose', '-v', action='store_true',
help='Enable verbose logging')
parser.add_argument('--no-playback', '-np', action='store_true',
help='Disable real-time playback and process all tokens after completion')
args = parser.parse_args()
# Check dependencies (only if real-time playback is enabled)
if not args.no_playback:
try:
import pyaudio
except ImportError:
print("Warning: PyAudio is not installed. Disabling real-time playback.")
print("To install: pip install pyaudio")
args.no_playback = True
# Initialize converter
converter = LlamaAudioConverter(
server_url=args.server,
output_file=args.output,
verbose=args.verbose,
realtime_playback=not args.no_playback
)
# Construct prompt
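    # Prompt layout used here: <custom_token_3> opens the request, "tara: ..." selects
    # the voice and carries the text, and the trailing custom tokens signal the model
    # to start emitting audio tokens (custom_token_N corresponds to token ID 128256 + N).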
prompt = f"<custom_token_3><|begin_of_text|>tara: {args.text}<|eot_id|><custom_token_4><custom_token_5><custom_token_1>"
# Send prompt to server
await converter.send_prompt_to_llama(prompt)
if __name__ == "__main__":
asyncio.run(main())
For real-time playback
You need hardware capable of generating at least about 90 tokens per second.
Also, this sample does not address the problem of noise being mixed into the audio during real-time playback.
Many people have been experimenting with this in the Orpheus-TTS GitHub repository, so it is worth checking out.
python script.py --text "Hello, how are you today?" --server http://your-server-address:8080
For batch processing (no playback)
python script.py --text "Hello, how are you today?" --server http://your-server-address:8080 --no-playback