Source code for tradeexecutor.analysis.trade_analyser

"""Analyze the trade performance of algorithm.

Calculate success/fail rate of trades and plot success distribution.

Example analyses include:

- Table: Summary of all trades

- Graph: Trade won/lost distribution

- Timeline: Analysis of each individual trade made

.. note ::

    A lot of this code has been lifted from the trading-strategy package,
    where it had to deal with different trading frameworks.
    It could be simplified greatly now.
"""

import datetime
import enum
import logging
from dataclasses import dataclass, field
from statistics import median
from typing import List, Dict, Iterable, Optional, Tuple, Callable, Set

import numpy as np
import pandas as pd
from IPython.core.display_functions import display
from dataclasses_json import dataclass_json, config

from tradeexecutor.state.position import TradingPosition
from tradeexecutor.state.portfolio import Portfolio
from tradeexecutor.state.trade import TradeExecution, TradeType
from tradeexecutor.state.types import USDollarPrice
from tradeexecutor.utils.format import calculate_percentage
from tradeexecutor.utils.timestamp import json_encode_timedelta, json_decode_timedelta
from tradingstrategy.timebucket import TimeBucket

from tradingstrategy.exchange import Exchange
from tradingstrategy.pair import PandasPairUniverse
from tradingstrategy.types import PrimaryKey, USDollarAmount
from tradingstrategy.utils.format import format_value, format_price, format_duration_days_hours_mins, \
    format_percent_2_decimals
from tradingstrategy.utils.summarydataframe import as_dollar, as_integer, create_summary_table, as_percent, as_duration, as_bars

logger = logging.getLogger(__name__)

@dataclass
class SpotTrade:
    """A single spot trade, tracked to reconstruct position performance.

    Sells are recorded with a negative :py:attr:`quantity`.
    """

    #: Internal running counter to uniquely label all trades in trade analysis
    trade_id: PrimaryKey

    #: Trading pair for this trade
    pair_id: PrimaryKey

    #: When this trade was made, the backtest simulation tick
    timestamp: pd.Timestamp

    #: Asset mid-price
    price: USDollarPrice

    #: Price multiplied with the quantity to get :py:attr:`value`
    executed_price: USDollarPrice

    #: How much of the asset we bought. Negative value for sells.
    quantity: float

    #: How much fees we paid to the exchange
    commission: USDollarAmount

    #: How much we lost against the mid-price due to the slippage
    slippage: USDollarAmount

    #: Any hints applied for this trade why it was performed
    trade_type: Optional[TradeType] = None

    #: Internal state dump of the algorithm when this trade was made.
    #: This is mostly useful when doing the trade analysis to understand
    #: why some trades were made.
    #: It also allows you to reconstruct the portfolio state over time.
    state_details: Optional[Dict] = None

    #: LP fees paid, currency converted to USD.
    #:
    #: The value is read back from the realised trade.
    #: LP fee is usually % of the trade. For Uniswap style exchanges
    #: fees are always taken from the `amount in` token
    #: and directly passed to the LPs as part of the swap;
    #: there is no separate fee information.
    lp_fees_paid: Optional[USDollarAmount] = None

    def is_buy(self):
        """Buys have a positive quantity."""
        return 0 < self.quantity

    def is_sell(self):
        """Sells have a negative quantity."""
        return 0 > self.quantity

    @property
    def value(self) -> USDollarAmount:
        """Absolute dollar value of the trade at the executed price."""
        signed_value = self.executed_price * float(self.quantity)
        return abs(signed_value)
@dataclass
class TradePosition:
    """How a particular asset traded.

    Each asset can have multiple entries (buys) and exits (sells).

    For simple strategies there can be only one or two trades per position.

    * Enter (buy)

    * Exit (sell optionally)
    """

    #: Position id of the trade.
    #: Used to be self.trades[0].trade_id
    position_id: int

    #: List of all trades done for this position
    trades: List[SpotTrade] = field(default_factory=list)

    #: Opening time could be deduced from the trades themselves,
    #: but we cache it by hand to speed up processing
    opened_at: Optional[pd.Timestamp] = None

    #: Closing time could be deduced from the trades themselves,
    #: but we cache it by hand to speed up processing
    closed_at: Optional[pd.Timestamp] = None

    def __eq__(self, other: "TradePosition"):
        """Trade positions are considered equal when their position ids match."""
        return self.position_id == other.position_id

    def __hash__(self):
        """Allows to easily create an index (hash map) of all positions."""
        return hash(self.position_id)

    @property
    def pair_id(self) -> PrimaryKey:
        """Trading pair of this position, taken from its first trade."""
        return self.trades[0].pair_id

    @property
    def duration(self) -> Optional[datetime.timedelta]:
        """How long this position was held.

        :return: None if the position is still open
        """
        if not self.is_closed():
            return None
        return self.closed_at - self.opened_at

    def is_open(self):
        """A position is open until ``closed_at`` has been set."""
        return self.closed_at is None

    def is_closed(self):
        return not self.is_open()

    @property
    def open_quantity(self) -> float:
        """Net quantity over all trades (sells are negative)."""
        return sum([t.quantity for t in self.trades])

    @property
    def open_value(self) -> float:
        """The value of this open position, summed over the values of its trades.

        Only valid for open positions.
        """
        assert self.is_open()
        return sum([t.value for t in self.trades])

    @property
    def open_price(self) -> float:
        """At what price we opened this position.

        Supports only simple enter/exit positions.
        """
        return self.get_first_entry_price()

    def get_first_entry_price(self) -> float:
        """What was the mid-price when the first entry buy for this position was made.

        :raise IndexError: If the position has no buys
        """
        buys = list(self.buys)
        return buys[0].price

    def get_last_exit_price(self) -> float:
        """What was the mid-price when the last sell for this position was executed.

        :raise IndexError: If the position has no sells
        """
        sells = list(self.sells)
        return sells[-1].price

    @property
    def close_price(self) -> float:
        """At what price we exited this position.

        Supports only simple enter/exit positions.
        """
        return self.get_last_exit_price()

    @property
    def buys(self) -> Iterable[SpotTrade]:
        return [t for t in self.trades if t.is_buy()]

    @property
    def sells(self) -> Iterable[SpotTrade]:
        return [t for t in self.trades if t.is_sell()]

    @property
    def buy_value(self) -> USDollarAmount:
        """Total value of all buys, net of commission."""
        # NOTE(review): commission is subtracted for buys as well as sells;
        # confirm this is the intended sign convention.
        return sum([t.value - t.commission for t in self.trades if t.is_buy()])

    @property
    def sell_value(self) -> USDollarAmount:
        """Total value of all sells, net of commission."""
        return sum([t.value - t.commission for t in self.trades if t.is_sell()])

    @property
    def realised_profit(self) -> USDollarAmount:
        """Calculated life-time profit over this position, in dollars.

        Only valid for closed positions.
        """
        assert not self.is_open()
        return -sum([float(t.quantity) * t.executed_price - t.commission for t in self.trades])

    @property
    def realised_profit_percent(self) -> float:
        """Calculated life-time profit over this position, as a ratio of the buy value.

        Only valid for closed positions.
        """
        assert not self.is_open()
        buy_value = self.buy_value
        sell_value = self.sell_value
        return sell_value / buy_value - 1

    def is_win(self):
        """Did we win this trade."""
        assert not self.is_open()
        return self.realised_profit > 0

    def is_lose(self):
        """Did we lose this trade."""
        assert not self.is_open()
        return self.realised_profit < 0

    def is_stop_loss(self) -> bool:
        """Was stop loss triggered for this position"""
        return any(t.trade_type == TradeType.stop_loss for t in self.trades)

    def is_take_profit(self) -> bool:
        """Was take profit triggered for this position"""
        return any(t.trade_type == TradeType.take_profit for t in self.trades)

    def add_trade(self, t: SpotTrade):
        """Append a trade; trades must arrive in timestamp order."""
        if self.trades:
            last_trade = self.trades[-1]
            assert t.timestamp >= last_trade.timestamp, f"Tried to do trades in wrong order. Last: {last_trade}, got {t}"
        self.trades.append(t)

    def can_trade_close_position(self, t: SpotTrade):
        """Check if a sell trade brings the open quantity exactly to zero."""
        assert self.is_open()
        if not t.is_sell():
            return False
        open_quantity = self.open_quantity
        closing_quantity = -t.quantity
        assert closing_quantity <= open_quantity, "Cannot sell more than we have in balance sheet"
        return closing_quantity == open_quantity

    def get_max_size(self) -> USDollarAmount:
        """Get the largest size of this position over the time.

        This is the value of the largest single trade, so it is only
        accurate for simple one-buy, one-sell positions.

        :return: Largest trade value, or 0 if there are no trades
        """
        if len(self.trades) > 2:
            # The trade count must be passed as an argument for the %d placeholder
            logger.warning(
                "Position has %d trades so this method might produce wrong result",
                len(self.trades),
            )
        return max((t.value for t in self.trades), default=0)

    def get_trade_count(self) -> int:
        """How many individual trades were done to manage this position."""
        return len(self.trades)

    def get_total_lp_fees_paid(self) -> USDollarAmount:
        """Get the total amount of swap fees paid in the position.

        Includes all trades. Trades without fee information are skipped.
        """
        return sum(trade.lp_fees_paid for trade in self.trades if trade.lp_fees_paid is not None)
@dataclass
class AssetTradeHistory:
    """How a particular asset traded.

    Each position can have increments or decrements.
    When a position is decreased to zero, it is considered closed,
    and the next trade opens a new position.
    """

    positions: List[TradePosition] = field(default_factory=list)

    def get_first_opened_at(self) -> Optional[pd.Timestamp]:
        """Opening time of the first position, or None if there are no positions."""
        if not self.positions:
            return None
        return self.positions[0].opened_at

    def get_last_closed_at(self) -> Optional[pd.Timestamp]:
        """Closing time of the most recent closed position, or None if nothing is closed."""
        closed_times = (p.closed_at for p in reversed(self.positions) if not p.is_open())
        return next(closed_times, None)

    def add_trade(self, t: SpotTrade, position_id: Optional[int]=None):
        """Adds a new trade to the asset history.

        If there is an open position the trade is added against this,
        otherwise a new position is opened for tracking.

        :param t: Trade to record
        :param position_id: Id for the new position; required when no position is open
        """
        open_position = None
        if self.positions:
            latest = self.positions[-1]
            if latest.is_open():
                open_position = latest

        if not open_position:
            # Nothing open: this trade starts a new position
            assert position_id is not None, "position id must be provided when opening a new position for backtesting"
            fresh = TradePosition(position_id=position_id, opened_at=t.timestamp)
            fresh.add_trade(t)
            self.positions.append(fresh)
            return

        closes_position = open_position.can_trade_close_position(t)
        if closes_position:
            # Mark the position closed before recording the final trade
            open_position.closed_at = t.timestamp
        open_position.add_trade(t)
        if closes_position:
            assert open_position.open_quantity == 0
@dataclass_json
@dataclass(slots=True)
class TradeSummary:
    """Some generic statistics over all the trades"""

    won: int
    lost: int
    zero_loss: int
    stop_losses: int
    undecided: int
    realised_profit: USDollarAmount
    open_value: USDollarAmount
    uninvested_cash: USDollarAmount
    initial_cash: USDollarAmount
    extra_return: USDollarAmount

    duration: datetime.timedelta = field(metadata=config(
        encoder=json_encode_timedelta,
        decoder=json_decode_timedelta,
    ))

    average_winning_trade_profit_pc: float
    average_losing_trade_loss_pc: float
    biggest_winning_trade_pc: Optional[float]
    biggest_losing_trade_pc: Optional[float]

    average_duration_of_winning_trades: datetime.timedelta = field(metadata=config(
        encoder=json_encode_timedelta,
        decoder=json_decode_timedelta,
    ))
    average_duration_of_losing_trades: datetime.timedelta = field(metadata=config(
        encoder=json_encode_timedelta,
        decoder=json_decode_timedelta,
    ))

    #: When set, average durations are displayed as a number of bars
    time_bucket: Optional[TimeBucket] = None

    # Derived values, filled in by __post_init__
    total_trades: int = field(init=False)
    win_percent: float = field(init=False)
    return_percent: float = field(init=False)
    annualised_return_percent: float = field(init=False)
    all_stop_loss_percent: float = field(init=False)
    lost_stop_loss_percent: float = field(init=False)
    average_net_profit: USDollarAmount = field(init=False)
    end_value: USDollarAmount = field(init=False)

    # used if raw_timeline is provided as argument to calculate_summary_statistics
    average_trade: Optional[float] = None
    median_trade: Optional[float] = None
    max_pos_cons: Optional[int] = None
    max_neg_cons: Optional[int] = None
    max_pullback: Optional[float] = None
    max_loss_risk: Optional[float] = None
    max_realised_loss: Optional[float] = None
    avg_realised_risk: Optional[float] = None

    def __post_init__(self):
        """Calculate the derived statistics from the constructor arguments."""
        self.total_trades = self.won + self.lost + self.zero_loss
        self.win_percent = calculate_percentage(self.won, self.total_trades)
        self.return_percent = calculate_percentage(self.realised_profit, self.initial_cash)
        # Scale the return over the trading period to a 365 day year.
        # Left as None when there is no return to annualise.
        self.annualised_return_percent = calculate_percentage(self.return_percent * datetime.timedelta(days=365), self.duration) if self.return_percent else None
        self.all_stop_loss_percent = calculate_percentage(self.stop_losses, self.total_trades)
        self.lost_stop_loss_percent = calculate_percentage(self.stop_losses, self.lost)
        self.average_net_profit = self.realised_profit / self.total_trades if self.total_trades else None
        self.end_value = self.open_value + self.uninvested_cash

    def to_dataframe(self) -> pd.DataFrame:
        """Convert the data to a human readable summary table.

        Creates a human-readable Pandas dataframe table from the object.
        Average durations are formatted as bars when :py:attr:`time_bucket`
        is set, otherwise as durations.
        """
        if self.time_bucket is not None:
            avg_duration_winning = as_bars(self.average_duration_of_winning_trades)
            avg_duration_losing = as_bars(self.average_duration_of_losing_trades)
        else:
            avg_duration_winning = as_duration(self.average_duration_of_winning_trades)
            avg_duration_losing = as_duration(self.average_duration_of_losing_trades)

        human_data = {
            "Trading period length": as_duration(self.duration),
            "Return %": as_percent(self.return_percent),
            "Annualised return %": as_percent(self.annualised_return_percent),
            "Cash at start": as_dollar(self.initial_cash),
            "Value at end": as_dollar(self.end_value),
            "Trade win percent": as_percent(self.win_percent),
            "Total trades done": as_integer(self.total_trades),
            "Won trades": as_integer(self.won),
            "Lost trades": as_integer(self.lost),
            "Stop losses triggered": as_integer(self.stop_losses),
            "Stop loss % of all": as_percent(self.all_stop_loss_percent),
            "Stop loss % of lost": as_percent(self.lost_stop_loss_percent),
            "Zero profit trades": as_integer(self.zero_loss),
            "Positions open at the end": as_integer(self.undecided),
            "Realised profit and loss": as_dollar(self.realised_profit),
            "Portfolio unrealised value": as_dollar(self.open_value),
            "Extra returns on lending pool interest": as_dollar(self.extra_return),
            "Cash left at the end": as_dollar(self.uninvested_cash),
            "Average winning trade profit %": as_percent(self.average_winning_trade_profit_pc),
            "Average losing trade loss %": as_percent(self.average_losing_trade_loss_pc),
            "Biggest winning trade %": as_percent(self.biggest_winning_trade_pc),
            "Biggest losing trade %": as_percent(self.biggest_losing_trade_pc),
            "Average duration of winning trades": avg_duration_winning,
            "Average duration of losing trades": avg_duration_losing,
        }

        def add_prop(value, key: str, formatter: Callable):
            # Optional statistics are displayed as zero when they are missing
            human_data[key] = (
                formatter(value)
                if value is not None
                else formatter(0)
            )

        add_prop(self.average_trade, 'Average trade:', as_percent)
        add_prop(self.median_trade, 'Median trade:', as_percent)
        add_prop(self.max_pos_cons, 'Consecutive wins', as_integer)
        add_prop(self.max_neg_cons, 'Consecutive losses', as_integer)
        add_prop(self.max_realised_loss, 'Biggest realized risk', as_percent)
        add_prop(self.avg_realised_risk, 'Avg realised risk', as_percent)
        add_prop(self.max_pullback, 'Max pullback of total capital', as_percent)
        add_prop(self.max_loss_risk, 'Max loss risk at opening of position', as_percent)

        df = create_summary_table(human_data)
        return df

    def show(self):
        """Render a summary table in IPython notebook."""
        with pd.option_context("display.max_row", None):
            df = self.to_dataframe()
            # Hide the table header row; the summary table has no meaningful column titles
            display(df.style.set_table_styles([{'selector': 'thead', 'props': [('display', 'none')]}]))
@dataclass
class TradeAnalysis:
    """Analysis of trades in a portfolio."""

    portfolio: Portfolio

    #: How a particular asset traded. Asset id -> Asset history mapping
    asset_histories: Dict[object, AssetTradeHistory] = field(default_factory=dict)

    def get_first_opened_at(self) -> Optional[pd.Timestamp]:
        """When the earliest position over all assets was opened.

        :return: None if no asset history has any positions
        """
        opened = (history.get_first_opened_at() for history in self.asset_histories.values())
        # Histories without positions report None and cannot be compared
        return min((ts for ts in opened if ts is not None), default=None)

    def get_last_closed_at(self) -> Optional[pd.Timestamp]:
        """When the latest position over all assets was closed.

        :return: None if no position has been closed
        """
        closed = (history.get_last_closed_at() for history in self.asset_histories.values())
        return max((ts for ts in closed if ts is not None), default=None)

    def get_all_positions(self) -> Iterable[Tuple[PrimaryKey, TradePosition]]:
        """Return open and closed positions over all traded assets."""
        for pair_id, history in self.asset_histories.items():
            for position in history.positions:
                yield pair_id, position

    def get_open_positions(self) -> Iterable[Tuple[PrimaryKey, TradePosition]]:
        """Return only the open positions over all traded assets."""
        for pair_id, history in self.asset_histories.items():
            for position in history.positions:
                if position.is_open():
                    yield pair_id, position

    def calculate_summary_statistics(self, time_bucket: Optional[TimeBucket] = None) -> TradeSummary:
        """Calculate some statistics how our trades went.

        :param time_bucket:
            Optional, used to display average duration as 'number of bars' instead of 'number of days'.

        :return:
            TradeSummary instance
        """

        if time_bucket is not None:
            assert isinstance(time_bucket, TimeBucket), "Not a valid time bucket"

        def get_avg_profit_pct_check(trades: List | None):
            return float(np.mean(trades)) if trades else None

        def get_avg_trade_duration(duration_list: List | None, time_bucket: TimeBucket | None):
            if duration_list:
                if isinstance(time_bucket, TimeBucket):
                    # Expressed as a number of bars
                    return np.mean(duration_list)/time_bucket.to_timedelta()
                else:
                    return np.mean(duration_list)
            else:
                return datetime.timedelta(0)

        def max_check(lst):
            return max(lst) if lst else None

        def min_check(lst):
            return min(lst) if lst else None

        def avg_check(lst):
            return avg(lst) if lst else None

        def median_check(lst):
            return median(lst) if lst else None

        initial_cash = self.portfolio.get_initial_deposit()

        uninvested_cash = self.portfolio.get_current_cash()

        # EthLisbon hack
        extra_return = 0

        duration = datetime.timedelta(0)

        winning_trades = []
        losing_trades = []
        winning_trades_duration = []
        losing_trades_duration = []
        loss_risk_at_open_pc = []
        realised_losses = []

        first_trade, last_trade = self.portfolio.get_first_and_last_executed_trade()
        if first_trade and first_trade != last_trade:
            duration = last_trade.executed_at - first_trade.executed_at

        won = lost = zero_loss = stop_losses = undecided = 0
        open_value: USDollarAmount = 0
        profit: USDollarAmount = 0
        positions = []

        for _, position in self.get_all_positions():

            if position.is_open():
                # Open positions only contribute to the unrealised value
                open_value += position.open_value
                undecided += 1
                continue

            full_position = self.portfolio.get_position_by_id(position.position_id)

            if position.is_stop_loss():
                stop_losses += 1

            if position.is_win():
                won += 1
                winning_trades.append(position.realised_profit_percent)
                winning_trades_duration.append(position.duration)
            elif position.is_lose():
                lost += 1
                losing_trades.append(position.realised_profit_percent)
                losing_trades_duration.append(position.duration)
                realised_loss = position.realised_profit/full_position.portfolio_value_at_open
                realised_losses.append(realised_loss)
            else:
                # Any profit exactly balances out loss in slippage and commission
                zero_loss += 1

            profit += position.realised_profit

            if full_position.stop_loss:
                loss_risk_at_open_pc.append(full_position.get_loss_risk_at_open_pct())
            else:
                loss_risk_at_open_pc.append(full_position.get_capital_tied_at_open_pct())

            positions.append(full_position)

        # sort positions by position id (chronologically)
        positions.sort(key=lambda x: x.position_id)

        all_trades = winning_trades + losing_trades + [0] * zero_loss
        average_trade = avg_check(all_trades)
        median_trade = median_check(all_trades)

        average_winning_trade_profit_pc = get_avg_profit_pct_check(winning_trades)
        average_losing_trade_loss_pc = get_avg_profit_pct_check(losing_trades)

        max_realised_loss = min_check(realised_losses)
        avg_realised_risk = avg_check(realised_losses)

        max_loss_risk_at_open_pc = max_check(loss_risk_at_open_pc)

        biggest_winning_trade_pc = max_check(winning_trades)
        biggest_losing_trade_pc = min_check(losing_trades)

        average_duration_of_winning_trades = get_avg_trade_duration(winning_trades_duration, time_bucket)
        average_duration_of_losing_trades = get_avg_trade_duration(losing_trades_duration, time_bucket)

        max_pos_cons, max_neg_cons, max_pullback = self.get_max_consective(positions)

        return TradeSummary(
            won=won,
            lost=lost,
            zero_loss=zero_loss,
            stop_losses=stop_losses,
            undecided=undecided,
            realised_profit=profit + extra_return,
            open_value=open_value,
            uninvested_cash=uninvested_cash,
            initial_cash=initial_cash,
            extra_return=extra_return,
            duration=duration,
            average_winning_trade_profit_pc=average_winning_trade_profit_pc,
            average_losing_trade_loss_pc=average_losing_trade_loss_pc,
            biggest_winning_trade_pc=biggest_winning_trade_pc,
            biggest_losing_trade_pc=biggest_losing_trade_pc,
            average_duration_of_winning_trades=average_duration_of_winning_trades,
            average_duration_of_losing_trades=average_duration_of_losing_trades,
            average_trade=average_trade,
            median_trade=median_trade,
            max_pos_cons=max_pos_cons,
            max_neg_cons=max_neg_cons,
            max_pullback=max_pullback,
            max_loss_risk=max_loss_risk_at_open_pc,
            max_realised_loss=max_realised_loss,
            avg_realised_risk=avg_realised_risk,
            time_bucket=time_bucket
        )

    def create_timeline(self) -> pd.DataFrame:
        """Create a timeline feed how we traded over a course of time.

        Note: We assume each position has only one enter and exit event, not position increases over the lifetime.

        :return: DataFrame with ``position_id`` and ``position`` columns, one row per position
        """

        def gen_events():
            for pair_id, position in self.get_all_positions():
                yield (position.position_id, position)

        df = pd.DataFrame(gen_events(), columns=["position_id", "position"])
        return df

    def get_timeline_stats(self):
        """Calculate the consecutive win/loss statistics over all closed positions.

        The closed positions are looked up from the portfolio and ordered by
        position id, as required by :py:meth:`get_max_consective`.

        :return: Same tuple as :py:meth:`get_max_consective`
        """
        # get_max_consective() needs a list of portfolio positions.
        # Passing it an expanded timeline DataFrame fails on its emptiness check.
        closed_positions = [
            self.portfolio.get_position_by_id(position.position_id)
            for _, position in self.get_all_positions()
            if position.is_closed()
        ]
        closed_positions.sort(key=lambda p: p.position_id)
        return self.get_max_consective(closed_positions)

    @staticmethod
    def get_max_consective(positions: List[TradingPosition]) -> tuple[int, int, float] | tuple[None, None, None]:
        """May be used in calculate_summary_statistics

        :param positions:
            A list of trading positions, ordered by opened_at time.
            Note, must be ordered to be correct.

        :return:
            Tuple (max consecutive wins, max consecutive losses, max pullback).
            The pullback is zero or negative.
            ``(None, None, None)`` if there are no positions.
        """

        if not positions:
            return None, None, None

        max_pos_cons = 0
        max_neg_cons = 0
        max_pullback_pct = 0

        pos_cons = 0
        neg_cons = 0
        pullback = 0

        for position in positions:
            realised_profit = position.get_realised_profit_usd()

            # don't do anything if profit = $0
            if realised_profit > 0:
                neg_cons = 0
                pullback = 0
                pos_cons += 1
            elif realised_profit < 0:
                pos_cons = 0
                neg_cons += 1
                pullback += realised_profit

            if neg_cons > max_neg_cons:
                max_neg_cons = neg_cons
            if pos_cons > max_pos_cons:
                max_pos_cons = pos_cons

            # Accumulated loss of the current losing streak, relative to the
            # portfolio value at open plus this position's realised profit
            pullback_pct = pullback/(position.portfolio_value_at_open + realised_profit)
            if pullback_pct < max_pullback_pct:
                # pull back is in the negative direction
                max_pullback_pct = pullback_pct

        return max_pos_cons, max_neg_cons, max_pullback_pct
class TimelineRowStylingMode(enum.Enum):
    """How the rows of the expanded trades timeline are coloured."""

    #: Style using Pandas background_gradient
    gradient = "gradient"

    #: Simple colouring.
    #: Profit = green, loss = red
    simple = "simple"
class TimelineStyler:
    """Style the expanded trades timeline table.

    Give HTML hints for DataFrame how it should be rendered in the notebook output.
    """

    def __init__(self,
                 row_styling: TimelineRowStylingMode,
                 hidden_columns: List[str],
                 vmin: float,
                 vmax: float,
                 ):
        """
        :param row_styling: How rows are coloured
        :param hidden_columns: Columns not shown in the rendered table
        :param vmin: Lower PnL bound for gradient colouring
        :param vmax: Upper PnL bound for gradient colouring
        """
        self.row_styling = row_styling
        self.hidden_columns = hidden_columns
        self.vmin = vmin
        self.vmax = vmax

    def colour_timelime_row_simple(self, row: pd.Series) -> pd.Series:
        """Set colour for each timeline row based on its profit.

        - +/- 5% colouring: losses are salmon shades, everything else green shades

        - Colours are CSS colour names

        :param row: Timeline row, must contain "PnL % raw"
        :return: One CSS style string per cell in the row
        """
        pnl_raw = row["PnL % raw"]
        if pnl_raw < -0.05:
            return pd.Series('background-color: Salmon', row.index)
        elif pnl_raw < 0:
            return pd.Series('background-color: LightSalmon', row.index)
        elif pnl_raw > 0.05:
            return pd.Series('background-color: LawnGreen', row.index)
        else:
            return pd.Series('background-color: PaleGreen', row.index)

    def __call__(self, df: pd.DataFrame):
        """Applies styles on a dataframe

        :param df: Dataframe as returned by :py:func:`expand_timeline`.
        :return: Pandas Styler with all the styling options applied
        """

        # Create a Pandas Styler with multiple styling options applied
        try:
            styles = df.style \
                .hide(axis="index") \
                .hide(axis="columns", subset=self.hidden_columns)
        except KeyError:
            # The input df was empty (no trades)
            styles = df.style

        # Don't let the text inside a cell to wrap
        styles = styles.set_table_styles({
            "Opened at": [{'selector': 'td', 'props': [('white-space', 'nowrap')]}],
            "Exchange": [{'selector': 'td', 'props': [('white-space', 'nowrap')]}],
        })

        if self.row_styling == TimelineRowStylingMode.gradient:
            # Dynamically color the background of trade outcome columns
            # TODO: This gradient styling is confusing
            # get rid of it long term
            styles = styles.background_gradient(
                axis=0,
                gmap=df['PnL % raw'],
                cmap='RdYlGn',
                vmin=self.vmin,  # Lower bound of the colour scale
                vmax=self.vmax)  # Assume this is the max success color we can hit
        else:
            styles = styles.apply(self.colour_timelime_row_simple, axis=1)

        return styles
def expand_timeline(
        exchanges: Set[Exchange],
        pair_universe: PandasPairUniverse,
        timeline: pd.DataFrame,
        vmin=-0.3,
        vmax=0.2,
        timestamp_format="%Y-%m-%d",
        hidden_columns=None,
        row_styling_mode=TimelineRowStylingMode.simple,
) -> Tuple[pd.DataFrame, TimelineStyler]:
    """Expand trade history timeline to human readable table.

    This makes the outputting much easier in Python Notebooks.

    Currently does not support incrementing/decreasing positions gradually.

    Instead of applying styles or returning a styled dataframe, we return a callable that applies the styles.
    This is because of a Pandas issue: hidden indexes, columns, etc. are not exported.

    :param exchanges: Needed for exchange metadata
    :param pair_universe: Needed for trading pair metadata
    :param timeline: Dataframe with a "position" column, as produced by ``TradeAnalysis.create_timeline()``
    :param vmax: Trade success % to have the extreme green color.
    :param vmin: The % of lost capital on the trade to have the extreme red color.
    :param timestamp_format: How to format Opened at column, as passed to `strftime()`
    :param hidden_columns: Hide columns in the output table. Defaults to ``["Id", "PnL % raw"]``.
    :param row_styling_mode: How the rows are coloured
    :return: DataFrame with human-readable position win/loss information and a styler function
    :raise RuntimeError: If a pair refers to an exchange that is not in `exchanges`
    """

    # Build a fresh list for every call so that the returned styler
    # never shares its hidden column list with later calls
    if hidden_columns is None:
        hidden_columns = ["Id", "PnL % raw"]

    exchange_map = {e.exchange_id: e for e in exchanges}

    def expander(row):
        position: TradePosition = row["position"]
        pair_id = position.pair_id
        pair_info = pair_universe.get_pair_by_id(pair_id)
        exchange = exchange_map.get(pair_info.exchange_id)
        if not exchange:
            raise RuntimeError(f"No exchange for id {pair_info.exchange_id}, pair {pair_info}")

        if position.is_stop_loss():
            remarks = "SL"
        elif position.is_take_profit():
            remarks = "TP"
        else:
            remarks = ""

        closed = position.is_closed()
        duration = position.duration

        r = {
            "Id": position.position_id,
            "Remarks": remarks,
            "Opened at": position.opened_at.strftime(timestamp_format),
            "Duration": format_duration_days_hours_mins(duration) if duration else np.nan,
            "Exchange": exchange.name,
            "Base asset": pair_info.base_token_symbol,
            "Quote asset": pair_info.quote_token_symbol,
            "Position max value": format_value(position.get_max_size()),
            "PnL USD": format_value(position.realised_profit) if closed else np.nan,
            "PnL %": format_percent_2_decimals(position.realised_profit_percent) if closed else np.nan,
            # Open positions get zero so that the row can still be coloured
            "PnL % raw": position.realised_profit_percent if closed else 0,
            "Open mid price USD": format_price(position.open_price),
            "Close mid price USD": format_price(position.close_price) if closed else np.nan,
            "Trade count": position.get_trade_count(),
            "LP fees": f"${position.get_total_lp_fees_paid():,.2f}"
        }
        return r

    applied_df = timeline.apply(expander, axis='columns', result_type='expand')

    if len(applied_df) > 0:
        applied_df.sort_values(by=['Id'], ascending=[True], inplace=True)

    # Get rid of NaN labels
    applied_df.fillna('', inplace=True)

    styling = TimelineStyler(
        row_styling=row_styling_mode,
        hidden_columns=hidden_columns,
        vmin=vmin,
        vmax=vmax,
    )

    return applied_df, styling
def expand_timeline_raw(
        timeline: pd.DataFrame,
        timestamp_format="%Y-%m-%d"
) -> pd.DataFrame:
    # sourcery skip: remove-unreachable-code
    """A simplified version of expand_timeline that does not care about
    pair info, exchanges, or opening capital, and also provides raw figures.

    :param timeline: Dataframe with a "position" column, as produced by ``TradeAnalysis.create_timeline()``
    :param timestamp_format: How to format Opened at column, as passed to `strftime()`
    :return: DataFrame with one row per position, sorted by position id, NaN values replaced with ''
    """

    def expander(row):
        position: TradePosition = row["position"]

        if position.is_stop_loss():
            remarks = "SL"
        elif position.is_take_profit():
            remarks = "TP"
        else:
            remarks = ""

        closed = position.is_closed()
        duration = position.duration

        r = {
            "Id": position.position_id,
            "Remarks": remarks,
            "Opened at": position.opened_at.strftime(timestamp_format),
            "Duration": format_duration_days_hours_mins(duration) if duration else np.nan,
            "position_max_size": position.get_max_size(),
            "pnl_usd": position.realised_profit if closed else np.nan,
            "pnl_pct_raw": position.realised_profit_percent if closed else 0,
            "open_price_usd": position.open_price,
            "close_price_usd": position.close_price if closed else np.nan,
            "trade_count": position.get_trade_count(),
        }
        return r

    applied_df = timeline.apply(expander, axis='columns', result_type='expand')

    if len(applied_df) > 0:
        applied_df.sort_values(by=['Id'], ascending=[True], inplace=True)

    # Get rid of NaN labels
    applied_df.fillna('', inplace=True)

    return applied_df
def build_trade_analysis(portfolio: Portfolio) -> TradeAnalysis:
    """Build a trade analysis from list of positions.

    - Read positions from backtesting or live state

    - Create TradeAnalysis instance that can be used to display Jupyter notebook
      data on the performance

    Trades that were never executed (``executed_at`` is None) are skipped.
    A pair whose trades all failed does not get an asset history.

    :param portfolio: Portfolio whose positions and trades are analysed
    :return: Trade analysis with one asset history per traded pair
    """

    histories = {}

    # Sort positions based on their id
    # because open, closed and frozen positions might be in a mixed order
    positions = sorted(portfolio.get_all_positions(), key=lambda p: p.position_id)

    for position in positions:
        pair_id = position.pair.internal_id
        assert isinstance(pair_id, int), f"Expected integer pair id, got {type(pair_id)}"

        trade: TradeExecution
        for trade in position.trades.values():

            # filter out failed trade
            if trade.executed_at is None:
                continue

            # Create the history lazily, only once the pair has an executed
            # trade, so that no empty histories end up in the analysis
            history = histories.get(pair_id)
            if history is None:
                history = histories[pair_id] = AssetTradeHistory()

            # Internally negative quantities are for sells
            quantity = trade.executed_quantity
            timestamp = pd.Timestamp(trade.executed_at)
            price = trade.planned_mid_price

            assert quantity != 0, f"Got bad quantity for {trade}"
            assert price is not None and price > 0, f"Got invalid trade {trade}"

            spot_trade = SpotTrade(
                pair_id=pair_id,
                trade_id=trade.trade_id,
                timestamp=timestamp,
                price=price,
                executed_price=trade.executed_price,
                quantity=quantity,
                commission=0,
                slippage=0,  # TODO
                trade_type=trade.trade_type,
                lp_fees_paid=trade.lp_fees_paid
            )
            history.add_trade(spot_trade, position_id=position.position_id)

    return TradeAnalysis(portfolio, asset_histories=histories)
def avg(lst: list[int]):
    """Arithmetic mean of a non-empty list of numbers.

    :param lst: Values to average
    :return: Sum of the values divided by their count
    :raise ZeroDivisionError: If the list is empty
    """
    total = sum(lst)
    count = len(lst)
    return total / count