Source code for tradeexecutor.strategy.pandas_trader.runner

"""A strategy runner that executes Trading Strategy Pandas type strategies."""

import datetime
from io import StringIO
from typing import List, Optional
import logging

import pandas as pd

from tradeexecutor.cli.discord import post_logging_discord_image
from tradeexecutor.statistics.in_memory_statistics import refresh_live_strategy_images
from tradeexecutor.strategy.pandas_trader.indicator import IndicatorSet, calculate_and_load_indicators, MemoryIndicatorStorage, call_create_indicators
from tradeexecutor.strategy.pandas_trader.strategy_input import StrategyInput, StrategyInputIndicators
from tradeexecutor.strategy.parameters import StrategyParameters
from tradeexecutor.strategy.pricing_model import PricingModel
from tradeexecutor.strategy.strategy_module import DecideTradesProtocol, DecideTradesProtocol2, DecideTradesProtocol3, DecideTradesProtocol4
from tradeexecutor.strategy.sync_model import SyncModel
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse, translate_trading_pair

from tradeexecutor.state.state import State
from tradeexecutor.state.trade import TradeExecution
from tradeexecutor.strategy.runner import StrategyRunner, PreflightCheckFailed
from tradeexecutor.visual.image_output import render_plotly_figure_as_image_file
from tradeexecutor.visual.strategy_state import draw_single_pair_strategy_state, draw_multi_pair_strategy_state


logger = logging.getLogger(__name__)


[docs]class PandasTraderRunner(StrategyRunner): """A trading executor for Pandas math based algorithm."""
[docs] def __init__( self, *args, decide_trades: DecideTradesProtocol | DecideTradesProtocol2 | DecideTradesProtocol3 | DecideTradesProtocol4, max_data_age: datetime.timedelta = None, **kwargs ): super().__init__(*args, **kwargs) self.decide_trades = decide_trades self.max_data_age = max_data_age # Legacy assets sync_model = kwargs.get("sync_model") if sync_model is not None: assert isinstance(sync_model, SyncModel)
def on_data_signal(self): pass
[docs] def on_clock( self, clock: datetime.datetime, strategy_universe: TradingStrategyUniverse, pricing_model: PricingModel, state: State, debug_details: dict, indicators:StrategyInputIndicators | None = None, ) -> List[TradeExecution]: """Run one strategy tick.""" assert isinstance(strategy_universe, TradingStrategyUniverse) universe = strategy_universe.data_universe pd_timestamp = pd.Timestamp(clock) assert state.sync.treasury.last_updated_at is not None, "Cannot do trades before treasury is synced at least once" # All sync models do not emit events correctly yet # assert len(state.sync.treasury.balance_update_refs) > 0, "No deposit detected. Please do at least one deposit before starting the strategy" assert len(strategy_universe.reserve_assets) == 1 # Call the strategy script decide_trades() # callback if self.execution_context.is_version_greater_or_equal_than(0, 5, 0): # DecideTradesProtocolV4 if self.execution_context.mode.is_live_trading(): # Indicators are recalculated for every tick in the live trading assert self.create_indicators is not None, "trading_strategy_engine_version > 0.5, but we lack create_indicators" assert self.parameters is not None, "trading_strategy_engine_version > 0.5, but we lack parameters" indicators = self.calculate_live_indicators(clock, strategy_universe, self.parameters) assert indicators is not None, "indicators not created when running trading_strategy_engine_version=0.5" indicators.prepare_decision_cycle(debug_details["cycle"], pd_timestamp) input = StrategyInput( cycle=debug_details["cycle"], timestamp=pd_timestamp, strategy_universe=strategy_universe, state=state, pricing_model=pricing_model, other_data=debug_details, indicators=indicators, parameters=self.parameters, execution_context=self.execution_context, ) return self.decide_trades( input ) elif self.execution_context.is_version_greater_or_equal_than(0, 4, 0): parameters = self.execution_context.parameters # DecideTradesProtocolV3 parameters["cycle"] = debug_details["cycle"] return self.decide_trades( timestamp=pd_timestamp, parameters=parameters, strategy_universe=strategy_universe, state=state, pricing_model=pricing_model, ) elif self.execution_context.is_version_greater_or_equal_than(0, 3, 0): return self.decide_trades( timestamp=pd_timestamp, strategy_universe=strategy_universe, state=state, pricing_model=pricing_model, cycle_debug_data=debug_details, ) else: return self.decide_trades( timestamp=pd_timestamp, universe=universe, state=state, pricing_model=pricing_model, cycle_debug_data=debug_details, )
[docs] def pretick_check(self, ts: datetime.datetime, universe: TradingStrategyUniverse): """Check the data looks more or less sane.""" assert isinstance(universe, TradingStrategyUniverse) universe = universe.data_universe now_ = ts if len(universe.exchanges) == 0: raise PreflightCheckFailed("Exchange count zero") if universe.pairs.get_count() == 0: raise PreflightCheckFailed("Pair count zero") # Don't assume we have candle or liquidity data e.g. for the testing strategies if universe.candles is not None: if universe.candles.get_candle_count() > 0: start, end = universe.candles.get_timestamp_range() if self.max_data_age is not None: if now_ - end > self.max_data_age: raise PreflightCheckFailed(f"We do not have up-to-date data for candles. Last candles are at {end}")
[docs] def refresh_visualisations(self, state: State, universe: TradingStrategyUniverse): """Updates the visualisation images for the strategy. - Used in Discord (small) - Used on the frontend (large) This is automatically called on trade-executor console startup: docker compose run enzyme-polygon-eth-btc-usdc console To call this manually from the same console with pre-set up runner .. code-block:: shell runner.refresh_visualisations(state, strategy_universe) """ if not self.run_state: # This strategy is not maintaining a run-state # Backtest, simulation, etc. logger.info("Could not update strategy thinking image data, self.run_state not available") return pair_count = universe.get_pair_count() logger.info("Refreshing strategy visualisations: %s, pair count is %d", self.run_state.visualisation, pair_count ) execution_context = self.execution_context if universe.is_empty(): # TODO: Not sure how we end up here logger.info("Strategy universe is empty - nothing to report") return try: if pair_count == 1: small_figure = draw_single_pair_strategy_state(state, execution_context, universe, height=512) # Draw the inline plot and expose them tot he web server # TODO: SVGs here are not very readable, have them as a stop gap solution large_figure = draw_single_pair_strategy_state(state, execution_context, universe, height=1024) self.update_strategy_thinking_image_data(small_figure, large_figure) elif 1 < pair_count <= 3: small_figure_combined = draw_multi_pair_strategy_state(state, execution_context, universe, height=1024) large_figure_combined = draw_multi_pair_strategy_state(state, execution_context, universe, height=2048) self.update_strategy_thinking_image_data(small_figure_combined, large_figure_combined) elif 3 < pair_count <=5: small_figure_combined = draw_multi_pair_strategy_state(state, execution_context, universe, height=2048, detached_indicators = False) large_figure_combined = draw_multi_pair_strategy_state(state, execution_context, universe, height=3840, width = 2160, detached_indicators = False) self.update_strategy_thinking_image_data(small_figure_combined, large_figure_combined) else: logger.warning("Charts not yet available for this strategy type. Pair count: %d", pair_count) except Exception as e: # Don't take trade executor down if visualisations fail logger.warning("Could not draw visualisations in refresh_visualisations()") logger.warning("Visualisation exception %s", e, exc_info=e)
[docs] def update_strategy_thinking_image_data(self, small_figure, large_figure): """Update the strategy thinking image data with small, small dark theme, large, and large dark theme images. :param small_image: 512 x 512 image :param large_image: 1920 x 1920 image """ execution_context = self.execution_context refresh_live_strategy_images(self.run_state, execution_context, small_figure, large_figure)
[docs] def report_strategy_thinking( self, strategy_cycle_timestamp: datetime.datetime, cycle: int, universe: TradingStrategyUniverse, state: State, trades: List[TradeExecution], debug_details: dict ): """Strategy admin helpers to understand a live running strategy. - Post latest variables - Draw the single pair strategy visualisation. To manually test the visualisation see: `manual-visualisation-test.py`. :param strategy_cycle_timestamp: real time lock :param cycle: Cycle number :param universe: Currnet trading universe :param trades: Trades executed on this cycle :param state: Current execution state :param debug_details: Dict of random debug stuff """ # Update charts self.refresh_visualisations(state, universe) visualisation = state.visualisation if universe.is_empty(): # TODO: Not sure how we end up here logger.info("Strategy universe is empty - nothing to report") return if universe.is_single_pair_universe(): # Log state buf = StringIO() pair = universe.get_single_pair() candles = universe.data_universe.candles.get_candles_by_pair(pair.internal_id) last_candle = candles.iloc[-1] lag = pd.Timestamp.utcnow().tz_localize(None) - last_candle["timestamp"] print("Strategy thinking", file=buf) print("", file=buf) print(f" Strategy cycle #{cycle}: {strategy_cycle_timestamp} UTC, now is {datetime.datetime.utcnow()}", file=buf) print(f" Last candle at: {last_candle['timestamp']} UTC, market data and action lag: {lag}", file=buf) print(f" Price open:{last_candle['open']} close:{last_candle['close']} {pair.base.token_symbol} / {pair.quote.token_symbol}", file=buf) # Draw indicators for name, plot in visualisation.plots.items(): value = plot.get_last_value() print(f" {name}: {value}", file=buf) logger.trade(buf.getvalue()) small_image = self.run_state.visualisation.small_image_png if small_image is not None: post_logging_discord_image(small_image) else: logger.warning("Chart visualisation missing") else: # Log state buf = StringIO() print("Strategy thinking", file=buf) print(f" Strategy cycle #{cycle}: {strategy_cycle_timestamp} UTC, now is {datetime.datetime.utcnow()}", file=buf) for pair_id, candles in universe.data_universe.candles.get_all_pairs(): pair = universe.data_universe.pairs.get_pair_by_id(pair_id) pair_slug = f"{pair.base_token_symbol} / {pair.quote_token_symbol}" print(f"\n {pair_slug}", file=buf) lag = None timestamp = None last_candle = None if len(candles) > 0: last_candle = candles.iloc[-1] try: timestamp = last_candle["timestamp"] lag = pd.Timestamp.utcnow().tz_localize(None) - timestamp except: logger.warning("Cannot read timestamp") else: logger.warning("Pair %s had not candle data", pair) dex_pair = universe.data_universe.pairs.get_pair_by_id(pair_id) pair = translate_trading_pair(dex_pair) if not pair: logger.warning(f" Pair missing: {dex_pair} - should not happen") else: print(f" Last candle at: {timestamp} UTC, market data and action lag: {lag}", file=buf) if last_candle is not None: print(f" Price open:{last_candle['open']}", file=buf) print(f" Close:{last_candle['close']}") # Draw indicators for name, plot in visualisation.plots.items(): if getattr(plot.pair, "internal_id", None) is None: logger.warning(f" Plot {name} has no pair argument. To see indicator values for individual pairs in a multipair strategy, add pair argument to the `plot_indicator` function in your strategy file.") continue if plot.pair.internal_id != pair_id: continue value = plot.get_last_value() print(f" {name}: {value}", file=buf) logger.trade(buf.getvalue()) # there is already a warning in refresh_visualisations for pair count > 3 if universe.get_pair_count() <= 5: large_image = self.run_state.visualisation.large_image_png post_logging_discord_image(large_image) else: logger.info(f"Strategy visualisation not posted to Discord because pair count of {universe.get_pair_count()} is greater than 5.")
[docs] def calculate_live_indicators( self, timestamp: datetime.datetime, strategy_universe: TradingStrategyUniverse, parameters: StrategyParameters, ) -> StrategyInputIndicators: """Calculate and recalculate indicators in a live trading. - Calculated just before `decide_trades` is called - Recalculated for every cycle :return: Freshly calculated indicators """ # storage = self.indicator_storage logger.info("Calculating live indicators for %s", timestamp) storage = MemoryIndicatorStorage(strategy_universe.get_cache_key()) indicators = call_create_indicators( self.create_indicators, parameters, strategy_universe, self.execution_context, timestamp, ) indicator_results = calculate_and_load_indicators( strategy_universe=strategy_universe, storage=storage, execution_context=self.execution_context, indicators=indicators, parameters=self.parameters, timestamp=timestamp, ) strategy_input_indicators = StrategyInputIndicators( strategy_universe, indicator_results=indicator_results, available_indicators=indicators, ) return strategy_input_indicators