Source code for tradingstrategy.transport.base
import datetime
import io
from abc import ABC, abstractmethod
from typing import List
from tradingstrategy.caip import ChainAddressTuple
from tradingstrategy.timebucket import TimeBucket
[docs]class BaseTransport(ABC):
"""Define transport interface.
Different transports can be used to get the candle data from oracle, depending on
the execution context of the Python code.
"""
@abstractmethod
def fetch_stats(self) -> dict:
pass
[docs] @abstractmethod
def fetch_pair_universe(self) -> io.BytesIO:
"""Get the latest info on trading pairs.
:return: A reader for JSON and ZSTD serialised PairUniverse
"""
[docs] @abstractmethod
def fetch_live_candles(self, pair_list: List[ChainAddressTuple], start: datetime.datetime, end: datetime.datetime):
"""Downlaod real-time partial candle data."""
[docs] @abstractmethod
def fetch_candle_dataset(self, bucket: TimeBucket):
"""Download cached precompiled data set.
Datasets are anywhere between 80 MB - 4 GB.
"""