Spaces:
				
			
			
	
			
			
		Sleeping
		
	
	
	
			
			
	
	
	
	
		
		
		Sleeping
		
	| from fastapi import APIRouter | |
| from datetime import datetime | |
| from datasets import load_dataset | |
| from sklearn.metrics import accuracy_score | |
| import numpy as np | |
| import os | |
| import torch | |
| from torch.utils.data import DataLoader | |
| from .utils.evaluation import AudioEvaluationRequest | |
| from .utils.emissions import tracker, clean_emissions_data, get_space_info | |
| from .utils.data import FFTDataset | |
| from .utils.models import DualEncoder, CNNKan | |
| from .utils.train import Trainer | |
| from .utils.data_utils import collate_fn, Container | |
| import yaml | |
| import asyncio | |
| from huggingface_hub import login | |
| from collections import OrderedDict | |
| from dotenv import load_dotenv | |
# Load variables from a local .env file into the environment at import time
# (evaluate_audio reads HF_TOKEN via os.getenv).
load_dotenv()

# Metadata echoed back in every evaluation result
# ("model_description" and "api_route" respectively).
DESCRIPTION = "Conformer"
ROUTE = "/audio"

# Router object for this task module.
router = APIRouter()
async def evaluate_audio(request: AudioEvaluationRequest):
    """
    Evaluate audio classification for rainforest sound detection.

    Current Model: CNNKan
    - Built from the 'CNNEncoder', 'Conformer' and 'KAN' sections of
      tasks/utils/config.yaml
    - Weights are restored from the checkpoint at Data.checkpoint_path
    - Inference runs through Trainer.predict on the held-out test split

    Args:
        request: Evaluation request; its dataset_name, test_size and
            test_seed fields select the dataset and the test split.

    Returns:
        dict with the submission metadata (username, space URL, timestamp,
        model description, API route, dataset config), the accuracy of the
        predictions, and the energy/emissions figures recorded for the
        "inference" task.
    """
    # Get space info
    username, space_url = get_space_info()

    # Load and prepare the dataset
    # Because the dataset is gated, we need to use the HF_TOKEN environment variable to authenticate
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))

    # Split dataset
    train_test = dataset["train"].train_test_split(test_size=request.test_size, seed=request.test_seed)
    test_dataset = train_test["test"]

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    #--------------------------------------------------------------------------------------------
    # MODEL INFERENCE (everything below is inside the tracked "inference" task)
    #--------------------------------------------------------------------------------------------
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Parse the config once and close the file handle deterministically.
    args_path = 'tasks/utils/config.yaml'
    with open(args_path, 'r') as config_file:
        config = yaml.safe_load(config_file)
    data_args = Container(**config['Data'])
    model_args = Container(**config['CNNEncoder'])
    conformer_args = Container(**config['Conformer'])
    kan_args = Container(**config['KAN'])

    test_dataset = FFTDataset(test_dataset)
    test_dl = DataLoader(test_dataset, batch_size=data_args.batch_size, collate_fn=collate_fn)

    model = CNNKan(model_args, conformer_args, kan_args.get_dict())
    model = model.to(device)

    # NOTE(review): torch.load unpickles the file; only point checkpoint_path at trusted checkpoints.
    state_dict = torch.load(data_args.checkpoint_path, map_location=torch.device('cpu'))

    # Drop a leading 'module.' from parameter names so they match the bare model's keys
    # (presumably left by a DataParallel/DDP wrapper at training time - confirm).
    wrapper_prefix = 'module.'
    new_state_dict = OrderedDict(
        (key[len(wrapper_prefix):] if key.startswith(wrapper_prefix) else key, value)
        for key, value in state_dict.items()
    )
    # Default strict loading raises on any missing/unexpected key, so the return value is not needed.
    model.load_state_dict(new_state_dict)

    # The Trainer is constructed with a criterion and optimizer, but only predict() is called here.
    loss_fn = torch.nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=5e-4)
    trainer = Trainer(model=model, optimizer=optimizer,
                      criterion=loss_fn, output_dim=model_args.output_dim, scaler=None,
                      scheduler=None, train_dataloader=None,
                      val_dataloader=None, device=device,
                      exp_num='test', log_path=None,
                      range_update=None,
                      accumulation_step=1, max_iter=np.inf,
                      exp_name="frugal_cnnencoder_inference")
    predictions, true_labels, acc = trainer.predict(test_dl, device=device)

    print("accuracy: ", acc)
    print("predictions: ", len(predictions))
    print("true_labels: ", len(true_labels))
    #--------------------------------------------------------------------------------------------
    # MODEL INFERENCE STOPS HERE
    #--------------------------------------------------------------------------------------------

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    # Calculate accuracy (recomputed from the returned labels; this is the value that is reported)
    accuracy = accuracy_score(true_labels, predictions)

    # Prepare results dictionary
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "accuracy": float(accuracy),
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed,
        },
    }
    return results
if __name__ == "__main__":
    # Manual run of the evaluation outside the web server.
    demo_kwargs = {
        "dataset_name": "rfcx/frugalai",  # replace with the actual dataset name
        "test_size": 0.2,  # example split fraction
        "test_seed": 42,  # example seed
    }
    # evaluate_audio is a coroutine function, so drive it with an event loop.
    asyncio.run(evaluate_audio(AudioEvaluationRequest(**demo_kwargs)))
