"""Extract images from Vaani parquet shards into per-shard folders, in parallel.

Each shard's ``image`` column holds dicts with ``bytes`` (raw image data) and
``path`` (original file name). Images are written under ``save_dir/<shard_name>/``.

NOTE(review): two superseded implementations (serial loop, mp.Pool) that lived
here as commented-out code were removed; recover them from version control if
needed. A third (ProcessPoolExecutor) variant remains commented out below.
"""

import gc
import itertools  # retained from original file (unused here)
import os
import string  # retained from original file (unused here)
from io import BytesIO

import fireducks.pandas as pd
from datasets import load_dataset  # retained from original file (unused here)
from joblib import Parallel, delayed
from PIL import Image
from tqdm import tqdm

# Paths
csv_path = "/home/IITB/ai-at-ieor/23m1521/ashish/MTP/Vaani/Vaani-Audio-Image-Hindi2.csv"
save_dir = "/scratch/IITB/ai-at-ieor/23m1521/datasets/Vaani/Images/From-Images-Config"
basedir = "/scratch/IITB/ai-at-ieor/23m1521/hf_cache/hub/vaani_image_blobs/blobs"


def process_shard(shard_path):
    """Extract every image from one parquet shard.

    Parameters
    ----------
    shard_path : str
        Path to a parquet file whose ``image`` column holds dicts with
        ``bytes`` and ``path`` keys.

    Returns
    -------
    int
        Number of images successfully written (0 if the shard failed to load).
    """
    shard_name = os.path.splitext(os.path.basename(shard_path))[0]
    shard_save_dir = os.path.join(save_dir, shard_name)
    # makedirs creates intermediate dirs too, so save_dir need not pre-exist.
    os.makedirs(shard_save_dir, exist_ok=True)

    try:
        df = pd.read_parquet(shard_path)
    except Exception as e:
        # tqdm.write keeps log lines from corrupting active progress bars.
        tqdm.write(f"⚠️ Failed to read shard: {shard_path}, error: {e}")
        return 0

    count = 0
    try:
        for i in tqdm(range(len(df)), desc=f"[{shard_name}]",
                      dynamic_ncols=True, position=0, leave=False):
            try:
                # Single lookup per row (the original fetched df.image[i] twice).
                record = df.image[i]
                img_data = record['bytes']
                filename = os.path.basename(record['path'])
                # NOTE(review): the CSV-based filter was disabled in the
                # original; re-enable if only referenced images are wanted:
                # if filename not in valid_image_names: continue
                if not img_data:
                    continue  # empty payload — nothing to decode
                image = Image.open(BytesIO(img_data))
                try:
                    image.save(os.path.join(shard_save_dir, filename))
                finally:
                    image.close()  # release handle even if save() raises
                count += 1
            except Exception as e:
                tqdm.write(f"⚠️ Error processing image {i} in {shard_name}: {e}")
                continue
    finally:
        # Shards are large; drop the frame and collect before the next one.
        del df
        gc.collect()

    tqdm.write(f"✅ Finished {shard_name} with {count} images.")
    return count


if __name__ == "__main__":
    # All expensive, side-effecting setup lives under the main guard so loky
    # workers (which re-import this module) do not repeat the CSV read and
    # directory listing on every import.
    valid_image_names = set(
        pd.read_csv(csv_path)["referenceImage"].dropna().unique()
    )
    os.makedirs(save_dir, exist_ok=True)

    image_shards = [os.path.join(basedir, f) for f in os.listdir(basedir)]
    print(f"Found {len(image_shards)} shards to process.")

    num_workers = 3  # Adjust based on memory limits
    results = Parallel(n_jobs=num_workers, backend="loky", verbose=10)(
        delayed(process_shard)(shard) for shard in image_shards
    )
    total_downloaded = sum(results)
    print(f"\n✅ Finished downloading {total_downloaded} images from {len(image_shards)} shards.")
"/home/IITB/ai-at-ieor/23m1521/ashish/MTP/Vaani/Vaani-Audio-Image-Hindi2.csv" # save_dir = "/scratch/IITB/ai-at-ieor/23m1521/datasets/Vaani/Images/From-Images-Config" # basedir = "/scratch/IITB/ai-at-ieor/23m1521/hf_cache/hub/vaani_image_blobs/blobs" # # Load valid image names # valid_image_names = set(pd.read_csv(csv_path)["referenceImage"].dropna().unique()) # # Ensure base save directory exists # os.makedirs(save_dir, exist_ok=True) # # Get all shards # image_shards = [os.path.join(basedir, f) for f in os.listdir(basedir)] # print(f"Found {len(image_shards)} shards to process.") # def process_shard(shard_path): # shard_name = os.path.splitext(os.path.basename(shard_path))[0] # shard_save_dir = os.path.join(save_dir, shard_name) # os.makedirs(shard_save_dir, exist_ok=True) # try: # df = pd.read_parquet(shard_path) # except Exception as e: # print(f"⚠️ Failed to read shard: {shard_path}, error: {e}") # return 0 # count = 0 # for i in tqdm(range(len(df)), desc=f"[{shard_name}]", dynamic_ncols=True): # try: # img_data = df.image[i]['bytes'] # path = df.image[i]['path'] # filename = os.path.basename(path) # # if filename in valid_image_names and img_data: # image = Image.open(BytesIO(img_data)) # image.save(os.path.join(shard_save_dir, filename)) # image.close() # count += 1 # except Exception as e: # print(f"⚠️ Error processing image {i} in {shard_name}: {e}") # continue # del df # gc.collect() # print(f"✅ {shard_name}") # return count # if __name__ == "__main__": # with mp.Pool(processes=3) as pool: # results = list(pool.imap(process_shard, image_shards)) # total_downloaded = sum(results) # print(f"\n✅ Finished downloading {total_downloaded} images.") # ------------------ # import os # import fireducks.pandas as pd # from PIL import Image # from io import BytesIO # from concurrent.futures import ProcessPoolExecutor # from tqdm import tqdm # import hashlib # # Load valid image names # csv_path = 
"/home/IITB/ai-at-ieor/23m1521/ashish/MTP/Vaani/Vaani-Audio-Image-Hindi2.csv" # valid_image_names = set(pd.read_csv(csv_path)["referenceImage"].dropna().unique()) # # Save base directory # save_dir = "/scratch/IITB/ai-at-ieor/23m1521/datasets/Vaani/Images/From-Images-Config" # os.makedirs(save_dir, exist_ok=True) # # Shards location # basedir = "/scratch/IITB/ai-at-ieor/23m1521/hf_cache/hub/vaani_image_blobs/blobs" # image_shards = [os.path.join(basedir, f) for f in os.listdir(basedir)] # def process_shard(shard_path): # df = pd.read_parquet(shard_path) # shard_name = os.path.splitext(os.path.basename(shard_path))[0] # shard_save_dir = os.path.join(save_dir, shard_name) # os.makedirs(shard_save_dir, exist_ok=True) # count = 0 # for i in range(len(df)): # img_info = df.image[i] # if img_info is None or 'bytes' not in img_info or 'path' not in img_info: # continue # img_data = img_info['bytes'] # img_path = img_info['path'] # # if img_path in valid_image_names and img_data: # try: # image = Image.open(BytesIO(img_data)) # image.save(os.path.join(shard_save_dir, os.path.basename(img_path))) # image.close() # count += 1 # except Exception as e: # continue # skip corrupt images # return shard_path, count # def main(): # total_downloaded = 0 # with ProcessPoolExecutor(max_workers=10) as executor: # results = list(tqdm(executor.map(process_shard, image_shards), total=len(image_shards), desc="Processing shards", dynamic_ncols=True, colour="blue")) # for shard_path, count in results: # total_downloaded += count # print(f"✅ {os.path.basename(shard_path)}: Saved {count} images.") # print(f"\n✅ Finished downloading {total_downloaded} images from {len(image_shards)} shards.") # if __name__ == "__main__": # main()