Source code for tradingstrategy.transport.cache_utils

import logging
import os
from contextlib import contextmanager
from pathlib import Path

from filelock import FileLock

logger = logging.getLogger(__name__)


@contextmanager
def wait_other_writers(path: Path | str, timeout: int = 120):
    """Wait for other potential writers writing the same file.

    - Work around issues when parallel unit tests and such
      try to write the same file

    - Access is serialised with a ``<file name>.lock`` file that is
      created next to ``path``; missing parent directories are created

    Example:

    .. code-block:: python

        import os
        import urllib.request
        import tempfile

        import pytest
        import pandas as pd

        @pytest.fixture()
        def my_cached_test_data_frame() -> pd.DataFrame:

            # All tests use a cached dataset stored in the /tmp directory
            path = os.path.join(tempfile.gettempdir(), "my_shared_data.parquet")

            with wait_other_writers(path):

                # Read result from the previous writer
                if not os.path.exists(path):
                    # Download and write to cache
                    urllib.request.urlretrieve("https://example.com", path)

                return pd.read_parquet(path)

    :param path:
        File that is being written. Must be an absolute path.

    :param timeout:
        How many seconds wait to acquire the lock file.

        Default 2 minutes.

    :raise filelock.Timeout:
        If the file writer is stuck with the lock.
    """

    # Imported locally so this function does not depend on the
    # module-level import list beyond what it already uses.
    from filelock import Timeout

    if isinstance(path, str):
        path = Path(path)

    assert isinstance(path, Path), f"Not Path object: {path}"

    assert path.is_absolute(), (
        f"Did not get an absolute path: {path}\n"
        f"Please use absolute paths for lock files to prevent polluting the local working directory."
    )

    # If we are writing to a new temp folder, create any parent paths
    os.makedirs(path.parent, exist_ok=True)

    # https://stackoverflow.com/a/60281933/315168
    lock_file = path.parent / (path.name + '.lock')

    lock = FileLock(lock_file, timeout=timeout)

    # FileLock.is_locked only tells whether *this* lock object holds the lock,
    # so it cannot be used to detect another writer. Instead, attempt
    # a non-blocking acquire and treat a Timeout as "somebody else has it".
    try:
        lock.acquire(timeout=0)
    except Timeout:
        contended = True
    else:
        contended = False

    if contended:
        logger.info(
            "Parquet file %s locked for writing, waiting %f seconds",
            path,
            timeout,
        )
        # Blocks up to the timeout configured on the lock and
        # raises filelock.Timeout if the other writer never lets go.
        # Done outside the except block to keep the traceback unchained.
        lock.acquire()

    try:
        yield
    finally:
        lock.release()