Source code for tradingstrategy.candle

"""Tradind OHLCV candle data and manipulation.

See

- :py:class:`Candle` for information about :term:`OHLCV` data presentation

- :py:meth:`tradingstrategy.client.Client.fetch_candle_universe` how to load OHLCV dataset

You are likely to working with candle datasets that are presented by

- :py:class:`GroupedCandleUniverse`

For more information about candles see :term:`candle` in glossary.
"""

import datetime
from dataclasses import dataclass
from typing import List, Optional, Tuple, TypedDict, Collection, Iterable

import pandas as pd
import pyarrow as pa
from dataclasses_json import dataclass_json

from tradingstrategy.chain import ChainId
from tradingstrategy.types import UNIXTimestamp, USDollarAmount, BlockNumber, PrimaryKey, NonChecksummedAddress, \
    RawChainId
from tradingstrategy.utils.groupeduniverse import PairGroupedUniverse


# Preconstructed pd.Timedelta for optimisation
_ZERO_TIMEDELTA = pd.Timedelta(0)
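
# --- Illustrative loading sketch (not part of the library API) ---
#
# A minimal, hedged example of how the classes in this module are typically
# combined, following the references in the module docstring and the usage
# examples further below. The client method names
# (``Client.create_jupyter_client``, ``Client.fetch_all_candles``) are taken
# from those examples; treat their exact signatures as assumptions to verify
# against the client documentation.

def _example_load_candle_universe() -> "GroupedCandleUniverse":
    """Hedged example: load weekly candles and group them per trading pair."""
    # Imported lazily so this sketch does not affect module import time
    from tradingstrategy.client import Client
    from tradingstrategy.timebucket import TimeBucket

    client = Client.create_jupyter_client()
    raw_candles = client.fetch_all_candles(TimeBucket.d7).to_pandas()
    return GroupedCandleUniverse(raw_candles)
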


class CandleSampleUnavailable(Exception):
    """We tried to look up price for a trading pair, but count not find a candle close to the timestamp."""


@dataclass_json
@dataclass
class Candle:
    """Data structure presenting one OHLCV trading candle.

    Based on the :term:`open-high-low-close-volume <OHLCV>` concept.

    Trading Strategy candles come with additional information available
    on top of the core OHLCV, as chain analysis has deeper visibility than one
    would get on traditional exchanges. For example, for enhanced attributes see
    :py:attr:`Candle.buys` (buy count) or
    :py:attr:`Candle.start_block` (blockchain starting block number of the candle).

    We also separate "buys" and "sells". Although this separation might not be
    meaningful on order-book based exchanges, we define a "buy" as a DEX swap
    where the quote token (USD, ETH) was swapped to a more exotic token (AAVE, SUSHI, etc.)
    """

    #: Primary key to identify the trading pair.
    #: Use the pair universe to map this to a chain id and a smart contract address.
    pair_id: PrimaryKey

    #: Open timestamp for this candle.
    #: Note that the close timestamp you need to supply yourself based on the context.
    timestamp: UNIXTimestamp  # UNIX timestamp as seconds

    #: USD exchange rate of the quote token used to
    #: convert to dollar amounts in this candle.
    #:
    #: Note that currently any USD stablecoin (USDC, DAI) is
    #: assumed to be 1:1 and the candle server cannot
    #: handle exchange rate differences among stablecoins.
    #:
    #: The rate is taken at the beginning of the 1 minute time bucket.
    #: For other time buckets, the exchange rate is the simple average
    #: for the duration of the bucket.
    exchange_rate: float

    #: OHLC core data
    open: USDollarAmount

    #: OHLC core data
    close: USDollarAmount

    #: OHLC core data
    high: USDollarAmount

    #: OHLC core data
    low: USDollarAmount

    #: Number of buys that happened during the candle period.
    #:
    #: Only available on DEXes where buys and sells can be separated.
    buys: int | None

    #: Number of sells that happened during the candle period.
    #:
    #: Only available on DEXes where buys and sells can be separated.
    sells: int | None

    #: Trade volume
    volume: USDollarAmount

    #: Buy side volume
    #:
    #: Swap quote token -> base token volume
    buy_volume: USDollarAmount | None

    #: Sell side volume
    #:
    #: Swap base token -> quote token volume
    sell_volume: USDollarAmount | None

    #: Average trade size
    avg: USDollarAmount

    #: The first blockchain block that includes trades that went into this candle.
    start_block: BlockNumber

    #: The last blockchain block that includes trades that went into this candle.
    end_block: BlockNumber

    #: TODO: Currently disabled to optimise speed.
    #:
    #: This candle contained bad wick :py:attr:`high` or :py:attr:`low` data and was filtered out.
    #:
    #: See :py:func:`tradingstrategy.utils.groupeduniverse.filter_bad_high_low`.
    #: There might be natural causes for the bad data. However,
    #: we do not want to deal with these situations inside a trading strategy.
    #: Thus, we fix candles with unrealistic high and low wicks during the
    #: data loading.
    #:
    #: Not set unless the filter has been run on the fetched data.
    # wick_filtered: Optional[bool] = None,

    #: Schema definition for :py:class:`pd.DataFrame`:
    #:
    #: Defines Pandas datatypes for columns in our candle data format.
    #: Useful e.g. when we are manipulating JSON/hand-written data.
    #:
    DATAFRAME_FIELDS = dict([
        ("pair_id", "int"),
        ("timestamp", "datetime64[s]"),
        ("exchange_rate", "float"),
        ("open", "float"),
        ("close", "float"),
        ("high", "float"),
        ("low", "float"),
        ("buys", "float"),
        ("sells", "float"),
        ("volume", "float"),
        ("buy_volume", "float"),
        ("sell_volume", "float"),
        ("avg", "float"),
        ("start_block", "int"),
        ("end_block", "int"),
    ])

    def __repr__(self):
        human_timestamp = datetime.datetime.utcfromtimestamp(self.timestamp)
        return f"@{human_timestamp} O:{self.open} H:{self.high} L:{self.low} C:{self.close} V:{self.volume} B:{self.buys} S:{self.sells} SB:{self.start_block} EB:{self.end_block}"

    @property
    def trades(self) -> int:
        """Amount of all trades during the candle period."""
        return self.buys + self.sells
    @classmethod
    def to_dataframe(cls) -> pd.DataFrame:
        """Return an empty Pandas dataframe presenting candle data."""
        df = pd.DataFrame(columns=Candle.DATAFRAME_FIELDS.keys())
        return df.astype(Candle.DATAFRAME_FIELDS)
    @classmethod
    def to_qstrader_dataframe(cls) -> pd.DataFrame:
        """Return an empty Pandas dataframe presenting candle data for QStrader.

        TODO: Fix QSTrader to use "standard" column names.
        """
        fields = dict([
            ("pair_id", "int"),
            ("Date", "datetime64[s]"),
            ("exchange_rate", "float"),
            ("Open", "float"),
            ("Close", "float"),
            ("High", "float"),
            ("Low", "float"),
            ("buys", "float"),
            ("sells", "float"),
            ("volume", "float"),
            ("buy_volume", "float"),
            ("sell_volume", "float"),
            ("avg", "float"),
            ("start_block", "float"),
            ("end_block", "float"),
        ])
        df = pd.DataFrame(columns=fields.keys())
        return df.astype(fields)
    @classmethod
    def to_pyarrow_schema(cls, small_candles=False) -> pa.Schema:
        """Construct schema for writing Parquet files for these candles.

        :param small_candles: Use even smaller word sizes for frequent (1m) candles.
        """
        schema = pa.schema([
            ("pair_id", pa.uint32()),
            ("timestamp", pa.timestamp("s")),
            ("exchange_rate", pa.float32()),
            ("open", pa.float32()),
            ("close", pa.float32()),
            ("high", pa.float32()),
            ("low", pa.float32()),
            ("buys", pa.uint16() if small_candles else pa.uint32()),
            ("sells", pa.uint16() if small_candles else pa.uint32()),
            ("volume", pa.float32()),
            ("buy_volume", pa.float32()),
            ("sell_volume", pa.float32()),
            ("avg", pa.float32()),
            ("start_block", pa.uint32()),  # Should be good for 4B blocks
            ("end_block", pa.uint32()),
        ])
        return schema
    @staticmethod
    def generate_synthetic_sample(
            pair_id: int,
            timestamp: pd.Timestamp,
            price: float) -> dict:
        """Generate a synthetic candle sample.

        Used in testing when manually fiddled data is needed.

        All open/close/high/low are set to the same price.
        Exchange rate is 1.0. Other data is set to zero.

        :return:
            One dict of filled candle data
        """
        return {
            "pair_id": pair_id,
            "timestamp": timestamp,
            "open": price,
            "high": price,
            "low": price,
            "close": price,
            "exchange_rate": 1.0,
            "buys": 0,
            "sells": 0,
            "avg": 0,
            "start_block": 0,
            "end_block": 0,
            "volume": 0,
            "buy_volume": 0,
            "sell_volume": 0,
        }
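
# A hedged sketch of how ``Candle.generate_synthetic_sample`` and
# ``Candle.DATAFRAME_FIELDS`` can be combined to build a typed candle
# DataFrame for tests. The helper below is illustrative only and not part
# of the library API.

def _example_synthetic_candle_frame() -> pd.DataFrame:
    """Build a two-candle DataFrame for a single synthetic pair."""
    samples = [
        Candle.generate_synthetic_sample(1, pd.Timestamp("2021-01-01"), 100.0),
        Candle.generate_synthetic_sample(1, pd.Timestamp("2021-01-02"), 102.5),
    ]
    # Coerce columns to the canonical dtypes used by Candle.to_dataframe()
    df = pd.DataFrame(samples)
    return df.astype(Candle.DATAFRAME_FIELDS)
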
@dataclass_json
@dataclass
class CandleResult:
    """Server reply for live queried candle data.

    Uses the `dataclasses-json` module for JSON serialisation.
    """

    #: A bunch of candles.
    #:
    #: Candles are unordered and subject to client side sorting.
    #: Multiple pairs and chains may be present in the candles.
    candles: List[Candle]
    def sort_by_timestamp(self):
        """In-place sorting of candles by their timestamp."""
        self.candles.sort(key=lambda c: c.timestamp)
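
# A hedged sketch of client-side handling of a ``CandleResult`` reply.
# ``from_json`` is the decoder generated by the ``@dataclass_json`` decorator;
# the exact JSON payload shape the server sends is an assumption here.

def _example_decode_candle_reply(payload: str) -> CandleResult:
    """Decode a live candle server reply and sort the candles oldest first."""
    result = CandleResult.from_json(payload)  # generated by @dataclass_json
    result.sort_by_timestamp()
    return result
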
class GroupedCandleUniverse(PairGroupedUniverse):
    """A candle universe where each trading pair has its own candles.

    This is a helper class to create the foundation for multi-pair strategies.

    For data logistics purposes, all candles are lumped together in a single
    columnar data blob. However, it rarely makes sense to execute operations
    over different trading pairs. :py:class:`GroupedCandleUniverse` creates a
    trading pair id -> candle data grouping out of the raw candle data.

    Usage:

    .. code-block::

        # Get candles for SUSHI-USDT

        exchange_universe = client.fetch_exchange_universe()
        raw_pairs = client.fetch_pair_universe().to_pandas()
        raw_candles = client.fetch_all_candles(TimeBucket.d7).to_pandas()

        pair_universe = PandasPairUniverse(raw_pairs)
        candle_universe = GroupedCandleUniverse(raw_candles)

        # Do some test calculations for a single pair
        sushi_swap = exchange_universe.get_by_chain_and_name(ChainId.ethereum, "sushi")
        sushi_usdt = pair_universe.get_one_pair_from_pandas_universe(sushi_swap.exchange_id, "SUSHI", "USDT")

        sushi_usdt_candles = candle_universe.get_candles_by_pair(sushi_usdt.pair_id)
    """
    def get_candle_count(self) -> int:
        """Return the dataset size - how many candles in total."""
        return self.get_sample_count()
    def get_candles_by_pair(self, pair_id: PrimaryKey) -> Optional[pd.DataFrame]:
        """Get candles for a single pair.

        :return:
            Pandas dataframe object with the following columns:

            - timestamp

            - open

            - high

            - low

            - close
        """
        if pair_id not in self.candles_cache:
            self.candles_cache[pair_id] = self.get_samples_by_pair(pair_id)
        return self.candles_cache[pair_id]
    def get_closest_price(self,
                          pair_id: PrimaryKey,
                          when: pd.Timestamp,
                          kind="close",
                          look_back_time_frames=5) -> USDollarAmount:
        """Get the price for a trading pair at a specific timepoint or some candles before the timepoint.

        :param pair_id:
            Trading pair id

        :param when:
            Timestamp to query

        :param kind:
            One of OHLC data points: "open", "close", "low", "high"

        :param look_back_time_frames:
            If there is no price sample available at the exact timepoint,
            look to the past to get the nearest sample.
            For example, if the candle time interval is 5 minutes and
            `look_back_time_frames` is 10, then accept a candle that is at maximum
            50 minutes before the timepoint.

        :return:
            We always return a price. In the error cases an exception is raised.

        :raise CandleSampleUnavailable:
            There were no samples available with the given conditions.
        """
        assert kind in ("open", "close", "high", "low"), f"Got kind: {kind}"

        start_when = when
        samples_per_pair = self.get_candles_by_pair(pair_id)
        assert samples_per_pair is not None, f"No candle data available for pair {pair_id}"

        samples_per_kind = samples_per_pair[kind]
        for attempt in range(look_back_time_frames):
            try:
                sample = samples_per_kind[when]
                return sample
            except KeyError:
                # Go to the previous sample
                when -= self.time_bucket.to_timedelta()

        # Try to be helpful with the errors here,
        # so one does not need to open ipdb to inspect faulty data
        try:
            first_sample = samples_per_pair.iloc[0]
            second_sample = samples_per_pair.iloc[1]
            last_sample = samples_per_pair.iloc[-1]
        except KeyError:
            raise CandleSampleUnavailable(
                f"Could not find any candles for pair {pair_id}, value kind '{kind}', between {when} - {start_when}\n"
                f"Could not figure out existing data range. Has {len(samples_per_kind)} samples."
            )

        raise CandleSampleUnavailable(
            f"Could not find any candles for pair {pair_id}, value kind '{kind}', between {when} - {start_when}\n"
            f"The pair has {len(samples_per_kind)} candles between {first_sample['timestamp']} - {last_sample['timestamp']}\n"
            f"Sample interval is {second_sample['timestamp'] - first_sample['timestamp']}"
        )
    def get_price_with_tolerance(self,
                                 pair_id: PrimaryKey,
                                 when: pd.Timestamp,
                                 tolerance: pd.Timedelta,
                                 kind="close") -> Tuple[USDollarAmount, pd.Timedelta]:
        """Get the price for a trading pair at a specific timepoint or some candles before the timepoint.

        Example:

        .. code-block:: python

            test_price, distance = universe.get_price_with_tolerance(
                pair_id=1,
                when=pd.Timestamp("2020-02-01 00:05"),
                tolerance=pd.Timedelta(30, "m"))

            # Returns the closing price of the candle 2020-02-01 00:00,
            # which is 5 minutes off from what we asked
            assert test_price == pytest.approx(100.50)
            assert distance == pd.Timedelta("5m")

        :param pair_id:
            Trading pair id

        :param when:
            Timestamp to query

        :param kind:
            One of OHLC data points: "open", "close", "low", "high"

        :param tolerance:
            If there is no price sample available at the exact timepoint,
            look to the past up to this much to get the nearest sample.
            For example, if the candle time interval is 5 minutes and `tolerance`
            is `pd.Timedelta(30, "m")`, then accept a candle that is at maximum
            30 minutes before the timepoint.

        :return:
            Return (price, delay) tuple.
            We always return a price. In the error cases an exception is raised.
            The delay is the timedelta between the wanted timestamp
            and the actual timestamp of the candle.

            Candles are always timestamped by their opening.

        :raise CandleSampleUnavailable:
            There were no samples available with the given conditions.
        """
        assert kind in ("open", "close", "high", "low"), f"Got kind: {kind}"

        last_allowed_timestamp = when - tolerance

        candles_per_pair = self.get_candles_by_pair(pair_id)
        assert candles_per_pair is not None, f"No candle data available for pair {pair_id}"

        samples_per_kind = candles_per_pair[kind]

        # Look up all the candles before the cut-off timestamp.
        # Assumes data is sorted by the timestamp column,
        # so our "closest time" candle should be the last of this lookup.
        # TODO: self.timestamp_column is no longer needed after we drop QSTrader support,
        # it is legacy. In the future use the hardcoded "timestamp" column name.
        timestamp_column = candles_per_pair[self.timestamp_column]
        latest_or_equal_sample = candles_per_pair.loc[timestamp_column <= when].iloc[-1]

        # Check if the last sample before the cut-off is within our tolerance time range
        candle_timestamp = latest_or_equal_sample[self.timestamp_column]

        distance = when - candle_timestamp
        assert distance >= _ZERO_TIMEDELTA, f"Somehow we managed to get a candle timestamp {candle_timestamp} that is newer than asked {when}"

        if candle_timestamp >= last_allowed_timestamp:
            # Return the chosen price column of the sample
            return latest_or_equal_sample[kind], distance

        # Try to be helpful with the errors here,
        # so one does not need to open ipdb to inspect faulty data
        try:
            first_sample = candles_per_pair.iloc[0]
            second_sample = candles_per_pair.iloc[1]
            last_sample = candles_per_pair.iloc[-1]
        except KeyError:
            raise CandleSampleUnavailable(
                f"Could not find any candles for pair {pair_id}, value kind '{kind}', between {when} - {last_allowed_timestamp}\n"
                f"Could not figure out existing data range. Has {len(samples_per_kind)} samples."
            )

        raise CandleSampleUnavailable(
            f"Could not find any candles for pair {pair_id}, value kind '{kind}', between {when} - {last_allowed_timestamp}\n"
            f"The pair has {len(samples_per_kind)} candles between {first_sample['timestamp']} - {last_sample['timestamp']}\n"
            f"Sample interval is {second_sample['timestamp'] - first_sample['timestamp']}\n"
            f"\n"
            f"This is usually due to sparse candle data - trades have not been made or the blockchain was halted during the price look-up period.\n"
            f"Try to increase the look back period (tolerance) in your code."
        )
    @staticmethod
    def create_empty() -> "GroupedCandleUniverse":
        """Return an empty GroupedCandleUniverse."""
        return GroupedCandleUniverse(df=Candle.to_dataframe(), fix_wick_threshold=None)
    @staticmethod
    def create_empty_qstrader() -> "GroupedCandleUniverse":
        """Return an empty GroupedCandleUniverse.

        TODO: Fix QSTrader to use "standard" column names.
        """
        return GroupedCandleUniverse(df=Candle.to_qstrader_dataframe(), timestamp_column="Date", fix_wick_threshold=None)
    @staticmethod
    def create_from_single_pair_dataframe(df: pd.DataFrame) -> "GroupedCandleUniverse":
        """Construct a universe based on data for a single trading pair.

        Useful for synthetic data/testing.
        """
        return GroupedCandleUniverse(df)
    @staticmethod
    def create_from_multiple_candle_datafarames(dfs: Iterable[pd.DataFrame]) -> "GroupedCandleUniverse":
        """Construct a universe based on multiple trading pairs.

        Useful for synthetic data/testing.

        :param dfs:
            List of dataframes/series where each trading pair is an isolated OHLCV data feed.
        """
        merged = pd.concat(dfs)
        return GroupedCandleUniverse(merged)
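
# A hedged sketch combining the synthetic-data constructors above with a
# tolerant price lookup, mirroring the docstring examples. It assumes the
# constructor defaults accept a single-pair synthetic DataFrame, as in the
# usage shown above; not part of the library API.

def _example_synthetic_universe_lookup() -> float:
    """Build a one-pair universe from synthetic candles and read a price back."""
    samples = [
        Candle.generate_synthetic_sample(1, pd.Timestamp("2021-01-01 00:00"), 100.0),
        Candle.generate_synthetic_sample(1, pd.Timestamp("2021-01-02 00:00"), 105.0),
    ]
    df = pd.DataFrame(samples).astype(Candle.DATAFRAME_FIELDS)
    universe = GroupedCandleUniverse.create_from_single_pair_dataframe(df)

    # Ask for a price a few hours after the last candle open;
    # with a one-day tolerance the 2021-01-02 candle is accepted.
    price, delay = universe.get_price_with_tolerance(
        pair_id=1,
        when=pd.Timestamp("2021-01-02 06:00"),
        tolerance=pd.Timedelta(1, "d"),
    )
    return price
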
class TradingPairDataAvailability(TypedDict):
    """Trading data availability description for a single pair.

    - The Trading Strategy oracle uses a sparse data format where candles
      with zero trades are not generated. This is better suited for
      illiquid DEX markets with few trades.

    - Because of the sparse data format, we do not know if there is a last
      candle available - the candle may not be available yet or there might
      not be trades to generate a candle

    - This information is always time frame (15m, 1h, 1d) specific

    - See :py:meth:`tradingstrategy.client.Client.fetch_trading_data_availability`
    """

    #: Blockchain of the pair
    chain_id: ChainId

    #: Address of the pair
    pair_address: NonChecksummedAddress

    #: Internal id of the pair
    pair_id: PrimaryKey

    #: What is the last full available candle for this trading pair
    last_candle_at: datetime.datetime

    #: What is the last trade the oracle has seen for this trading pair.
    #:
    #: This trade might not be rolled up to a candle yet.
    last_trade_at: datetime.datetime
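
# A hedged sketch of consuming ``TradingPairDataAvailability`` entries,
# e.g. to decide whether new candles are worth fetching. The entries would
# come from ``Client.fetch_trading_data_availability`` referenced above;
# the freshness threshold here is an arbitrary illustrative choice.

def _example_is_pair_fresh(avail: TradingPairDataAvailability, now: datetime.datetime) -> bool:
    """Check whether the last full candle for a pair is at most one day old."""
    return (now - avail["last_candle_at"]) <= datetime.timedelta(days=1)
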
def is_candle_green(candle: pd.Series) -> bool:
    """Check if an OHLCV candle is green.

    An :term:`OHLCV` candle is green if its close is equal to or higher than its open.

    Example:

    .. code-block:: python

        candle = indexed_candles.loc[pd.Timestamp("2022-02-14")]
        assert not is_candle_green(candle)
        assert is_candle_red(candle)

    """
    assert isinstance(candle, pd.Series), f"Got: {candle.__class__}"
    return candle["close"] >= candle["open"]
def is_candle_red(candle: pd.Series) -> bool:
    """Check if an OHLCV candle is red.

    An :term:`OHLCV` candle is red if its close is lower than its open.

    Example:

    .. code-block:: python

        candle = indexed_candles.loc[pd.Timestamp("2022-02-14")]
        assert not is_candle_green(candle)
        assert is_candle_red(candle)

    """
    assert isinstance(candle, pd.Series), f"Got: {candle.__class__}"
    return not is_candle_green(candle)