import os
import json
import datetime
import logging

import numpy as np
from PIL import Image
from huggingface_hub import HfApi, CommitScheduler

logger = logging.getLogger(__name__)
HF_DATASET_NAME = "aiwithoutborders-xyz/degentic_rd0"
LOCAL_LOG_DIR = "./hf_inference_logs"
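# Everything written into LOCAL_LOG_DIR (the JSON log entries plus their PNG
# images) is mirrored into the dataset repo's `logs/` folder by the
# CommitScheduler configured in log_inference_data below.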
# Custom JSON encoder: model outputs may contain numpy scalars and arrays,
# which json.dump rejects with a TypeError unless converted first.
class NumpyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):  # covers np.float32, np.float64, ...
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return json.JSONEncoder.default(self, obj)
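# Minimal usage sketch (hypothetical payload): without cls=NumpyEncoder the
# call below would raise TypeError, since json cannot serialize numpy scalars.
#   json.dumps({"confidence": np.float32(0.97)}, cls=NumpyEncoder)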
def _save_pil_image_to_file(image: Image.Image, directory: str, prefix: str) -> str:
    """Saves a PIL Image as a PNG in `directory` and returns its filename."""
    if not isinstance(image, Image.Image):
        raise TypeError(f"Expected a PIL Image, but received type: {type(image)}")
    os.makedirs(directory, exist_ok=True)
    timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")
    filename = f"{prefix}_{timestamp_str}.png"
    file_path = os.path.join(directory, filename)
    # PNG cannot encode every PIL mode (e.g. CMYK), so normalize to RGB first.
    if image.mode != 'RGB':
        image = image.convert('RGB')
    image.save(file_path, format="PNG")
    logger.info(f"Saved image to: {file_path}")
    return filename
# The initialize_dataset function will change significantly or be removed/simplified,
# as we are no longer appending to a datasets.Dataset object directly in memory.
def initialize_dataset_repo():
    """Initializes or ensures the Hugging Face dataset repository exists."""
    api = HfApi(token=os.getenv("HF_TOKEN"))
    try:
        api.repo_info(repo_id=HF_DATASET_NAME, repo_type="dataset")
        logger.info(f"Hugging Face dataset repository already exists: {HF_DATASET_NAME}")
    except Exception:  # repo_info raises if the repository does not exist yet
        logger.info(f"Creating new Hugging Face dataset repository: {HF_DATASET_NAME}")
        api.create_repo(repo_id=HF_DATASET_NAME, repo_type="dataset", private=True)
    return api  # Return the API object for subsequent operations
def log_inference_data(
    original_image: Image.Image,
    inference_params: dict,
    model_predictions: list[dict],
    ensemble_output: dict,
    forensic_images: list[Image.Image],
    agent_monitoring_data: dict,
    human_feedback: dict | None = None,
):
    """Logs a single inference event by writing a JSON entry (plus its images)
    locally and scheduling an upload to the Hugging Face dataset repository."""
    try:
        initialize_dataset_repo()  # Ensure the repository exists before scheduling uploads
        original_image_filename = _save_pil_image_to_file(original_image, LOCAL_LOG_DIR, "original")
        forensic_images_filenames = []
        for img_item in forensic_images:
            if img_item is None:
                continue
            # Forensic outputs may arrive as numpy arrays; coerce them to PIL first.
            if not isinstance(img_item, Image.Image):
                try:
                    img_item = Image.fromarray(img_item)
                except Exception as e:
                    logger.error(f"Error converting forensic image to PIL for saving: {e}")
                    continue
            forensic_images_filenames.append(_save_pil_image_to_file(img_item, LOCAL_LOG_DIR, "forensic"))
        new_entry = {
            "timestamp": datetime.datetime.now().isoformat(),
            "image": original_image_filename,
            "inference_request": inference_params,
            "model_predictions": model_predictions,
            "ensemble_output": ensemble_output,
            "forensic_outputs": forensic_images_filenames,
            "agent_monitoring_data": agent_monitoring_data,
            "human_feedback": human_feedback if human_feedback is not None else {},
        }
        # Write the entry to a uniquely named JSON file in the local log directory.
        os.makedirs(LOCAL_LOG_DIR, exist_ok=True)  # Ensure the local directory exists
        timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")
        log_file_path = os.path.join(LOCAL_LOG_DIR, f"log_{timestamp_str}.json")
        # Serialize the new entry to JSON, using the custom encoder for numpy types.
        with open(log_file_path, 'w', encoding='utf-8') as f:
            json.dump(new_entry, f, cls=NumpyEncoder, indent=2)
        # Schedule an upload of the local log folder to the dataset repository.
        # Note: `every` is a period in minutes, not a file count; used as a
        # context manager, the scheduler pushes pending files when the block exits.
        scheduler = CommitScheduler(
            repo_id=HF_DATASET_NAME,
            repo_type="dataset",
            folder_path=LOCAL_LOG_DIR,
            path_in_repo="logs",
            token=os.getenv("HF_TOKEN"),
            every=10,  # Commit at most every 10 minutes
        )
        with scheduler:
            logger.info(f"Inference data logged locally and scheduled for upload: {log_file_path}")
    except Exception as e:
        logger.error(f"Failed to log inference data: {e}")
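# Minimal smoke test, runnable locally. Assumes HF_TOKEN is set in the
# environment and that the token can write to HF_DATASET_NAME; every payload
# value below is a hypothetical placeholder, not output from a real model run.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    dummy_image = Image.new("RGB", (64, 64), color="gray")
    log_inference_data(
        original_image=dummy_image,
        inference_params={"model": "example-detector", "threshold": 0.5},
        model_predictions=[{"label": "ai-generated", "score": np.float32(0.91)}],
        ensemble_output={"verdict": "ai-generated", "confidence": np.float32(0.88)},
        forensic_images=[np.zeros((64, 64, 3), dtype=np.uint8)],
        agent_monitoring_data={"agent": "none", "notes": "smoke test"},
    )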