Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
GitHub Actions
commited on
Commit
·
807d96f
1
Parent(s):
e3a898b
Sync from GitHub repo
Browse files
app.py
CHANGED
|
@@ -1103,65 +1103,150 @@ def setup_periodic_tasks():
|
|
| 1103 |
app.logger.error(f"Error uploading database to {database_repo_id}: {str(e)}")
|
| 1104 |
|
| 1105 |
def sync_preferences_data():
|
| 1106 |
-
"""Zips and uploads preference data folders to HF dataset"""
|
| 1107 |
with app.app_context(): # Ensure app context for logging
|
| 1108 |
if not os.path.isdir(votes_dir):
|
| 1109 |
-
# app.logger.info(f"Votes directory '{votes_dir}' not found, skipping preference sync.")
|
| 1110 |
return # Don't log every 5 mins if dir doesn't exist yet
|
| 1111 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1112 |
try:
|
| 1113 |
api = HfApi(token=os.getenv("HF_TOKEN"))
|
| 1114 |
vote_uuids = [d for d in os.listdir(votes_dir) if os.path.isdir(os.path.join(votes_dir, d))]
|
| 1115 |
|
| 1116 |
if not vote_uuids:
|
| 1117 |
-
#
|
| 1118 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1119 |
|
| 1120 |
-
|
|
|
|
|
|
|
|
|
|
| 1121 |
for vote_uuid in vote_uuids:
|
| 1122 |
dir_path = os.path.join(votes_dir, vote_uuid)
|
| 1123 |
-
|
| 1124 |
-
|
| 1125 |
|
| 1126 |
try:
|
| 1127 |
-
|
| 1128 |
-
|
| 1129 |
-
|
| 1130 |
-
|
| 1131 |
-
|
| 1132 |
-
|
| 1133 |
-
|
| 1134 |
-
|
| 1135 |
-
|
| 1136 |
-
|
| 1137 |
-
|
| 1138 |
-
|
| 1139 |
-
app.logger.
|
| 1140 |
-
|
| 1141 |
-
|
| 1142 |
-
|
| 1143 |
-
|
| 1144 |
-
|
| 1145 |
-
|
| 1146 |
-
|
| 1147 |
-
|
| 1148 |
-
|
| 1149 |
-
|
| 1150 |
-
|
| 1151 |
-
|
| 1152 |
-
|
| 1153 |
-
|
| 1154 |
-
|
| 1155 |
-
|
| 1156 |
-
|
| 1157 |
-
|
| 1158 |
-
|
| 1159 |
-
|
| 1160 |
-
|
| 1161 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1162 |
|
| 1163 |
except Exception as e:
|
| 1164 |
-
app.logger.error(f"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1165 |
|
| 1166 |
|
| 1167 |
# Schedule periodic tasks
|
|
|
|
| 1103 |
app.logger.error(f"Error uploading database to {database_repo_id}: {str(e)}")
|
| 1104 |
|
| 1105 |
def sync_preferences_data():
|
| 1106 |
+
"""Zips and uploads preference data folders in batches to HF dataset"""
|
| 1107 |
with app.app_context(): # Ensure app context for logging
|
| 1108 |
if not os.path.isdir(votes_dir):
|
|
|
|
| 1109 |
return # Don't log every 5 mins if dir doesn't exist yet
|
| 1110 |
|
| 1111 |
+
temp_batch_dir = None # Initialize to manage cleanup
|
| 1112 |
+
temp_individual_zip_dir = None # Initialize for individual zips
|
| 1113 |
+
local_batch_zip_path = None # Initialize for batch zip path
|
| 1114 |
+
|
| 1115 |
try:
|
| 1116 |
api = HfApi(token=os.getenv("HF_TOKEN"))
|
| 1117 |
vote_uuids = [d for d in os.listdir(votes_dir) if os.path.isdir(os.path.join(votes_dir, d))]
|
| 1118 |
|
| 1119 |
if not vote_uuids:
|
| 1120 |
+
return # No data to process
|
| 1121 |
+
|
| 1122 |
+
app.logger.info(f"Found {len(vote_uuids)} vote directories to process.")
|
| 1123 |
+
|
| 1124 |
+
# Create temporary directories
|
| 1125 |
+
temp_batch_dir = tempfile.mkdtemp(prefix="hf_batch_")
|
| 1126 |
+
temp_individual_zip_dir = tempfile.mkdtemp(prefix="hf_indiv_zips_")
|
| 1127 |
+
app.logger.debug(f"Created temp directories: {temp_batch_dir}, {temp_individual_zip_dir}")
|
| 1128 |
|
| 1129 |
+
processed_vote_dirs = []
|
| 1130 |
+
individual_zips_in_batch = []
|
| 1131 |
+
|
| 1132 |
+
# 1. Create individual zips and move them to the batch directory
|
| 1133 |
for vote_uuid in vote_uuids:
|
| 1134 |
dir_path = os.path.join(votes_dir, vote_uuid)
|
| 1135 |
+
individual_zip_base_path = os.path.join(temp_individual_zip_dir, vote_uuid)
|
| 1136 |
+
individual_zip_path = f"{individual_zip_base_path}.zip"
|
| 1137 |
|
| 1138 |
try:
|
| 1139 |
+
shutil.make_archive(individual_zip_base_path, 'zip', dir_path)
|
| 1140 |
+
app.logger.debug(f"Created individual zip: {individual_zip_path}")
|
| 1141 |
+
|
| 1142 |
+
# Move the created zip into the batch directory
|
| 1143 |
+
final_individual_zip_path = os.path.join(temp_batch_dir, f"{vote_uuid}.zip")
|
| 1144 |
+
shutil.move(individual_zip_path, final_individual_zip_path)
|
| 1145 |
+
app.logger.debug(f"Moved individual zip to batch dir: {final_individual_zip_path}")
|
| 1146 |
+
|
| 1147 |
+
processed_vote_dirs.append(dir_path) # Mark original dir for later cleanup
|
| 1148 |
+
individual_zips_in_batch.append(final_individual_zip_path)
|
| 1149 |
+
|
| 1150 |
+
except Exception as zip_err:
|
| 1151 |
+
app.logger.error(f"Error creating or moving zip for {vote_uuid}: {str(zip_err)}")
|
| 1152 |
+
# Clean up partial zip if it exists
|
| 1153 |
+
if os.path.exists(individual_zip_path):
|
| 1154 |
+
try:
|
| 1155 |
+
os.remove(individual_zip_path)
|
| 1156 |
+
except OSError:
|
| 1157 |
+
pass
|
| 1158 |
+
# Continue processing other votes
|
| 1159 |
+
|
| 1160 |
+
# Clean up the temporary dir used for creating individual zips
|
| 1161 |
+
shutil.rmtree(temp_individual_zip_dir)
|
| 1162 |
+
temp_individual_zip_dir = None # Mark as cleaned
|
| 1163 |
+
app.logger.debug("Cleaned up temporary individual zip directory.")
|
| 1164 |
+
|
| 1165 |
+
if not individual_zips_in_batch:
|
| 1166 |
+
app.logger.warning("No individual zips were successfully created for batching.")
|
| 1167 |
+
# Clean up batch dir if it's empty or only contains failed attempts
|
| 1168 |
+
if temp_batch_dir and os.path.exists(temp_batch_dir):
|
| 1169 |
+
shutil.rmtree(temp_batch_dir)
|
| 1170 |
+
temp_batch_dir = None
|
| 1171 |
+
return
|
| 1172 |
+
|
| 1173 |
+
# 2. Create the batch zip file
|
| 1174 |
+
batch_timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
|
| 1175 |
+
batch_uuid_short = str(uuid.uuid4())[:8]
|
| 1176 |
+
batch_zip_filename = f"{batch_timestamp}_batch_{batch_uuid_short}.zip"
|
| 1177 |
+
# Create batch zip in a standard temp location first
|
| 1178 |
+
local_batch_zip_base = os.path.join(tempfile.gettempdir(), batch_zip_filename.replace('.zip', ''))
|
| 1179 |
+
local_batch_zip_path = f"{local_batch_zip_base}.zip"
|
| 1180 |
+
|
| 1181 |
+
app.logger.info(f"Creating batch zip: {local_batch_zip_path} with {len(individual_zips_in_batch)} individual zips.")
|
| 1182 |
+
shutil.make_archive(local_batch_zip_base, 'zip', temp_batch_dir)
|
| 1183 |
+
app.logger.info(f"Batch zip created successfully: {local_batch_zip_path}")
|
| 1184 |
+
|
| 1185 |
+
# 3. Upload the batch zip file
|
| 1186 |
+
hf_repo_path = f"votes/{year}/{month}/{batch_zip_filename}"
|
| 1187 |
+
app.logger.info(f"Uploading batch zip to HF Hub: {preferences_repo_id}/{hf_repo_path}")
|
| 1188 |
+
|
| 1189 |
+
api.upload_file(
|
| 1190 |
+
path_or_fileobj=local_batch_zip_path,
|
| 1191 |
+
path_in_repo=hf_repo_path,
|
| 1192 |
+
repo_id=preferences_repo_id,
|
| 1193 |
+
repo_type="dataset",
|
| 1194 |
+
commit_message=f"Add batch preference data {batch_zip_filename} ({len(individual_zips_in_batch)} votes)"
|
| 1195 |
+
)
|
| 1196 |
+
app.logger.info(f"Successfully uploaded batch {batch_zip_filename} to {preferences_repo_id}")
|
| 1197 |
+
|
| 1198 |
+
# 4. Cleanup after successful upload
|
| 1199 |
+
app.logger.info("Cleaning up local files after successful upload.")
|
| 1200 |
+
# Remove original vote directories that were successfully zipped and uploaded
|
| 1201 |
+
for dir_path in processed_vote_dirs:
|
| 1202 |
+
try:
|
| 1203 |
+
shutil.rmtree(dir_path)
|
| 1204 |
+
app.logger.debug(f"Removed original vote directory: {dir_path}")
|
| 1205 |
+
except OSError as e:
|
| 1206 |
+
app.logger.error(f"Error removing processed vote directory {dir_path}: {str(e)}")
|
| 1207 |
+
|
| 1208 |
+
# Remove the temporary batch directory (containing the individual zips)
|
| 1209 |
+
shutil.rmtree(temp_batch_dir)
|
| 1210 |
+
temp_batch_dir = None
|
| 1211 |
+
app.logger.debug("Removed temporary batch directory.")
|
| 1212 |
+
|
| 1213 |
+
# Remove the local batch zip file
|
| 1214 |
+
os.remove(local_batch_zip_path)
|
| 1215 |
+
local_batch_zip_path = None
|
| 1216 |
+
app.logger.debug("Removed local batch zip file.")
|
| 1217 |
+
|
| 1218 |
+
app.logger.info(f"Finished preference data sync. Uploaded batch {batch_zip_filename}.")
|
| 1219 |
|
| 1220 |
except Exception as e:
|
| 1221 |
+
app.logger.error(f"Error during preference data batch sync: {str(e)}", exc_info=True)
|
| 1222 |
+
# If upload failed, the local batch zip might exist, clean it up.
|
| 1223 |
+
if local_batch_zip_path and os.path.exists(local_batch_zip_path):
|
| 1224 |
+
try:
|
| 1225 |
+
os.remove(local_batch_zip_path)
|
| 1226 |
+
app.logger.debug("Cleaned up local batch zip after failed upload.")
|
| 1227 |
+
except OSError as clean_err:
|
| 1228 |
+
app.logger.error(f"Error cleaning up batch zip after failed upload: {clean_err}")
|
| 1229 |
+
# Do NOT remove temp_batch_dir if it exists; its contents will be retried next time.
|
| 1230 |
+
# Do NOT remove original vote directories if upload failed.
|
| 1231 |
+
|
| 1232 |
+
finally:
|
| 1233 |
+
# Final cleanup for temporary directories in case of unexpected exits
|
| 1234 |
+
if temp_individual_zip_dir and os.path.exists(temp_individual_zip_dir):
|
| 1235 |
+
try:
|
| 1236 |
+
shutil.rmtree(temp_individual_zip_dir)
|
| 1237 |
+
except Exception as final_clean_err:
|
| 1238 |
+
app.logger.error(f"Error in final cleanup (indiv zips): {final_clean_err}")
|
| 1239 |
+
# Only clean up batch dir in finally block if it *wasn't* kept intentionally after upload failure
|
| 1240 |
+
if temp_batch_dir and os.path.exists(temp_batch_dir):
|
| 1241 |
+
# Check if an upload attempt happened and failed
|
| 1242 |
+
upload_failed = 'e' in locals() and isinstance(e, Exception) # Crude check if exception occurred
|
| 1243 |
+
if not upload_failed: # If no upload error or upload succeeded, clean up
|
| 1244 |
+
try:
|
| 1245 |
+
shutil.rmtree(temp_batch_dir)
|
| 1246 |
+
except Exception as final_clean_err:
|
| 1247 |
+
app.logger.error(f"Error in final cleanup (batch dir): {final_clean_err}")
|
| 1248 |
+
else:
|
| 1249 |
+
app.logger.warning("Keeping temporary batch directory due to upload failure for next attempt.")
|
| 1250 |
|
| 1251 |
|
| 1252 |
# Schedule periodic tasks
|