"""
cloud_utils.py – Unified utilities for HTTP fetch and cloud/local storage operations.
Provides:
• fetch_content / fetch_json for HTTP GET
• StorageHandler class with upload/download and fallback to local filesystem
- Methods set self.last_mode to 'cloud' or 'local'
- Local files are stored under a base directory
Usage:
from cloud_utils import StorageHandler, fetch_json
Requirements:
• boto3 and botocore
• requests
• ENV vars for cloud credentials (e.g. FILEBASE_*)
"""
import os
import errno
import requests
import boto3
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError
# HTTP Fetch utilities ---------------------------------------------------------
def fetch_content(url, headers=None, timeout=15):
    """Fetch binary content via HTTP GET and return the response body as bytes."""
    resp = requests.get(url, headers=headers, timeout=timeout)
    resp.raise_for_status()
    return resp.content


def fetch_json(url, headers=None, timeout=15):
    """Fetch JSON via HTTP GET.

    If the response is a dict with a top-level "data" key (a common API
    envelope), return its value; otherwise return the parsed JSON as-is.
    """
    resp = requests.get(url, headers=headers, timeout=timeout)
    resp.raise_for_status()
    data = resp.json()
    return data.get("data", data) if isinstance(data, dict) else data


def fetch_text(url, headers=None, timeout=15, encoding='utf-8'):
    """Fetch text content via HTTP GET, decoded with the given encoding."""
    resp = requests.get(url, headers=headers, timeout=timeout)
    resp.raise_for_status()
    resp.encoding = encoding
    return resp.text
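# Example usage (illustrative; the URLs below are placeholders, not real endpoints):
#   payload = fetch_json("https://api.example.com/v1/items")   # dict or list
#   readme = fetch_text("https://example.com/README.txt")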
# Storage Handler ---------------------------------------------------------------
class StorageHandler:
    """S3-compatible storage client with transparent local-filesystem fallback."""

    def __init__(self, endpoint_url, access_key, secret_key, bucket_name, local_base="data"):
        """
        Initialize the cloud storage client and local base path.

        endpoint_url: S3-compatible endpoint URL
        access_key / secret_key: credentials for the endpoint
        bucket_name: target bucket name (if None/empty, operate in local-only mode)
        local_base: directory prefix for local fallback files
        """
        self.bucket = bucket_name
        self.local_base = local_base.rstrip(os.sep)
        self.last_mode = None  # 'cloud' or 'local'
        if bucket_name:
            # boto3 client config; many S3-compatible services require
            # path-style addressing and SigV4 signing.
            cfg = Config(signature_version="s3v4", s3={"addressing_style": "path"})
            self.s3 = boto3.client(
                "s3",
                endpoint_url=endpoint_url,
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                config=cfg,
                region_name='us-east-1',
            )
        else:
            self.s3 = None

    def _ensure_local_dir(self, key):
        """Build the local path for key and create its parent directories."""
        path = os.path.join(self.local_base, key)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        return path

    def list_prefix(self, prefix):
        """List all object keys under the given S3 prefix.

        Returns a list of keys; the local fallback returns an empty list.
        """
        if self.s3 and self.bucket:
            paginator = self.s3.get_paginator('list_objects_v2')
            keys = []
            for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
                for obj in page.get('Contents', []):
                    keys.append(obj['Key'])
            return keys
        # Local fallback: not implemented (could walk the local filesystem if needed)
        return []
    def download(self, key):
        """Download object by key. Returns bytes and sets last_mode. Raises FileNotFoundError if not found."""
        if self.s3 and self.bucket:
            try:
                resp = self.s3.get_object(Bucket=self.bucket, Key=key)
                data = resp['Body'].read()
                self.last_mode = 'cloud'
                print(f"[OK] Downloaded {key} from s3://{self.bucket}/{key}")
                return data
            except (ClientError, BotoCoreError) as e:
                print(f"[WARN] Could not download {key} from S3: {e}")
        # Fall back to the local filesystem if S3 is not configured or the download failed
        local_path = self._ensure_local_dir(key)
        try:
            with open(local_path, 'rb') as f:
                data = f.read()
            self.last_mode = 'local'
            print(f"[FALLBACK] Loaded {key} from local {local_path}")
            return data
        except FileNotFoundError:
            print(f"[ERROR] {key} not found in S3 or locally at {local_path}")
            raise
    def upload(self, key, data, content_type='application/octet-stream'):
        """Upload bytes to cloud, falling back to local. Sets last_mode. Returns True if cloud, False if local."""
        if self.s3 and self.bucket:
            try:
                self.s3.put_object(Bucket=self.bucket, Key=key, Body=data, ContentType=content_type)
                self.last_mode = 'cloud'
                print(f"[OK] Uploaded {key} -> s3://{self.bucket}/{key}")
                return True
            except (ClientError, BotoCoreError) as e:
                print(f"[WARN] Failed uploading {key}: {e}")
        # Fall back to the local filesystem if S3 is not configured or the upload failed
        local_path = self._ensure_local_dir(key)
        with open(local_path, 'wb') as f:
            f.write(data)
        self.last_mode = 'local'
        print(f"[FALLBACK] Saved {key} locally -> {local_path}")
        return False
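    # Example (sketch): callers can branch on the return value to record where data landed.
    #   stored_in_cloud = handler.upload("reports/summary.json", b"{}", "application/json")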
    def exists(self, key):
        """Check for existence of object. Returns True if found in the cloud or locally."""
        if self.s3 and self.bucket:
            try:
                self.s3.head_object(Bucket=self.bucket, Key=key)
                return True
            except (ClientError, BotoCoreError):
                pass
        local_path = os.path.join(self.local_base, key)
        return os.path.exists(local_path)
    def delete(self, key):
        """Delete object in cloud or local fallback."""
        if self.s3 and self.bucket:
            try:
                self.s3.delete_object(Bucket=self.bucket, Key=key)
                self.last_mode = 'cloud'
                print(f"[OK] Deleted {key} from s3://{self.bucket}/{key}")
                return
            except (ClientError, BotoCoreError) as e:
                print(f"[WARN] Could not delete {key} from S3: {e}")
        local_path = os.path.join(self.local_base, key)
        try:
            os.remove(local_path)
            self.last_mode = 'local'
            print(f"[FALLBACK] Deleted {key} locally -> {local_path}")
        except OSError as e:
            # A missing local file is not an error; re-raise anything else.
            if e.errno != errno.ENOENT:
                raise
    def get_last_mode(self):
        """Return 'cloud' or 'local' depending on the last operation."""
        return self.last_mode
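# Minimal smoke test (sketch). The FILEBASE_* variable names are examples taken
# from the module docstring; substitute whatever your deployment uses. With no
# bucket configured, the handler runs in local-only mode under ./data.
if __name__ == "__main__":
    handler = StorageHandler(
        endpoint_url=os.environ.get("FILEBASE_ENDPOINT", "https://s3.filebase.com"),
        access_key=os.environ.get("FILEBASE_ACCESS_KEY", ""),
        secret_key=os.environ.get("FILEBASE_SECRET_KEY", ""),
        bucket_name=os.environ.get("FILEBASE_BUCKET"),  # None -> local-only mode
    )
    handler.upload("demo/hello.txt", b"hello world", content_type="text/plain")
    print("last mode:", handler.get_last_mode())
    print(handler.download("demo/hello.txt"))
    print("keys under demo/:", handler.list_prefix("demo/"))
    handler.delete("demo/hello.txt")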
# End of cloud_utils.py