Create merge_and_save.py
merge_and_save.py  ADDED  (+162 -0)
@@ -0,0 +1,162 @@
from datasets import load_dataset
import pandas as pd
from datetime import datetime
from huggingface_hub import HfApi, HfFolder
import time
import logging
from tqdm.auto import tqdm
import os

# Authenticate with the Hugging Face Hub using the HF_TOKEN environment variable
HfFolder.save_token(os.getenv("HF_TOKEN"))

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def load_huggingface_data(dataset_name, file1_name, file2_name):
    """Load two CSV files from a Hugging Face dataset repository."""
    logger.info("Loading datasets from Hugging Face...")

    # Load the first CSV file
    dataset1 = load_dataset(dataset_name,
                            data_files={'train': file1_name},
                            split='train')

    # Load the second CSV file
    dataset2 = load_dataset(dataset_name,
                            data_files={'train': file2_name},
                            split='train')

    # Convert to pandas DataFrames
    df1 = pd.DataFrame(dataset1)
    df2 = pd.DataFrame(dataset2)

    logger.info(f"Loaded {len(df1)} rows from {file1_name}")
    logger.info(f"Loaded {len(df2)} rows from {file2_name}")

    return df1, df2


def merge_newest(df1, df2):
    """Merge the two datasets, preferring values from df1 where both have data."""
    logger.info("Processing datasets...")

    # Perform a full outer join on 'id'
    merged_df = pd.merge(df1, df2,
                         on='id',
                         how='outer',
                         suffixes=('', '_y'))

    # For each column that got a suffix, combine it with the original column
    for col in merged_df.columns:
        if col.endswith('_y'):
            original_col = col[:-2]  # Remove the '_y' suffix
            # Combine columns, taking the non-null value
            merged_df[original_col] = merged_df[original_col].combine_first(merged_df[col])
            # Drop the suffix column
            merged_df = merged_df.drop(columns=[col])

    # Final column order
    desired_columns = ['title', 'score', 'id', 'url', 'num_comments',
                       'created', 'body', 'content', 'subreddit']

    # Reorder columns, only keeping those that exist
    final_columns = [col for col in desired_columns if col in merged_df.columns]
    merged_df = merged_df[final_columns]

    return merged_df

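# Illustration of the merge semantics above, with hypothetical values: if df1 has a
# row with id='abc', score=25, body=NaN and df2 has id='abc', score=NaN, body='text',
# the outer join plus combine_first yields one row with score=25 and body='text';
# an id present in only one frame is kept, with NaN in the other frame's columns.
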
def save_to_huggingface(df, repo_id):
    """Save the merged dataset locally and upload it to Hugging Face."""
    logger.info("Saving to Hugging Face...")

    # Cumulative output file (a dated filename is currently disabled)
    # today_date = datetime.now().strftime('%Y%m%d')
    filename = "merged_reddit_data.csv"

    # Save locally first
    df.to_csv(filename, index=False)

    # Upload to Hugging Face
    api = HfApi()
    api.upload_file(
        path_or_fileobj=filename,
        path_in_repo=f"submission/{filename}",
        repo_id=repo_id,
        repo_type="dataset"
    )

    return filename


def get_newest_file(repo_id):
    """
    Get the newest dated merge file from the Hugging Face repository.

    Args:
        repo_id (str): The repository ID on Hugging Face

    Returns:
        str: The filename of the newest merged file
    """
    api = HfApi()

    # List all files in the repository
    files = api.list_repo_files(repo_id, repo_type="dataset")

    # Filter for merged files
    merged_files = [f for f in files if f.startswith('merged_reddit_data_')]

    if not merged_files:
        raise ValueError("No merged files found in repository")

    # Extract dates from filenames and pair them with the filenames
    file_dates = []
    for filename in merged_files:
        try:
            # Extract the date string (assuming format: merged_reddit_data_YYYYMMDD.csv)
            date_str = filename.split('_')[-1].split('.')[0]
            date = datetime.strptime(date_str, '%Y%m%d')
            file_dates.append((date, filename))
        except (IndexError, ValueError):
            continue

    if not file_dates:
        raise ValueError("No valid dated files found")

    # Sort by date and get the newest file
    newest_file = sorted(file_dates, key=lambda x: x[0], reverse=True)[0][1]

    return newest_file


def merged_and_save():
    # Dataset repository holding both the dated scrapes and the cumulative merged file
    repo_id = "Vera-ZWY/reddite2024elections_submissions"

    file_new = get_newest_file(repo_id)
    file_old = "submission/merged_reddit_data.csv"

    df1, df2 = load_huggingface_data(repo_id, file_new, file_old)
    print(f"Newest dataset shape: {df1.shape}")
    print(f"Old dataset columns: {df2.columns.tolist()}")

    # Process and merge data
    merged_df = merge_newest(df1, df2)

    output_file = save_to_huggingface(merged_df, repo_id)

    logger.info(f"Processing complete. File saved as {output_file}")
    return f"Processing complete. File saved as {output_file}. Merged dataset columns: {merged_df.columns.tolist()}"

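# A minimal entry point for running the whole pipeline (a sketch, assuming the
# HF_TOKEN environment variable is set and the dataset repository above is accessible):
if __name__ == "__main__":
    # Fetch the newest dated dump, merge it into the cumulative file,
    # and upload the result back to the dataset repository.
    result = merged_and_save()
    print(result)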