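"""Fetch data from Filebase/S3 into the local disk.

Downloads everything under the advisorai-data/ and merged/ prefixes of the
configured bucket, keeping only the last 7 date folders from merged/archive/.
Credentials are read from FILEBASE_* environment variables (optionally loaded
from a .env file); the target directory is DATA_DIR, --base-dir, or an
auto-detected writable location.
"""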

import os
import sys
import argparse

from dotenv import load_dotenv

# Make the repository root importable so that src/ can be resolved
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from src.data_cloud.cloud_utils import StorageHandler


def choose_base_dir(cli_base=None):
    """Choose a writable base directory. Preference order:

    1. CLI-provided path
    2. /data (persistent volume on Spaces)
    3. /tmp
    """
    candidates = []
    if cli_base:
        candidates.append(cli_base)
    candidates.extend(['/data', '/tmp'])
    for base in candidates:
        try:
            merged_path = os.path.abspath(os.path.join(base, 'merged'))
            advisorai_path = os.path.abspath(os.path.join(base, 'advisorai-data'))
            os.makedirs(merged_path, mode=0o777, exist_ok=True)
            os.makedirs(advisorai_path, mode=0o777, exist_ok=True)
            # Quick writability test
            test_file = os.path.join(merged_path, '.write_test')
            with open(test_file, 'w') as f:
                f.write('ok')
            os.remove(test_file)
            return base
        except Exception:
            # Cannot use this candidate; try the next one
            continue
    # As a last resort, use /tmp (may raise later if not writable)
    return '/tmp'


def main(argv=None):
    parser = argparse.ArgumentParser(description='Fetch data from Filebase/S3 into local disk')
    parser.add_argument('--base-dir', help='Base directory to store data (default: auto-detected)')
    args = parser.parse_args(argv)

    load_dotenv()
    # Load credentials from environment variables
    endpoint_url = os.getenv('FILEBASE_ENDPOINT', 'https://s3.filebase.com')
    access_key = os.getenv('FILEBASE_ACCESS_KEY')
    secret_key = os.getenv('FILEBASE_SECRET_KEY')
    bucket_name = os.getenv('FILEBASE_BUCKET')

    # Prefer an explicit DATA_DIR env var if present (Option 1)
    env_base = os.getenv('DATA_DIR')
    if env_base:
        base_root = env_base
    else:
        base_root = choose_base_dir(args.base_dir)
    local_base = os.path.abspath(os.path.join(base_root, 'merged'))
    advisorai_base = os.path.abspath(os.path.join(base_root, 'advisorai-data'))
    # Ensure the base directories exist with permissive permissions
    os.makedirs(local_base, mode=0o777, exist_ok=True)
    os.makedirs(advisorai_base, mode=0o777, exist_ok=True)
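
    # StorageHandler (from src.data_cloud.cloud_utils) is assumed to expose the
    # underlying boto3-style S3 client as `.s3` and a `.download(key)` method
    # returning the object's bytes, as used below.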
    storage = StorageHandler(endpoint_url, access_key, secret_key, bucket_name, local_base=local_base)

    # Fetch all folders/files from advisorai-data/
    advisor_prefix = "advisorai-data/"
    print(f"Fetching all folders/files from: {advisor_prefix}")
    advisor_keys = []
    if storage.s3 and bucket_name:
        try:
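            # NOTE: list_objects_v2 returns at most 1,000 keys per response;
            # for larger prefixes a paginator would be needed (the same caveat
            # applies to the merged/ and archive listings below).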
            resp = storage.s3.list_objects_v2(Bucket=bucket_name, Prefix=advisor_prefix)
            for obj in resp.get('Contents', []):
                key = obj['Key']
                if not key.endswith('/'):
                    advisor_keys.append(key)
        except Exception as e:
            print(f"[WARN] Could not list objects for {advisor_prefix}: {e}")
    else:
        print("[ERROR] No S3 client or bucket configured for advisorai-data!")

    # Download advisorai-data files
    for key in advisor_keys:
        try:
            data = storage.download(key)
            # Strip the 'advisorai-data/' prefix from the key for the local path
            local_rel_path = key[len(advisor_prefix):] if key.startswith(advisor_prefix) else key
            local_path = os.path.join(advisorai_base, local_rel_path)
            os.makedirs(os.path.dirname(local_path), mode=0o777, exist_ok=True)
            with open(local_path, 'wb') as f:
                f.write(data)
            print(f"[OK] Downloaded s3://{bucket_name}/{key} -> {local_path}")
        except Exception as e:
            print(f"[ERROR] Failed to fetch advisorai-data file {key}: {e}")

    # Fetch everything under merged/, deferring merged/archive/ to a separate
    # pass that keeps only the last 7 archive folders
    merged_prefix = "merged/"
    archive_prefix = "merged/archive/"
    print(f"Fetching everything under: {merged_prefix} (archive limited to the last 7 folders)")
    merged_keys = []
    if storage.s3 and bucket_name:
        try:
            resp = storage.s3.list_objects_v2(Bucket=bucket_name, Prefix=merged_prefix)
            for obj in resp.get('Contents', []):
                key = obj['Key']
                # Skip archive keys here; they are handled in the archive pass below
                if key.startswith(archive_prefix):
                    continue
                if not key.endswith('/'):
                    merged_keys.append(key)
        except Exception as e:
            print(f"[WARN] Could not list objects for {merged_prefix}: {e}")
    else:
        print("[ERROR] No S3 client or bucket configured for merged!")

    # Download all merged/ files (except archive)
    for key in merged_keys:
        try:
            data = storage.download(key)
            local_rel_path = key[len(merged_prefix):] if key.startswith(merged_prefix) else key
            local_path = os.path.join(local_base, local_rel_path)
            os.makedirs(os.path.dirname(local_path), mode=0o777, exist_ok=True)
            with open(local_path, 'wb') as f:
                f.write(data)
            print(f"[OK] Downloaded s3://{bucket_name}/{key} -> {local_path}")
        except Exception as e:
            print(f"[ERROR] Failed to fetch {key}: {e}")

    # Fetch only the last 7 folders under merged/archive/
    print(f"Fetching last 7 archive folders from: {archive_prefix}")
    archive_folders = set()
    archive_keys = []
    if storage.s3 and bucket_name:
        try:
            resp = storage.s3.list_objects_v2(Bucket=bucket_name, Prefix=archive_prefix)
            for obj in resp.get('Contents', []):
                key = obj['Key']
                # Expect keys like merged/archive/YYYYMMDD/...
                parts = key[len(archive_prefix):].split('/')
                if len(parts) > 1 and parts[0].isdigit():
                    archive_folders.add(parts[0])
            # Folder names are YYYYMMDD dates, so lexicographic order is also
            # chronological: the last 7 entries are the 7 most recent days
            last7 = sorted(archive_folders)[-7:]
            print(f"[INFO] Last 7 archive folders: {last7}")
            # Collect all keys in those folders
            for obj in resp.get('Contents', []):
                key = obj['Key']
                parts = key[len(archive_prefix):].split('/')
                if len(parts) > 1 and parts[0] in last7:
                    archive_keys.append(key)
        except Exception as e:
            print(f"[WARN] Could not list objects for {archive_prefix}: {e}")
    else:
        print("[ERROR] No S3 client or bucket configured for archive!")

    # Download archive files (their paths stay under merged/ locally)
    for key in archive_keys:
        try:
            data = storage.download(key)
            local_rel_path = key[len(merged_prefix):] if key.startswith(merged_prefix) else key
            local_path = os.path.join(local_base, local_rel_path)
            os.makedirs(os.path.dirname(local_path), mode=0o777, exist_ok=True)
            with open(local_path, 'wb') as f:
                f.write(data)
            print(f"[OK] Downloaded s3://{bucket_name}/{key} -> {local_path}")
        except Exception as e:
            print(f"[ERROR] Failed to fetch archive file {key}: {e}")


if __name__ == "__main__":
    main()
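
# Example invocation (script name assumed; requires FILEBASE_ACCESS_KEY,
# FILEBASE_SECRET_KEY and FILEBASE_BUCKET, e.g. loaded from a .env file):
#
#   python fetch_data.py --base-dir /data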