# import os | |
# # import pandas as pd | |
# import fireducks.pandas as pd | |
# from PIL import Image | |
# from io import BytesIO | |
# from tqdm import tqdm | |
# from datasets import load_dataset | |
# import subprocess | |
# # Load image name list from CSV | |
# csv_path = "/home/IITB/ai-at-ieor/23m1521/ashish/MTP/Vaani/Vaani-Audio-Image-Hindi2.csv" | |
# valid_image_names = set(pd.read_csv(csv_path)["referenceImage"].dropna().unique()) | |
# # Create directory to save images | |
# save_dir = "/scratch/IITB/ai-at-ieor/23m1521/datasets/Vaani/Images/From-Images-Config" | |
# os.makedirs(save_dir, exist_ok=True) | |
# basedir = "/scratch/IITB/ai-at-ieor/23m1521/hf_cache/hub/vaani_image_blobs/blobs" | |
# image_shards = [os.path.join(basedir, f) for f in os.listdir(basedir)] | |
# downloaded = 0 | |
# pbar1 = tqdm(total=len(image_shards), dynamic_ncols=True, colour='blue') | |
# for shard in image_shards: | |
# df = pd.read_parquet(shard) | |
# pbar2 = tqdm(total=len(df), dynamic_ncols=True, colour='red') | |
# for i in range(len(df)): | |
# img_data = df.image[i]['bytes'] | |
# path = df.image[i]['path'] | |
# # if path in valid_image_names and img_data: | |
# image = Image.open(BytesIO(img_data)) | |
# image.save(os.path.join(save_dir, os.path.basename(path))) | |
# image.close() | |
# del image | |
# downloaded += 1 | |
# pbar2.set_postfix_str(f"Downloaded {downloaded} images") | |
# pbar2.update(1) | |
# pbar2.refresh() | |
# del df | |
# pbar1.update(1) | |
# pbar1.refresh() | |
# # subprocess.run([ | |
# # "rm", "-rf", f"{basedir}/{os.path.basename(shard)}" | |
# # ], check=True) | |
# # subprocess.run([ | |
# # "rm", "-rf", "/scratch/IITB/ai-at-ieor/23m1521/hf_cache/hub/vaani_image_blobs" | |
# # ], check=True) | |
# print(f"\n✅ Finished downloading {downloaded} images.") | |
# ---------------------- | |
import os | |
from PIL import Image | |
from io import BytesIO | |
from tqdm import tqdm | |
from datasets import load_dataset | |
import fireducks.pandas as pd | |
import itertools | |
import string | |
import gc | |
from joblib import Parallel, delayed | |
# ---- configuration ------------------------------------------------------
# Metadata CSV naming the images of interest, destination directory for the
# extracted image files, and the directory holding the parquet shards.
csv_path = "/home/IITB/ai-at-ieor/23m1521/ashish/MTP/Vaani/Vaani-Audio-Image-Hindi2.csv"
save_dir = "/scratch/IITB/ai-at-ieor/23m1521/datasets/Vaani/Images/From-Images-Config"
basedir = "/scratch/IITB/ai-at-ieor/23m1521/hf_cache/hub/vaani_image_blobs/blobs"

# Reference-image names from the metadata CSV; consumed by the (currently
# commented-out) filter inside process_shard.
valid_image_names = set(pd.read_csv(csv_path)["referenceImage"].dropna().unique())

# Make sure the base output directory exists before any worker writes to it.
os.makedirs(save_dir, exist_ok=True)

# One entry per parquet shard found in the blob cache.
image_shards = [os.path.join(basedir, entry) for entry in os.listdir(basedir)]
print(f"Found {len(image_shards)} shards to process.")
def process_shard(shard_path):
    """Extract every image from one parquet shard and save it to disk.

    Each shard's ``image`` column holds dicts with ``bytes`` (the encoded
    image payload) and ``path`` (its original file name).  Images are written
    under ``save_dir/<shard basename>/``.

    Parameters
    ----------
    shard_path : str
        Path to the parquet shard file.

    Returns
    -------
    int
        Number of images successfully written; 0 when the shard itself
        cannot be read.
    """
    shard_name = os.path.splitext(os.path.basename(shard_path))[0]
    shard_save_dir = os.path.join(save_dir, shard_name)
    os.makedirs(shard_save_dir, exist_ok=True)

    try:
        df = pd.read_parquet(shard_path)
    except Exception as e:
        # A corrupt or partially-downloaded shard should not abort the whole
        # parallel run — report it and move on.
        print(f"⚠️ Failed to read shard: {shard_path}, error: {e}")
        return 0

    count = 0
    try:
        # Fetch the column once and index positionally: the original
        # ``df.image[i]`` is a label-based lookup (done twice per row) that
        # breaks if the parquet file carries a non-default index.
        images = df["image"]
        for i in tqdm(range(len(images)), desc=f"[{shard_name}]",
                      dynamic_ncols=True, position=0, leave=False):
            try:
                record = images.iloc[i]
                img_data = record['bytes']
                filename = os.path.basename(record['path'])
                if not img_data:
                    # Empty payload — nothing to decode; skip without noise.
                    continue
                # if filename in valid_image_names and img_data:
                # Context manager guarantees the file handle is released even
                # when .save() raises (e.g. on truncated image bytes).
                with Image.open(BytesIO(img_data)) as image:
                    image.save(os.path.join(shard_save_dir, filename))
                count += 1
            except Exception as e:
                print(f"⚠️ Error processing image {i} in {shard_name}: {e}")
                continue
    finally:
        # Shards are large; drop the frame eagerly so per-worker memory
        # stays flat across shards.
        del df
        gc.collect()

    print(f"✅ Finished {shard_name} with {count} images.")
    return count
if __name__ == "__main__":
    # Three workers keeps peak RAM bounded: each worker holds at most one
    # decoded shard DataFrame at a time.
    worker_count = 3
    per_shard_counts = Parallel(n_jobs=worker_count, backend="loky", verbose=10)(
        delayed(process_shard)(shard_file) for shard_file in image_shards
    )
    total_downloaded = sum(per_shard_counts)
    print(f"\n✅ Finished downloading {total_downloaded} images from {len(image_shards)} shards.")
# ---------------------- | |
# import os | |
# from PIL import Image | |
# from io import BytesIO | |
# from tqdm import tqdm | |
# from datasets import load_dataset | |
# import fireducks.pandas as pd | |
# import multiprocessing as mp | |
# import itertools | |
# import string | |
# import gc | |
# # Paths | |
# csv_path = "/home/IITB/ai-at-ieor/23m1521/ashish/MTP/Vaani/Vaani-Audio-Image-Hindi2.csv" | |
# save_dir = "/scratch/IITB/ai-at-ieor/23m1521/datasets/Vaani/Images/From-Images-Config" | |
# basedir = "/scratch/IITB/ai-at-ieor/23m1521/hf_cache/hub/vaani_image_blobs/blobs" | |
# # Load valid image names | |
# valid_image_names = set(pd.read_csv(csv_path)["referenceImage"].dropna().unique()) | |
# # Ensure base save directory exists | |
# os.makedirs(save_dir, exist_ok=True) | |
# # Get all shards | |
# image_shards = [os.path.join(basedir, f) for f in os.listdir(basedir)] | |
# print(f"Found {len(image_shards)} shards to process.") | |
# def process_shard(shard_path): | |
# shard_name = os.path.splitext(os.path.basename(shard_path))[0] | |
# shard_save_dir = os.path.join(save_dir, shard_name) | |
# os.makedirs(shard_save_dir, exist_ok=True) | |
# try: | |
# df = pd.read_parquet(shard_path) | |
# except Exception as e: | |
# print(f"⚠️ Failed to read shard: {shard_path}, error: {e}") | |
# return 0 | |
# count = 0 | |
# for i in tqdm(range(len(df)), desc=f"[{shard_name}]", dynamic_ncols=True): | |
# try: | |
# img_data = df.image[i]['bytes'] | |
# path = df.image[i]['path'] | |
# filename = os.path.basename(path) | |
# # if filename in valid_image_names and img_data: | |
# image = Image.open(BytesIO(img_data)) | |
# image.save(os.path.join(shard_save_dir, filename)) | |
# image.close() | |
# count += 1 | |
# except Exception as e: | |
# print(f"⚠️ Error processing image {i} in {shard_name}: {e}") | |
# continue | |
# del df | |
# gc.collect() | |
# print(f"✅ {shard_name}") | |
# return count | |
# if __name__ == "__main__": | |
# with mp.Pool(processes=3) as pool: | |
# results = list(pool.imap(process_shard, image_shards)) | |
# total_downloaded = sum(results) | |
# print(f"\n✅ Finished downloading {total_downloaded} images.") | |
# ------------------ | |
# import os | |
# import fireducks.pandas as pd | |
# from PIL import Image | |
# from io import BytesIO | |
# from concurrent.futures import ProcessPoolExecutor | |
# from tqdm import tqdm | |
# import hashlib | |
# # Load valid image names | |
# csv_path = "/home/IITB/ai-at-ieor/23m1521/ashish/MTP/Vaani/Vaani-Audio-Image-Hindi2.csv" | |
# valid_image_names = set(pd.read_csv(csv_path)["referenceImage"].dropna().unique()) | |
# # Save base directory | |
# save_dir = "/scratch/IITB/ai-at-ieor/23m1521/datasets/Vaani/Images/From-Images-Config" | |
# os.makedirs(save_dir, exist_ok=True) | |
# # Shards location | |
# basedir = "/scratch/IITB/ai-at-ieor/23m1521/hf_cache/hub/vaani_image_blobs/blobs" | |
# image_shards = [os.path.join(basedir, f) for f in os.listdir(basedir)] | |
# def process_shard(shard_path): | |
# df = pd.read_parquet(shard_path) | |
# shard_name = os.path.splitext(os.path.basename(shard_path))[0] | |
# shard_save_dir = os.path.join(save_dir, shard_name) | |
# os.makedirs(shard_save_dir, exist_ok=True) | |
# count = 0 | |
# for i in range(len(df)): | |
# img_info = df.image[i] | |
# if img_info is None or 'bytes' not in img_info or 'path' not in img_info: | |
# continue | |
# img_data = img_info['bytes'] | |
# img_path = img_info['path'] | |
# # if img_path in valid_image_names and img_data: | |
# try: | |
# image = Image.open(BytesIO(img_data)) | |
# image.save(os.path.join(shard_save_dir, os.path.basename(img_path))) | |
# image.close() | |
# count += 1 | |
# except Exception as e: | |
# continue # skip corrupt images | |
# return shard_path, count | |
# def main(): | |
# total_downloaded = 0 | |
# with ProcessPoolExecutor(max_workers=10) as executor: | |
# results = list(tqdm(executor.map(process_shard, image_shards), total=len(image_shards), desc="Processing shards", dynamic_ncols=True, colour="blue")) | |
# for shard_path, count in results: | |
# total_downloaded += count | |
# print(f"✅ {os.path.basename(shard_path)}: Saved {count} images.") | |
# print(f"\n✅ Finished downloading {total_downloaded} images from {len(image_shards)} shards.") | |
# if __name__ == "__main__": | |
# main() | |