Source code for tradeexecutor.strategy.pandas_trader.strategy_input
"""Strategy decision input.
- Input arguments for `decide_trade` functions
"""
import logging
from dataclasses import dataclass
from functools import lru_cache
import cachetools
import pandas as pd
from tradeexecutor.state.identifier import TradingPairIdentifier
from tradeexecutor.state.state import State
from tradeexecutor.state.types import USDollarPrice
from tradeexecutor.strategy.execution_context import ExecutionContext
from tradeexecutor.strategy.pandas_trader.indicator import IndicatorResultMap, IndicatorSet, IndicatorKey, IndicatorNotFound, InvalidForMultipairStrategy
from tradeexecutor.strategy.pandas_trader.position_manager import PositionManager
from tradeexecutor.strategy.parameters import StrategyParameters
from tradeexecutor.strategy.pricing_model import PricingModel
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse
from tradingstrategy.candle import CandleSampleUnavailable
from tradingstrategy.liquidity import LiquidityDataUnavailable
from tradingstrategy.pair import HumanReadableTradingPairDescription
from tradingstrategy.utils.time import get_prior_timestamp, ZERO_TIMEDELTA
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Maximum number of entries kept in the candle width cache (``_time_frame_cache``).
SERIES_CACHE_SIZE = 1024
class IndicatorDataNotFoundWithinDataTolerance(Exception):
    """Raised when a forward-filled indicator lookup finds only stale data.

    The nearest earlier sample exists, but it is further away from the
    asked timestamp than the allowed data delay tolerance.
    """
[docs]@dataclass(slots=True)
class StrategyInputIndicators:
"""Indicator results for the strategy decision.
A helper class to read and manipulate indicator and price values.
Thi class wraps the indicator results, both cached and real-time, to a format that has good developer experience
when accessed from `decide_trades()`.
- Indicators are prepared in `create_indicators` function
- The framework takes care of recalculating indicators when needed,
for backtest and live access
- For backtests, this class is instiated only once
- We assume all indicator data is forward-filled and no gaps
How to use
- For simple strategies calling :py:meth:`get_indicator_value` should be only required here.
"""
#: Trading universe
#:
#: - Perform additional pair lookups if needed
#:
strategy_universe: TradingStrategyUniverse
#: Available indicators as defined in create_indicators()
#:
available_indicators: IndicatorSet
#: Raw cached indicator results or ones calculated in the memory
#:
indicator_results: IndicatorResultMap
#: The current decision_cycle() timestamp.
#:
#: Stored here, so we do not need to pass it explicitly in API.
#:
timestamp: pd.Timestamp | None = None
def __post_init__(self):
assert type(self.indicator_results) == dict
assert isinstance(self.available_indicators, IndicatorSet)
assert isinstance(self.strategy_universe, TradingStrategyUniverse)
def get_price(
    self,
    pair: TradingPairIdentifier | HumanReadableTradingPairDescription | None = None,
    data_lag_tolerance=pd.Timedelta(days=7),
    index: int = -1,
    timestamp: pd.Timestamp | None = None,
    column="close",
) -> USDollarPrice | None:
    """Read an available candle value (close price by default) of a trading pair.

    - By default returns the value of the previous time frame.

    - **Does not** return the price of the current decision cycle,
      because a decision must be based on the previous price
      to avoid lookahead bias.

    :param pair:
        The trading pair to query.

        Either a pair identifier object or a human description tuple,
        e.g. `(ChainId.centralised_exchange, "binance", "ETH", "USDT")`.

        If omitted, the single pair of the strategy universe is used.

    :param data_lag_tolerance:
        If there is no sample exactly at the asked time,
        accept a sample that is at most this old.

    :param index:
        Which time frame (candle bar) to read, relative to the current
        decision cycle timestamp, using Python list access notation.

        - `-1` is the previous time frame (yesterday for daily candles).
        - `-2` is the time frame before that.
        - `0` looks into the future (the bar that has not yet finished).

    :param timestamp:
        Read the price at this specific timestamp instead.

        No time shift is applied, so the caller must take care
        to avoid lookahead bias. `index` is ignored when given.

    :param column:
        Which candle column to read, e.g. "volume".

    :return:
        The value found, or ``None`` if no sample is available
        within the tolerance.
    """
    lookup_at = timestamp
    if not timestamp:
        # No explicit timestamp: step `index` bars from the current cycle
        assert self.timestamp, f"prepare_decision_cycle() not called - framework missing something somewhere"
        bar_width = self.strategy_universe.data_universe.time_bucket.to_pandas_timedelta()
        lookup_at = self.timestamp + bar_width * index

    universe = self.strategy_universe

    if type(pair) is tuple:
        # Human readable description -> pair identifier
        pair = universe.get_pair_by_human_description(pair)

    if pair is None:
        pair = universe.get_single_pair()

    assert isinstance(pair, TradingPairIdentifier)
    assert pair.internal_id, "pair.internal_id missing - bad unit test data?"

    try:
        price, _sample_time = universe.data_universe.candles.get_price_with_tolerance(
            pair.internal_id,
            lookup_at,
            tolerance=data_lag_tolerance,
            kind=column,
        )
    except CandleSampleUnavailable:
        return None
    return price
def get_tvl(
    self,
    pair: TradingPairIdentifier | HumanReadableTradingPairDescription | None = None,
    data_lag_tolerance=pd.Timedelta(days=7),
    index: int = -1,
    timestamp: pd.Timestamp | None = None,
) -> USDollarPrice | None:
    """Read an available TVL/liquidity sample of a trading pair.

    - By default returns the sample of the previous time frame.

    - **Does not** return the liquidity of the current decision cycle,
      because a decision must be based on earlier data
      to avoid lookahead bias.

    See also :py:meth:`get_price`

    :param pair:
        The trading pair to query.

        Either a pair identifier object or a human description tuple,
        e.g. `(ChainId.centralised_exchange, "binance", "ETH", "USDT")`.

        If omitted, the single pair of the strategy universe is used.

    :param data_lag_tolerance:
        If there is no sample exactly at the asked time,
        accept a sample that is at most this old.

    :param index:
        Which time frame to read, relative to the current
        decision cycle timestamp, using Python list access notation.

        - `-1` is the previous time frame (yesterday for daily data).
        - `-2` is the time frame before that.
        - `0` looks into the future (the bar that has not yet finished).

    :param timestamp:
        Read the TVL at this specific timestamp instead.

        No time shift is applied, so the caller must take care
        to avoid lookahead bias. `index` is ignored when given.

    :return:
        The TVL found, or ``None`` if no liquidity sample is available
        within the tolerance.
    """
    if not timestamp:
        # No explicit timestamp: step `index` bars from the current cycle
        assert self.timestamp, f"prepare_decision_cycle() not called - framework missing something somewhere"
        cycle_ts = self.timestamp
        bar_width = self.strategy_universe.data_universe.time_bucket.to_pandas_timedelta()
        sample_at = cycle_ts + bar_width * index
    else:
        sample_at = timestamp

    if type(pair) is tuple:
        # Human readable description -> pair identifier
        pair = self.strategy_universe.get_pair_by_human_description(pair)

    if pair is None:
        pair = self.strategy_universe.get_single_pair()

    assert isinstance(pair, TradingPairIdentifier)
    assert pair.internal_id, "pair.internal_id missing - bad unit test data?"

    try:
        liquidity_sample = self.strategy_universe.data_universe.liquidity.get_liquidity_with_tolerance(
            pair.internal_id,
            sample_at,
            tolerance=data_lag_tolerance,
        )
        tvl, _sample_time = liquidity_sample
        return tvl
    except LiquidityDataUnavailable:
        return None
def get_indicator_value(
    self,
    name: str,
    column: str | None = None,
    pair: TradingPairIdentifier | HumanReadableTradingPairDescription | None = None,
    index: int = -1,
    clock_shift: pd.Timedelta = pd.Timedelta(hours=0),
    data_delay_tolerance: pd.Timedelta | str | None = "auto",
) -> float | None:
    """Read the available value of an indicator.

    - Returns the latest available indicator value.

    - **Does not** return the current timestamp value in the decision cycle,
      because any decision must be made based on the previous price.

    - Normalises missing inputs, NaNs and other data issues to Python ``None``.

    Single pair example with a single series indicator (RSI):

    .. code-block:: python

        def create_indicators(parameters: StrategyParameters, indicators: IndicatorSet, strategy_universe: TradingStrategyUniverse, execution_context: ExecutionContext):
            indicators.add("rsi", pandas_ta.rsi, {"length": parameters.rsi_length})

        #
        # Then in decide_trades()
        #

        # Read the RSI value of our only trading pair
        indicator_value = input.indicators.get_indicator_value("rsi")

    Single pair example with a multi-series indicator (Bollinger band):

    .. code-block:: python

        def create_indicators(parameters: StrategyParameters, indicators: IndicatorSet, strategy_universe: TradingStrategyUniverse, execution_context: ExecutionContext):
            indicators.add("bb", pandas_ta.bbands, {"length": parameters.bb_length})

        #
        # Then in decide_trades()
        #

        # Read bollinger band value for the current trading pair.
        # Bollinger band look up length was 20 and standard deviation 2.0.
        bb_value = input.indicators.get_indicator_value("bb", "BBL_20_2.0")

    Example accessing latest and previous values for cross over test:

    .. code-block:: python

        current_rsi_values[pair] = indicators.get_indicator_value("rsi", pair=pair)
        previous_rsi_values[pair] = indicators.get_indicator_value("rsi", index=-2, pair=pair)

        # Check for RSI crossing our threshold values in this cycle, compared to the previous cycle
        if current_rsi_values[pair] and previous_rsi_values[pair]:
            rsi_cross_above = current_rsi_values[pair] >= parameters.rsi_high and previous_rsi_values[pair] < parameters.rsi_high
            rsi_cross_below = current_rsi_values[pair] < parameters.rsi_low and previous_rsi_values[pair] > parameters.rsi_low

    :param name:
        Indicator name as defined in `create_indicators`.

    :param column:
        The name of the sub-column to read.

        For multicolumn indicators like Bollinger Bands,
        which produce multiple series of data from one column of price data.

    :param pair:
        Trading pair.

        Must be given when working with a multipair strategy.

    :param index:
        Access a specific previous timeframe item.

        If not given, always return the previous available value.

        Timeframe = candle bar here.

        Uses Python list access notation.

        - `-1` is the last item (previous time frame value, yesterday).
        - `-2` is the item before previous time frame (the day before yesterday).
        - `0` is looking to the future (the value at the end of the current day that has not yet passed)

    :param clock_shift:
        Used in time-shifted backtesting.

        Added on top of the index-shifted timestamp.

    :param data_delay_tolerance:
        If we do not have an exact timestamp match in the data series, look for the previous value.

        Look back max `data_delay_tolerance` days / hours to get a previous value using forward-fill technique.

        We need to do this when there is a mismatch between the indicator timeframe (e.g. daily)
        and decision cycle / price time frame (e.g. 15 minutes).

        Set to `None` to always return indicator value for the exact timestamp match.

        Set to `"auto"` to derive the tolerance from the indicator data itself:
        the decision timestamp is floored to the indicator bar width and
        the bar width is used as the tolerance.

    :return:
        The latest available indicator value.

        Any NaN, NA or not a number value in the indicator data is translated to Python ``None``.

        Return ``None`` if value not yet available when asked at the current decision moment,
        or if the bar width of the indicator data cannot be determined.

    :raise IndicatorDataNotFoundWithinDataTolerance:
        We asked `data_delay_tolerance` look backwards, but there wasn't any samples within the tolerance.
    """
    series = self.resolve_indicator_data(name, column, pair)

    ts = self.timestamp

    time_frame = _calculate_and_cache_candle_width(series.index)
    if time_frame is None:
        # Bad data.
        # E.g. portfolio data with missing values
        return None

    if data_delay_tolerance == "auto":
        # Align the decision timestamp with the indicator bars
        # and tolerate at most one bar of delay
        ts = ts.floor(time_frame)
        data_delay_tolerance = time_frame

    shifted_ts = ts + time_frame * index + clock_shift

    # First try direct timestamp hit.
    # This is the case for any normal strategies,
    # where time-series data and decision cycles have the equal indexes
    try:
        value = series[shifted_ts]
    except KeyError:
        if shifted_ts > series.index[-1]:
            # The data series has ended before the timestamp,
            # and there are not going to be new values in the future
            return None

        if data_delay_tolerance is None:
            # Exact match demanded, but there was none
            return None

        # Try to check for uneven timeframes
        # E.g. 1d RSI indicator data and 1s decision cycle
        #
        # NOTE(review): the forward-fill search starts from self.timestamp,
        # not from shifted_ts, so `index` and `clock_shift` do not affect
        # this fallback path - confirm this is intended.
        # TODO: Do we need to cache the indexer... does it has its own storage?
        ffill_indexer = series.index.get_indexer([self.timestamp], method="ffill")
        before_match_iloc = ffill_indexer[0]

        if before_match_iloc < 0:
            # We get -1 if there are no timestamps where the forward fill could start
            # This means there are not yet any samples available at the timestamp,
            # because the time series will start after the timestamp
            return None

        # Only look up the position after we know it is a real match,
        # as -1 would silently point to the last sample
        before_match_timestamp = series.index[before_match_iloc]
        before_match = series.iloc[before_match_iloc]

        # Internal sanity check
        distance = self.timestamp - before_match_timestamp
        assert distance >= ZERO_TIMEDELTA, f"Somehow we managed to get a indicator timestamp {before_match_timestamp} that is newer than asked {self.timestamp}"

        if distance > data_delay_tolerance:
            raise IndicatorDataNotFoundWithinDataTolerance(
                f"Asked indicator {name}. Data delay tolerance is {data_delay_tolerance}, but the delay was longer {distance}.\n"
                f"Our timestamp {self.timestamp}, fixed timestamp {shifted_ts}, data available at {before_match_timestamp}.\n"
            )

        value = before_match

    # The input data was not properly cleaned up and has duplicated values for some dates/times
    assert not isinstance(value, pd.Series), f"Duplicate DatetimeIndex entries detected for: {name} {column} {pair}"

    if pd.isna(value):
        return None

    return value
def get_indicator_series(
    self,
    name: str,
    column: str | None = None,
    pair: TradingPairIdentifier | HumanReadableTradingPairDescription | None = None,
    unlimited=False,
) -> pd.Series | None:
    """Get the indicator data as a whole series.

    By default the series is cut so that it only contains
    data available before the current decision cycle timestamp.

    :param name:
        Indicator name as defined in `create_indicators`.

    :param column:
        Sub-column name for multi-column indicators.

    :param pair:
        Trading pair, needed for multipair strategies.

    :param unlimited:
        Return all calculated data without the cut,
        including future data in backtesting.

    :return:
        Indicator data, which may contain NaN values.

        ``None`` if there is no data available before the current timestamp.
    """
    if unlimited:
        # Caller explicitly asked for everything, no timestamp needed
        return self.resolve_indicator_data(name, column, pair, unlimited=unlimited)

    assert self.timestamp is not None, "StrategInputIndicators.timestamp not set for decide_trades(). Call get_indicator_series(unlimited=True) to get all data."

    full_series = self.resolve_indicator_data(name, column, pair, unlimited=unlimited)
    cutoff = get_prior_timestamp(full_series, self.timestamp)
    return None if cutoff is None else full_series.loc[:cutoff]
def get_price_series(
    self,
    column: str = "close",
    pair: TradingPairIdentifier | HumanReadableTradingPairDescription | None = None,
) -> pd.Series:
    """Get one candle column of a pair as a whole series.

    - Meant for visualisation and other checks

    - Not useful inside `decide_trades`, as the series includes future data

    :param column:
        Which candle column to return, "close" by default.

    :param pair:
        Pair identifier or human description tuple.

        If omitted, the single pair of the strategy universe is used.

    :return:
        The selected candle column.

        Data may contain NaN values.
    """
    universe = self.strategy_universe

    if type(pair) is tuple:
        # Human readable description -> pair identifier
        pair = universe.get_pair_by_human_description(pair)

    if pair is None:
        pair = universe.get_single_pair()

    assert isinstance(pair, TradingPairIdentifier)
    assert pair.internal_id, "pair.internal_id missing - bad unit test data?"

    candles = universe.data_universe.candles.get_candles_by_pair(pair.internal_id)
    return candles[column]
def get_indicator_dataframe(
    self,
    name: str,
    pair: TradingPairIdentifier | HumanReadableTradingPairDescription | None = None
) -> pd.DataFrame:
    """Get the raw, uncut data of a multi-column indicator.

    The data is resolved with ``unlimited=True``,
    so no decision cycle timestamp is needed.

    See also :py:meth:`get_indicator_series`

    :param name:
        Indicator name as defined in `create_indicators`.

    :param pair:
        Trading pair, needed for multipair strategies.

    :return:
        DataFrame of a multicolumn indicator like Bollinger Bands or ADX
    """
    frame = self.resolve_indicator_data(name, "all", pair, unlimited=True)
    # Single series indicators cannot be served as a DataFrame
    assert isinstance(frame, pd.DataFrame), f"Not DataFrame indicator: {name}"
    return frame
def resolve_indicator_data(
    self,
    name: str,
    column: str | None = None,
    pair: TradingPairIdentifier | HumanReadableTradingPairDescription | None = None,
    unlimited=False,
) -> pd.Series | pd.DataFrame:
    """Look up the data series/frame of an indicator.

    Raises friendly error messages for the common pitfalls.

    :param name:
        Indicator name

    :param column:
        Column name for multi-column indicators.

        "all" to get the whole DataFrame.

    :param pair:
        Needed when the universe contains multiple trading pairs.

        Ignored for indicators that are not calculated per pair.

    :param unlimited:
        Allow access without the decision cycle timestamp being set,
        i.e. loading of past and future data.

    :raise IndicatorNotFound:
        No indicator with this name was defined.

    :raise InvalidForMultipairStrategy:
        A per-pair indicator was asked without `pair` in a multipair universe.
    """
    assert type(name) is str
    assert column is None or type(column) is str

    if not unlimited:
        assert self.timestamp, f"StrategyInputIndicators.timestamp is None. prepare_decision_cycle() not called, or you are outside a decide_trades() function."

    indicator = self.available_indicators.get_indicator(name)
    if indicator is None:
        raise IndicatorNotFound(f"Indicator with name '{name}' not defined by create_indicators(). Available indicators are: {self.available_indicators.get_label()}")

    # Whole universe/custom indicators are stored without a pair
    key_pair = None

    if indicator.source.is_per_pair():
        universe = self.strategy_universe

        if pair is None:
            if universe.get_pair_count() != 1:
                raise InvalidForMultipairStrategy(f"The strategy universe contains multiple pairs. You need to pass pair argument to the function to determine which trading pair you are manipulating.")
            pair = universe.get_single_pair()

        if type(pair) is tuple:
            # Human readable description -> pair identifier
            pair = universe.get_pair_by_human_description(pair)

        assert isinstance(pair, TradingPairIdentifier)
        assert pair.internal_id, "pair.internal_id missing - bad unit test data?"
        key_pair = pair

    key = IndicatorKey(key_pair, indicator)

    indicator_result = self.indicator_results.get(key)
    if indicator_result is None:
        all_keys = set(self.indicator_results.keys())
        all_indicators = set(self.available_indicators.indicators.keys())
        raise AssertionError(
            f"Indicator results did not contain key {key} for indicator {name}.\n"
            f"Available indicators: {all_indicators}\n"
            f"Available data series: {all_keys}\n"
        )

    data = indicator_result.data
    assert data is not None, f"Indicator pre-calculated values missing for {name} - lookup key {key}"

    if isinstance(data, pd.DataFrame):
        if column == "all":
            return data

        # A single column must be picked from a multi-column indicator
        assert column is not None, f"Indicator {name} has multiple available columns to choose from: {data.columns}"
        assert column in data.columns, f"Indicator {name} subcolumn {column} not in the available columns: {data.columns}"
        return data[column]

    if isinstance(data, pd.Series):
        return data

    raise NotImplementedError(f"Unknown indicator data type {type(data)}")
def prepare_decision_cycle(self, cycle: int, timestamp: pd.Timestamp):
    """Move the indicators to a new decision cycle.

    Called by the framework for each decision cycle.

    - This object is not copied for each cycle,
      only its timestamp is updated

    :param cycle:
        Decision cycle number, used for logging only.

    :param timestamp:
        Timestamp of the new decision cycle.
    """
    logger.info(
        "Strategy indicators moved to the cycle: %d: %s",
        cycle,
        timestamp,
    )
    self.timestamp = timestamp
@dataclass
class StrategyInput:
    """Inputs for a trading decision.

    The data structure passed to a strategy when it decides its trades.
    It captures everything a single decision needs, under the different
    live trading and backtesting circumstances.

    - Input for the `decide_trades` function
    - Enabled when `trading_strategy_engine_version = "0.5"` or higher
    """

    #: Strategy cycle number
    #:
    #: - Deterministic for a backtest
    #: - May be reset for live execution
    #:
    cycle: int

    #: Timestamp of this strategy cycle
    #:
    #: - A decision can/should only access data earlier than this and cannot peek into the future
    #: - Always in UTC, no timezone
    #:
    timestamp: pd.Timestamp

    #: The current state of the strategy
    #:
    #: - You can peek for open/closed positions
    #: - Use :py:meth:`get_position_manager` to access
    #:
    state: State

    #: The source trading universe for this strategy run
    strategy_universe: TradingStrategyUniverse

    #: Parameters used for this backtest or live run
    parameters: StrategyParameters

    #: All indicators that are precalculated with create_indicators()
    #:
    #: - Indicators calculated in the `create_indicators` function
    #: - Cached in backtesting for fast reads
    #: - In live trading recalculated for every cycle
    #:
    indicators: StrategyInputIndicators

    #: Asset pricing model.
    #:
    #: - Used to determine the position size and value of trades
    #: - Backtesting uses historical pricing whereas live trading will read any data directly on-chain
    #: - Access using :py:meth:`get_position_manager`
    #:
    pricing_model: PricingModel

    #: Information about whether this is a live or a backtest run.
    #:
    execution_context: ExecutionContext

    #: Diagnostics and debug data
    #:
    #: - Undefined format
    #: - Mostly used in internal testing and logging
    #: - Is mutated in-place, but don't rely on this to work for live strategies
    #:
    other_data: dict

    def get_position_manager(self) -> PositionManager:
        """Create a position manager for opening/closing positions in this decision cycle.

        :return:
            A new position manager built from the timestamp, universe,
            state and pricing model of this input.
        """
        manager = PositionManager(
            self.timestamp,
            self.strategy_universe,
            self.state,
            self.pricing_model,
        )
        return manager

    def get_default_pair(self) -> TradingPairIdentifier:
        """Get the default trading pair of this strategy.

        - Works only for single pair strategies

        :raise InvalidForMultipairStrategy:
            If called for a multi pair strategy
        """
        universe = self.strategy_universe
        if universe.get_pair_count() == 1:
            return universe.get_single_pair()
        raise InvalidForMultipairStrategy("Strategy universe is multipair - get_default_pair() not available")

    def is_visualisation_enabled(self) -> bool:
        """Tell whether visualisation should be rendered.

        - Use inside `decide_trades()` to decide if `state.visualisation` should be filled in

        - Always enabled for live trading

        - Disabled for grid search to keep it fast, as the
          visualisation results would likely be discarded

        Example:

        .. code-block:: python

            def decide_trades(input: StrategyInput):

                # ...

                #
                # Visualisations
                #

                if input.is_visualisation_enabled():

                    visualisation = state.visualisation  # Helper class to visualise strategy output

                    visualisation.plot_indicator(
                        timestamp,
                        f"ETH",
                        PlotKind.technical_indicator_detached,
                        current_price[eth_pair],
                        colour="blue",
                    )

                    # Draw the BTC daily RSI if we got a valid value for it
                    if pd.notna(current_rsi_values[btc_pair]):
                        visualisation.plot_indicator(
                            timestamp,
                            f"RSI",
                            PlotKind.technical_indicator_detached,
                            current_rsi_values[btc_pair],
                            colour="orange",
                        )
        """
        context = self.execution_context
        # Live trading always draws; otherwise only grid search skips
        # plotting to save speed and space
        return True if context.mode.is_live_trading() else not context.grid_search
# Candle width cache used by _calculate_and_cache_candle_width().
# Keyed by the id() of an index object, holds at most SERIES_CACHE_SIZE entries.
_time_frame_cache = cachetools.Cache(maxsize=SERIES_CACHE_SIZE)
def _calculate_and_cache_candle_width(index: pd.DatetimeIndex | pd.MultiIndex) -> pd.Timedelta | None:
"""Get the evenly timestamped index candle/time bar width.
- Cached for speed - cache size might not make sense for large trading pair use cases
:return:
None of the index is empty and candle width cannot be calculated
"""
# The original data is in grouped DF
if isinstance(index, pd.MultiIndex):
# AssertionError: Got index: MultiIndex([(2854997, '2024-04-04 21:00:00'),
# (2854997, '2024-04-04 22:00:00'),
index = index.get_level_values(1)
assert isinstance(index, pd.DatetimeIndex), f"Got index: {index}"
key = id(index)
value = _time_frame_cache.get(key)
if value is None:
if len(index) > 2:
value = index[-1] - index[-2]
else:
value = None
_time_frame_cache[key] = value
return value