"""A strategy runner that executes Trading Strategy Pandas type strategies."""
import datetime
from io import StringIO
from typing import List, Optional
import logging
import pandas as pd
from tradeexecutor.cli.discord import post_logging_discord_image
from tradeexecutor.strategy.pricing_model import PricingModel
from tradeexecutor.strategy.strategy_module import DecideTradesProtocol, DecideTradesProtocol2
from tradeexecutor.strategy.sync_model import SyncModel
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse, translate_trading_pair
from tradeexecutor.state.state import State
from tradeexecutor.state.trade import TradeExecution
from tradeexecutor.strategy.runner import StrategyRunner, PreflightCheckFailed
from tradeexecutor.visual.image_output import render_plotly_figure_as_image_file
from tradeexecutor.visual.strategy_state import draw_single_pair_strategy_state, draw_multi_pair_strategy_state
from tradeexecutor.state.visualisation import Visualisation
logger = logging.getLogger(__name__)
[docs]class PandasTraderRunner(StrategyRunner):
"""A trading executor for Pandas math based algorithm."""
[docs] def __init__(self,
*args,
decide_trades: DecideTradesProtocol | DecideTradesProtocol2,
max_data_age: Optional[datetime.timedelta] = None,
**kwargs):
super().__init__(*args, **kwargs)
self.decide_trades = decide_trades
self.max_data_age = max_data_age
# Legacy assets
sync_model = kwargs.get("sync_model")
if sync_model is not None:
assert isinstance(sync_model, SyncModel)
def on_data_signal(self):
pass
[docs] def on_clock(self,
clock: datetime.datetime,
strategy_universe: TradingStrategyUniverse,
pricing_model: PricingModel,
state: State,
debug_details: dict) -> List[TradeExecution]:
"""Run one strategy tick."""
assert isinstance(strategy_universe, TradingStrategyUniverse)
universe = strategy_universe.universe
pd_timestamp = pd.Timestamp(clock)
assert state.sync.treasury.last_updated_at is not None, "Cannot do trades before treasury is synced at least once"
# All sync models do not emit events correctly yet
# assert len(state.sync.treasury.balance_update_refs) > 0, "No deposit detected. Please do at least one deposit before starting the strategy"
assert len(strategy_universe.reserve_assets) == 1
# Call the strategy script decide_trades()
# callback
if self.execution_context.is_version_greater_or_equal_than(0, 3, 0):
return self.decide_trades(
timestamp=pd_timestamp,
strategy_universe=strategy_universe,
state=state,
pricing_model=pricing_model,
cycle_debug_data=debug_details,
)
else:
return self.decide_trades(
timestamp=pd_timestamp,
universe=universe,
state=state,
pricing_model=pricing_model,
cycle_debug_data=debug_details,
)
[docs] def pretick_check(self, ts: datetime.datetime, universe: TradingStrategyUniverse):
"""Check the data looks more or less sane."""
assert isinstance(universe, TradingStrategyUniverse)
universe = universe.universe
now_ = ts
if len(universe.exchanges) == 0:
raise PreflightCheckFailed("Exchange count zero")
if universe.pairs.get_count() == 0:
raise PreflightCheckFailed("Pair count zero")
# Don't assume we have candle or liquidity data e.g. for the testing strategies
if universe.candles is not None:
if universe.candles.get_candle_count() > 0:
start, end = universe.get_candle_availability()
if self.max_data_age is not None:
if now_ - end > self.max_data_age:
raise PreflightCheckFailed(f"We do not have up-to-date data for candles. Last candles are at {end}")
[docs] def refresh_visualisations(self, state: State, universe: TradingStrategyUniverse):
if not self.run_state:
# This strategy is not maintaining a run-state
# Backtest, simulation, etc.
logger.info("Could not update strategy thinking image data, self.run_state not available")
return
logger.info("Refreshing strategy visualisations: %s", self.run_state.visualisation)
if universe.is_empty():
# TODO: Not sure how we end up here
logger.info("Strategy universe is empty - nothing to report")
return
if universe.is_single_pair_universe():
small_figure = draw_single_pair_strategy_state(state, universe, height=512)
# Draw the inline plot and expose them tot he web server
# TODO: SVGs here are not very readable, have them as a stop gap solution
large_figure = draw_single_pair_strategy_state(state, universe, height=1024)
self.update_strategy_thinking_image_data(small_figure, large_figure)
elif 1 < universe.get_pair_count() <= 3:
small_figure_combined = draw_multi_pair_strategy_state(state, universe, height=1024)
large_figure_combined = draw_multi_pair_strategy_state(state, universe, height=2048)
self.update_strategy_thinking_image_data(small_figure_combined, large_figure_combined)
elif 3 < universe.get_pair_count() <=5 :
small_figure_combined = draw_multi_pair_strategy_state(state, universe, height=2048, detached_indicators = False)
large_figure_combined = draw_multi_pair_strategy_state(state, universe, height=3840, width = 2160, detached_indicators = False)
self.update_strategy_thinking_image_data(small_figure_combined, large_figure_combined)
else:
logger.warning("Charts not yet available for this strategy type. Pair count: %s", universe.get_pair_count())
[docs] def update_strategy_thinking_image_data(self, small_figure, large_figure):
"""Update the strategy thinking image data with small, small dark theme, large, and large dark theme images.
:param small_image: 512 x 512 image
:param large_image: 1920 x 1920 image
"""
small_image, small_image_dark = self.get_small_images(small_figure)
large_image, large_image_dark = self.get_large_images(large_figure)
# uncomment if you want light mode for Discord
# small_figure.update_layout(template="plotly")
# large_figure.update_layout(template="plotly")
# don't need the dark images for png (only post light images to discord)
small_image_png, _ = self.get_image_and_dark_image(small_figure, format="png", width=512, height=512)
large_image_png, _ = self.get_image_and_dark_image(large_figure, format="png", width=1024, height=1024)
self.run_state.visualisation.update_image_data(
small_image,
large_image,
small_image_dark,
large_image_dark,
small_image_png,
large_image_png,
)
[docs] def get_small_images(self, small_figure):
"""Gets the png image of the figure and the dark theme png image. Images are 512 x 512."""
return self.get_image_and_dark_image(small_figure, width=512, height=512)
[docs] def get_large_images(self, large_figure):
"""Gets the png image of the figure and the dark theme png image. Images are 1024 x 1024."""
return self.get_image_and_dark_image(large_figure, width=1024, height=1024)
[docs] def get_image_and_dark_image(self, figure, width, height, format="svg"):
"""Renders the figure as a PNG image and a dark theme PNG image."""
image = render_plotly_figure_as_image_file(figure, width=width, height=height, format=format)
figure.update_layout(template="plotly_dark")
image_dark = render_plotly_figure_as_image_file(figure, width=width, height=height, format=format)
return image, image_dark
[docs] def report_strategy_thinking(self,
strategy_cycle_timestamp: datetime.datetime,
cycle: int,
universe: TradingStrategyUniverse,
state: State,
trades: List[TradeExecution],
debug_details: dict):
"""Strategy admin helpers to understand a live running strategy.
- Post latest variables
- Draw the single pair strategy visualisation.
To manually test the visualisation see: `manual-visualisation-test.py`.
:param strategy_cycle_timestamp:
real time lock
:param cycle:
Cycle number
:param universe:
Currnet trading universe
:param trades:
Trades executed on this cycle
:param state:
Current execution state
:param debug_details:
Dict of random debug stuff
"""
# Update charts
self.refresh_visualisations(state, universe)
visualisation = state.visualisation
if universe.is_empty():
# TODO: Not sure how we end up here
logger.info("Strategy universe is empty - nothing to report")
return
if universe.is_single_pair_universe():
# Log state
buf = StringIO()
pair = universe.get_single_pair()
candles = universe.universe.candles.get_candles_by_pair(pair.internal_id)
last_candle = candles.iloc[-1]
lag = pd.Timestamp.utcnow().tz_localize(None) - last_candle["timestamp"]
print("Strategy thinking", file=buf)
print("", file=buf)
print(f" Strategy cycle #{cycle}: {strategy_cycle_timestamp} UTC, now is {datetime.datetime.utcnow()}", file=buf)
print(f" Last candle at: {last_candle['timestamp']} UTC, market data and action lag: {lag}", file=buf)
print(f" Price open:{last_candle['open']} close:{last_candle['close']} {pair.base.token_symbol} / {pair.quote.token_symbol}", file=buf)
# Draw indicators
for name, plot in visualisation.plots.items():
value = plot.get_last_value()
print(f" {name}: {value}", file=buf)
logger.trade(buf.getvalue())
small_image = self.run_state.visualisation.small_image_png
post_logging_discord_image(small_image)
else:
# Log state
buf = StringIO()
print("Strategy thinking", file=buf)
print(f" Strategy cycle #{cycle}: {strategy_cycle_timestamp} UTC, now is {datetime.datetime.utcnow()}", file=buf)
for pair_id, candles in universe.universe.candles.get_all_pairs():
pair = universe.universe.pairs.get_pair_by_id(pair_id)
pair_slug = f"{pair.base_token_symbol} / {pair.quote_token_symbol}"
print(f"\n {pair_slug}", file=buf)
last_candle = candles.iloc[-1]
lag = pd.Timestamp.utcnow().tz_localize(None) - last_candle["timestamp"]
dex_pair = universe.universe.pairs.get_pair_by_id(pair_id)
pair = translate_trading_pair(dex_pair)
if not pair:
logger.warning(f" Pair missing: {dex_pair} - should not happen")
else:
print(f" Last candle at: {last_candle['timestamp']} UTC, market data and action lag: {lag}", file=buf)
print(f" Price open:{last_candle['open']}", file=buf)
print(f" Close:{last_candle['close']}")
# Draw indicators
for name, plot in visualisation.plots.items():
if getattr(plot.pair, "internal_id", None) is None:
logger.warning(f" Plot {name} has no pair argument. To see indicator values for individual pairs in a multipair strategy, add pair argument to the `plot_indicator` function in your strategy file.")
continue
if plot.pair.internal_id != pair_id:
continue
value = plot.get_last_value()
print(f" {name}: {value}", file=buf)
logger.trade(buf.getvalue())
# there is already a warning in refresh_visualisations for pair count > 3
if universe.get_pair_count() <= 5:
large_image = self.run_state.visualisation.large_image_png
post_logging_discord_image(large_image)
else:
logger.info(f"Strategy visualisation not posted to Discord because pair count of {universe.get_pair_count()} is greater than 5.")