Spaces:
Paused
Paused
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| import time | |
| import pytest | |
| import fsspec | |
| from fsspec.implementations.cached import CachingFileSystem | |
| def m(): | |
| """ | |
| Fixture providing a memory filesystem. | |
| """ | |
| m = fsspec.filesystem("memory") | |
| m.store.clear() | |
| m.pseudo_dirs.clear() | |
| m.pseudo_dirs.append("") | |
| try: | |
| yield m | |
| finally: | |
| m.store.clear() | |
| m.pseudo_dirs.clear() | |
| m.pseudo_dirs.append("") | |
| def ftp_writable(tmpdir): | |
| """ | |
| Fixture providing a writable FTP filesystem. | |
| """ | |
| pytest.importorskip("pyftpdlib") | |
| from fsspec.implementations.ftp import FTPFileSystem | |
| FTPFileSystem.clear_instance_cache() # remove lingering connections | |
| CachingFileSystem.clear_instance_cache() | |
| d = str(tmpdir) | |
| with open(os.path.join(d, "out"), "wb") as f: | |
| f.write(b"hello" * 10000) | |
| P = subprocess.Popen( | |
| [sys.executable, "-m", "pyftpdlib", "-d", d, "-u", "user", "-P", "pass", "-w"] | |
| ) | |
| try: | |
| time.sleep(1) | |
| yield "localhost", 2121, "user", "pass" | |
| finally: | |
| P.terminate() | |
| P.wait() | |
| try: | |
| shutil.rmtree(tmpdir) | |
| except Exception: | |
| pass | |