Spaces:
Sleeping
Sleeping
import gradio as gr | |
import torch | |
from transformers import AutoTokenizer | |
import yaml | |
from deepseek_v3 import DeepSeekV3Model | |
import os | |
def generate_helper(model, idx, max_new_tokens, context_length, temperature=1.0, top_k=None, eos_token=None, device=None): | |
model = model.to(device) | |
idx = idx.to(device) | |
model.eval() | |
for _ in range(max_new_tokens): | |
idx_cond = idx[:, -context_length:] | |
with torch.no_grad(): | |
logits, _ = model(idx_cond) # Unpack both logits and loss (ignore loss) | |
logits = logits.view(idx_cond.shape[0], -1, model.config['vocab_size']) # Reshape to [batch, seq, vocab] | |
# Get the logits for the last token only | |
logits = logits[:, -1, :] # Shape: [batch_size, vocab_size] | |
if top_k is not None: | |
# top k sampling | |
top_logits, top_pos = torch.topk(logits, top_k) | |
min_logit = top_logits[:, -1].unsqueeze(-1) | |
logits = torch.where(logits < min_logit, | |
torch.tensor(float('-inf')).to(logits.device), | |
logits) | |
# temperature scaling | |
if temperature > 0.0: | |
logits /= temperature | |
probs = torch.softmax(logits, dim=-1) | |
idx_next = torch.multinomial(probs, num_samples=1) | |
else: | |
idx_next = torch.argmax(logits, dim=-1, keepdim=True) | |
if idx_next.item() == eos_token: | |
break | |
idx = torch.cat((idx, idx_next), dim=1) | |
model.train() | |
return idx | |
def get_config(config_path): | |
config = yaml.load(open(config_path, "r"), Loader=yaml.FullLoader) | |
return config | |
def extract_and_save_weights(config_path, checkpoint_path, weights_path, device): | |
"""Extract model weights from checkpoint and save as a separate .pt file""" | |
print(f"Extracting weights from checkpoint: {checkpoint_path}") | |
config = get_config(config_path) | |
model = DeepSeekV3Model(config['model']) | |
# Load checkpoint | |
checkpoint = torch.load(checkpoint_path, map_location=torch.device(device)) | |
state_dict = checkpoint['model_state_dict'] | |
state_dict = {k.replace('_orig_mod.', ''): v for k, v in state_dict.items()} | |
# Save just the model weights | |
torch.save(state_dict, weights_path) | |
print(f"Model weights saved to: {weights_path}") | |
return state_dict | |
def load_weights(config, weights_path, device): | |
"""Load model from weights file""" | |
print(f"Loading model from weights: {weights_path}") | |
model = DeepSeekV3Model(config['model']) | |
state_dict = torch.load(weights_path, map_location=torch.device(device)) | |
model.load_state_dict(state_dict) | |
return model | |
def get_tokenizer(config): | |
tokenizer_path = config['tokenizer']['tokenizer_name_or_path'] | |
tokenizer = AutoTokenizer.from_pretrained(tokenizer_path) | |
tokenizer.pad_token = tokenizer.eos_token | |
vocab_size = tokenizer.vocab_size | |
return tokenizer, vocab_size | |
def generate_text(model, tokenizer, input_text, max_new_tokens, context_length, temperature, top_k, eos_token, device): | |
encoded_text = tokenizer.encode(input_text, return_tensors="pt").to(device) | |
generated_text = generate_helper(model, | |
idx=encoded_text, | |
max_new_tokens=max_new_tokens, | |
context_length=context_length, | |
temperature=temperature, | |
top_k=top_k, | |
eos_token=eos_token, | |
device=device) | |
return tokenizer.decode(generated_text.squeeze(0)) | |
# Initialize model and tokenizer | |
def initialize_model(): | |
config_path = "config_smollm2_135M.yaml" | |
# Use HF Hub or another external storage instead of local path | |
model_id = "crpatel/DeepSeek-V3-SmolLm2" # Replace with your actual model ID | |
weights_path = "model.pt" | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
# Load configuration | |
config = get_config(config_path) | |
# Check if weights exist locally, otherwise download from HF Hub | |
if not os.path.exists(weights_path): | |
try: | |
from huggingface_hub import hf_hub_download | |
print(f"Downloading model weights from Hugging Face Hub: {model_id}") | |
weights_path = hf_hub_download( | |
repo_id=model_id, | |
filename="model.pt" | |
) | |
except Exception as e: | |
print(f"Error downloading weights: {e}") | |
print("Falling back to local checkpoint extraction if available") | |
checkpoint_path = "checkpoints/model_100000_step_avg_loss_4.61663.pth" | |
if os.path.exists(checkpoint_path): | |
extract_and_save_weights(config_path, checkpoint_path, weights_path, device) | |
else: | |
raise FileNotFoundError(f"Neither weights file nor checkpoint found. Please upload model to HF Hub first.") | |
# Load model from weights | |
model = load_weights(config, weights_path, device) | |
model.to(device) | |
model.eval() | |
# Load tokenizer | |
tokenizer, vocab_size = get_tokenizer(config) | |
return model, tokenizer, device | |
def generate_response(prompt, max_new_tokens): | |
generated_text = generate_text( | |
model=model, | |
tokenizer=tokenizer, | |
input_text=prompt, | |
max_new_tokens=max_new_tokens, | |
context_length=256, | |
temperature=0.9, | |
top_k=2, | |
eos_token=tokenizer.eos_token_id, | |
device=device | |
) | |
return generated_text | |
# Initialize global variables | |
model, tokenizer, device = initialize_model() | |
# Create Gradio interface | |
iface = gr.Interface( | |
fn=generate_response, | |
inputs=[ | |
gr.Textbox( | |
lines=3, | |
placeholder="Enter your prompt here...", | |
label="Input Prompt" | |
), | |
gr.Slider( | |
minimum=50, | |
maximum=256, | |
value=100, | |
step=10, | |
label="Max New Tokens" | |
) | |
], | |
outputs=gr.Textbox( | |
lines=5, | |
label="Generated Text" | |
), | |
title="DeepSeek-V3 Text Generator", | |
description="Enter a prompt and adjust the maximum number of tokens to generate text with DeepSeek-V3 SmolLM2 model." | |
) | |
if __name__ == "__main__": | |
iface.launch() |