Spaces:
Running
Running
non destructive dataset operations
Browse files- src/dataset_utils.py +12 -1
- src/monitoring.py +23 -11
- templates/spaces/trackio/dataset_utils.py +12 -1
src/dataset_utils.py
CHANGED
|
@@ -190,14 +190,25 @@ class TrackioDatasetManager:
|
|
| 190 |
|
| 191 |
# Load existing experiments for union merge
|
| 192 |
existing = {}
|
|
|
|
| 193 |
try:
|
| 194 |
-
|
|
|
|
| 195 |
exp_id = row.get('experiment_id')
|
| 196 |
if exp_id:
|
| 197 |
existing[exp_id] = row
|
| 198 |
except Exception:
|
| 199 |
existing = {}
|
| 200 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 201 |
# Validate and merge
|
| 202 |
merged_map: Dict[str, Dict[str, Any]] = {}
|
| 203 |
# Seed with existing
|
|
|
|
| 190 |
|
| 191 |
# Load existing experiments for union merge
|
| 192 |
existing = {}
|
| 193 |
+
dataset_exists = self.check_dataset_exists()
|
| 194 |
try:
|
| 195 |
+
existing_list = self.load_existing_experiments()
|
| 196 |
+
for row in existing_list:
|
| 197 |
exp_id = row.get('experiment_id')
|
| 198 |
if exp_id:
|
| 199 |
existing[exp_id] = row
|
| 200 |
except Exception:
|
| 201 |
existing = {}
|
| 202 |
|
| 203 |
+
# Safety guard: avoid destructive overwrite if dataset exists but
|
| 204 |
+
# we failed to read any existing records (e.g., transient HF issue)
|
| 205 |
+
if dataset_exists and len(existing) == 0 and len(experiments) <= 3:
|
| 206 |
+
logger.error(
|
| 207 |
+
"β Refusing to overwrite dataset: existing records could not be loaded "
|
| 208 |
+
"but repository exists. Skipping save to prevent data loss."
|
| 209 |
+
)
|
| 210 |
+
return False
|
| 211 |
+
|
| 212 |
# Validate and merge
|
| 213 |
merged_map: Dict[str, Dict[str, Any]] = {}
|
| 214 |
# Seed with existing
|
src/monitoring.py
CHANGED
|
@@ -424,9 +424,18 @@ class SmolLM3Monitor:
|
|
| 424 |
self.metrics_history.append(metrics)
|
| 425 |
|
| 426 |
# Save to HF Dataset periodically (configurable)
|
| 427 |
-
flush_every = getattr(self, 'flush_interval', 10)
|
| 428 |
-
|
| 429 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 430 |
|
| 431 |
logger.debug("Metrics logged: %s", metrics)
|
| 432 |
|
|
@@ -690,20 +699,23 @@ class SmolLM3Monitor:
|
|
| 690 |
# Final save to HF Dataset with proper status update
|
| 691 |
if self.dataset_manager:
|
| 692 |
try:
|
| 693 |
-
# Update experiment with final status
|
| 694 |
final_experiment_data = {
|
| 695 |
'status': final_status,
|
| 696 |
'experiment_end_time': datetime.now().isoformat(),
|
| 697 |
'final_metrics_count': len(self.metrics_history),
|
| 698 |
'total_artifacts': len(self.artifacts)
|
| 699 |
}
|
| 700 |
-
|
| 701 |
-
|
| 702 |
-
|
| 703 |
-
|
| 704 |
-
|
| 705 |
-
|
| 706 |
-
|
|
|
|
|
|
|
|
|
|
| 707 |
except Exception as e:
|
| 708 |
logger.error(f"β Failed to save final experiment data: {e}")
|
| 709 |
|
|
|
|
| 424 |
self.metrics_history.append(metrics)
|
| 425 |
|
| 426 |
# Save to HF Dataset periodically (configurable)
|
| 427 |
+
flush_every = max(1, int(getattr(self, 'flush_interval', 10)))
|
| 428 |
+
# Only append the delta since last flush to minimize risk
|
| 429 |
+
try:
|
| 430 |
+
if not hasattr(self, '_last_flushed_index'):
|
| 431 |
+
self._last_flushed_index = 0
|
| 432 |
+
if len(self.metrics_history) - self._last_flushed_index >= flush_every:
|
| 433 |
+
new_slice = self.metrics_history[self._last_flushed_index:]
|
| 434 |
+
# Persist only the tail slice; merge code will union-append
|
| 435 |
+
self._save_to_hf_dataset({'metrics': new_slice})
|
| 436 |
+
self._last_flushed_index = len(self.metrics_history)
|
| 437 |
+
except Exception:
|
| 438 |
+
pass
|
| 439 |
|
| 440 |
logger.debug("Metrics logged: %s", metrics)
|
| 441 |
|
|
|
|
| 699 |
# Final save to HF Dataset with proper status update
|
| 700 |
if self.dataset_manager:
|
| 701 |
try:
|
| 702 |
+
# Update experiment with final status without clobbering metrics
|
| 703 |
final_experiment_data = {
|
| 704 |
'status': final_status,
|
| 705 |
'experiment_end_time': datetime.now().isoformat(),
|
| 706 |
'final_metrics_count': len(self.metrics_history),
|
| 707 |
'total_artifacts': len(self.artifacts)
|
| 708 |
}
|
| 709 |
+
self._save_to_hf_dataset(final_experiment_data)
|
| 710 |
+
# Also persist any unflushed metrics tail
|
| 711 |
+
try:
|
| 712 |
+
last_idx = getattr(self, '_last_flushed_index', 0)
|
| 713 |
+
if len(self.metrics_history) > last_idx:
|
| 714 |
+
tail = self.metrics_history[last_idx:]
|
| 715 |
+
self._save_to_hf_dataset({'metrics': tail})
|
| 716 |
+
self._last_flushed_index = len(self.metrics_history)
|
| 717 |
+
except Exception:
|
| 718 |
+
pass
|
| 719 |
except Exception as e:
|
| 720 |
logger.error(f"β Failed to save final experiment data: {e}")
|
| 721 |
|
templates/spaces/trackio/dataset_utils.py
CHANGED
|
@@ -173,13 +173,24 @@ class TrackioDatasetManager:
|
|
| 173 |
|
| 174 |
# Load existing experiments for union merge
|
| 175 |
existing = {}
|
|
|
|
| 176 |
try:
|
| 177 |
-
|
|
|
|
| 178 |
exp_id = row.get('experiment_id')
|
| 179 |
if exp_id:
|
| 180 |
existing[exp_id] = row
|
| 181 |
except Exception:
|
| 182 |
existing = {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 183 |
|
| 184 |
# Validate and merge
|
| 185 |
merged_map: Dict[str, Dict[str, Any]] = {}
|
|
|
|
| 173 |
|
| 174 |
# Load existing experiments for union merge
|
| 175 |
existing = {}
|
| 176 |
+
dataset_exists = self.check_dataset_exists()
|
| 177 |
try:
|
| 178 |
+
existing_list = self.load_existing_experiments()
|
| 179 |
+
for row in existing_list:
|
| 180 |
exp_id = row.get('experiment_id')
|
| 181 |
if exp_id:
|
| 182 |
existing[exp_id] = row
|
| 183 |
except Exception:
|
| 184 |
existing = {}
|
| 185 |
+
|
| 186 |
+
# Safety guard: avoid destructive overwrite if dataset exists but
|
| 187 |
+
# we failed to read any existing records (e.g., transient HF issue)
|
| 188 |
+
if dataset_exists and len(existing) == 0 and len(experiments) <= 3:
|
| 189 |
+
logger.error(
|
| 190 |
+
"β Refusing to overwrite dataset: existing records could not be loaded "
|
| 191 |
+
"but repository exists. Skipping save to prevent data loss."
|
| 192 |
+
)
|
| 193 |
+
return False
|
| 194 |
|
| 195 |
# Validate and merge
|
| 196 |
merged_map: Dict[str, Dict[str, Any]] = {}
|