|
import os |
|
import tempfile |
|
|
|
|
|
def _is_writable(path: str) -> bool:
    """Return True if *path* is, or can be made, a writable directory.

    The directory (including missing parents) is created when it does not
    exist yet.  Writability is then checked by creating and removing a
    throw-away temporary file named ``.wtest_*`` inside it.

    Args:
        path: Directory to probe.

    Returns:
        True when the probe file could be created and removed again,
        False when any step failed with a filesystem error (for example
        missing permissions or *path* being a regular file).
    """
    try:
        # exist_ok=True already tolerates an existing directory, so no
        # separate (and race-prone) existence check is needed.
        os.makedirs(path, exist_ok=True)
        probe_fd, probe_path = tempfile.mkstemp(prefix='.wtest_', dir=path)
        try:
            os.close(probe_fd)
        finally:
            # Remove the probe even if closing the descriptor failed, so
            # a failed check never leaves a stray file behind.
            os.unlink(probe_path)
    except (OSError, ValueError):
        # ValueError is raised for paths containing an embedded NUL byte.
        return False
    return True
|
|
|
|
|
def _detect_data_dir() -> str:
    """Pick the directory used for persistent data.

    Locations are probed in this order and the first one that
    ``_is_writable`` accepts is returned:

    1. the ``DATA_DIR`` environment variable, when set and non-empty
    2. ``/data``
    3. ``/app/data``

    Returns:
        The first writable location, or ``/tmp`` when none qualifies.
    """
    configured = os.getenv('DATA_DIR')

    # An unset or empty environment variable is skipped without probing.
    locations = [configured] if configured else []
    locations.extend(('/data', '/app/data'))

    for location in locations:
        if _is_writable(location):
            return location

    return '/tmp'
|
|
|
|
|
# Resolved once at import time; _detect_data_dir defines the lookup order.
DATA_DIR = _detect_data_dir()

# An explicit LOG_DIR environment variable takes precedence; otherwise logs
# are placed in a "logs" directory under DATA_DIR.
_preferred_logs = os.getenv('LOG_DIR')
if not _preferred_logs:
    _preferred_logs = os.path.join(DATA_DIR, 'logs')

try:
    os.makedirs(_preferred_logs, exist_ok=True)
    if not _is_writable(_preferred_logs):
        # Raised only to reach the shared fallback handler below.
        raise PermissionError("Log dir not writable")
except Exception:
    # Any failure to create or write the preferred directory falls back
    # to /tmp/logs.
    _preferred_logs = '/tmp/logs'
    os.makedirs(_preferred_logs, exist_ok=True)

LOG_DIR = _preferred_logs
|
|
|
|
|
def _compute_last_run_path(base_dir: str) -> str:
    """Return the first usable location for the ``last_run.txt`` file.

    Candidates are tried in order:

    1. ``<base_dir>/deployment/last_run.txt``
    2. ``<base_dir>/last_run.txt``
    3. ``/tmp/last_run.txt``

    A candidate is usable when its parent directory can be created and the
    file can be opened in append mode.  The probe creates the file when it
    is missing and leaves existing content untouched.

    Args:
        base_dir: Directory under which the file is preferably stored.

    Returns:
        The first candidate that passed the probe, or
        ``/tmp/last_run.txt`` when none did.
    """
    candidates = (
        os.path.join(base_dir, 'deployment', 'last_run.txt'),
        os.path.join(base_dir, 'last_run.txt'),
        '/tmp/last_run.txt',
    )
    for candidate in candidates:
        parent = os.path.dirname(candidate)
        try:
            # A bare file name (e.g. when base_dir is '') has no directory
            # part, and os.makedirs('') would raise even though the
            # current directory may be perfectly usable.
            if parent:
                os.makedirs(parent, exist_ok=True)
            # Append mode creates a missing file without truncating an
            # existing one.
            with open(candidate, 'a'):
                pass
        except (OSError, ValueError):
            # ValueError is raised for paths containing an embedded NUL.
            continue
        return candidate
    return '/tmp/last_run.txt'
|
|
|
|
|
# Computed once at import time. The call probes each candidate location
# and may create the file as a side effect.
LAST_RUN_PATH = _compute_last_run_path(DATA_DIR)
|
|