Source code for tradeexecutor.strategy.qstrader.portfolio_construction_model

"""Portfolio construction model translates alpha model to risk managed trades."""
import datetime
import logging
from typing import Optional, Dict, List, Tuple

import pandas as pd

from tradeexecutor.state.state import State, TradeType
from tradeexecutor.state.position import TradingPosition
from tradeexecutor.state.trade import TradeExecution
from tradeexecutor.state.identifier import AssetIdentifier
from tradeexecutor.strategy.pricing_model import PricingModel
from tradeexecutor.strategy.qstrader.alpha_model import AlphaModel
from tradeexecutor.strategy.qstrader.order_sizer import CashBufferedOrderSizer
from tradingstrategy.universe import Universe

from tradeexecutor.strategy.trading_strategy_universe import translate_trading_pair, TradingStrategyUniverse

logger = logging.getLogger(__name__)


[docs]class PortfolioConstructionModel: """Portfolio construction model. Encapsulates the process of generating a target weight vector for a universe of assets, based on input from an AlphaModel, a RiskModel and a TransactionCostModel. """
[docs] def __init__( self, universe: Universe, state: State, order_sizer: CashBufferedOrderSizer, optimiser, pricing_model: PricingModel, reserve_currency: AssetIdentifier, alpha_model: AlphaModel, risk_model=None, cost_model=None ): assert isinstance(universe, Universe) self.universe = universe self.state = state self.order_sizer = order_sizer self.optimiser = optimiser self.alpha_model = alpha_model self.reserve_currency = reserve_currency self.risk_model = risk_model self.cost_model = cost_model self.pricing_model = pricing_model
def _obtain_full_asset_list(self, dt): """ Create a union of the Assets in the current Universe and those in the Broker Portfolio. Parameters ---------- dt : `pd.Timestamp` The current time used to obtain Universe Assets. Returns ------- `list[int]` The sorted full list of Asset symbol strings. """ return self.universe.pairs.get_all_pair_ids() def _create_zero_target_weight_vector(self, full_assets): """ Create an initial zero target weight vector for all assets in both the Broker Portfolio and current Universe. Parameters ---------- full_assets : `list[str]` The full list of asset symbols. Returns ------- `dict{str: float}` The zero target weight vector for all Assets. """ return {asset: 0.0 for asset in full_assets} def _create_full_asset_weight_vector(self, zero_weights, optimised_weights): """ Ensure any Assets in the Broker Portfolio are sold out if they are not specifically referenced on the optimised weights. Parameters ---------- zero_weights : `dict{str: float}` The full weight list of assets, all with zero weight. optimised_weights : `dict{str: float}` The weight list for those assets having a non-zero weight. Overrides the zero-weights where keys intersect. Returns ------- `dict{str: float}` The union of the zero-weights and optimised weights, where the optimised weights take precedence. """ return {**zero_weights, **optimised_weights} def _obtain_current_portfolio(self): """ Query the broker for the current account asset quantities and return as a portfolio dictionary. Returns ------- `dict{str: dict}` Current broker account asset quantities in integral units. """ res = {} for id, quantity in self.state.portfolio.get_open_quantities_by_internal_id().items(): res[id] = {"quantity": quantity} return res def _generate_rebalance_trades( self, dt: datetime.datetime, target_portfolio, current_portfolio, target_prices, debug_details: dict, ) -> List[TradeExecution]: """ Creates an incremental list of rebalancing Orders from the provided target and current portfolios. Parameters ---------- dt : `pd.Timestamp` The current time used to populate the Order instances. target_portfolio : `dict{str: dict}` Target asset quantities in integral units. curent_portfolio : `dict{str: dict}` Current (broker) asset quantities in integral units. target_prices : `dict{str: decimal}` Target asset price for 1 unit """ # Set all assets from the target portfolio that # aren't in the current portfolio to zero quantity # within the current portfolio for asset in target_portfolio: if asset not in current_portfolio: current_portfolio[asset] = {"quantity": 0} # Set all assets from the current portfolio that # aren't in the target portfolio (and aren't cash) to # zero quantity within the target portfolio for asset in current_portfolio: if type(asset) != str: if asset not in target_portfolio: target_portfolio[asset] = {"quantity": 0} # Iterate through the asset list and create the difference # quantities required for each asset rebalance_portfolio = {} for asset in target_portfolio.keys(): target_qty = target_portfolio[asset]["quantity"] current_qty = current_portfolio[asset]["quantity"] order_qty = target_qty - current_qty rebalance_portfolio[asset] = {"quantity": order_qty} rebalance_trades = [] new_positions = [] # Sanity check for the old/new positions with logging pos: TradingPosition for pos in self.state.portfolio.open_positions.values(): logger.info("Rebalance, existing position #%d, pool: %s", pos.position_id, pos.pair.pool_address) for asset, asset_dict in sorted(rebalance_portfolio.items(), key=lambda x: x[0]): quantity = rebalance_portfolio[asset]["quantity"] if quantity != 0: pandas_pair = self.universe.pairs.get_pair_by_id(asset) executor_pair = translate_trading_pair(pandas_pair) # For some reason, the price of the asset has disappeared. # Do not spend too much time on this, because this code is going # to disappear as well. price = target_prices.get(asset) if price is None: logger.warning(f"Price missing for asset {asset} - prices are {target_prices}") continue if isinstance(dt, pd.Timestamp): dt = dt.to_pydatetime().replace(tzinfo=None) position, trade, created = self.state.create_trade( dt, executor_pair, quantity, None, price, TradeType.rebalance, self.reserve_currency, 1.0, # TODO: Harcoded stablecoin USD exchange rate ) logger.info("Created trade, pair:%s, position #%d, trade #%d, new position:%s", position.pair, position.position_id, trade.trade_id, created) rebalance_trades.append(trade) new_positions.append(position) # Sort trades so that sells always go first rebalance_trades.sort(key=lambda t: t.get_execution_sort_position()) return rebalance_trades def _create_zero_target_weights_vector(self, dt): """ Determine the Asset Universe at the provided date-time and use this to generate a weight vector of zero scalar value for each Asset. Parameters ---------- dt : `pd.Timestamp` The date-time used to determine the Asset list. Returns ------- `dict{str: float}` The zero-weight vector keyed by Asset symbol. """ assets = self.universe.get_assets(dt) return {asset: 0.0 for asset in assets}
[docs] def get_all_prices(self): """Get prices for all asssets."""
[docs] def __call__(self, dt: pd.Timestamp, stats=None, debug_details: Optional[Dict] = None) -> List[TradeExecution]: """ Execute the portfolio construction process at a particular provided date-time. Use the optional alpha model, risk model and cost model instances to create a list of desired weights that are then sent to the target weight generator instance to be optimised. """ logger.info("Performing portfolio constructions for %s", dt) weights = self.alpha_model(dt, self.universe, self.state, debug_details) # Expose internal states to unit tests debug_details["alpha_model_weights"] = weights logger.info("We have %d alpha model weights", len(weights)) # If a risk model is present use it to potentially # override the alpha model weights if self.risk_model: weights = self.risk_model(dt, weights) # Run the portfolio optimisation optimised_weights = self.optimiser(dt, initial_weights=weights) # Ensure any Assets in the Broker Portfolio are sold out if # they are not specifically referenced on the optimised weights full_assets = self._obtain_full_asset_list(dt) full_zero_weights = self._create_zero_target_weight_vector(full_assets) full_weights = self._create_full_asset_weight_vector( full_zero_weights, optimised_weights ) # Calculate target portfolio in notional target_portfolio, target_prices = self.order_sizer(dt, weights, debug_details) logger.info("We have %d entries in the target portfolio", len(target_portfolio)) # Obtain current Broker account portfolio current_portfolio = self._obtain_current_portfolio() # Get prices for existing assets so we have some idea how much they sell for for asset_id, asset_data in current_portfolio.items(): pair = self.pricing_model.get_pair_for_id(asset_id) if pair: price_structure = self.pricing_model.get_buy_price(dt, pair, None) target_prices[asset_id] = price_structure.price # Expose internal states to unit tests debug_details["positions_at_start_of_construction"] = current_portfolio.copy() # current_portfolio is mutated later # Create rebalance trade Orders rebalance_trades = self._generate_rebalance_trades( dt, target_portfolio, current_portfolio, target_prices, debug_details ) # Expose internal states to unit tests debug_details["target_portfolio"] = target_portfolio debug_details["target_prices"] = target_prices debug_details["rebalance_trades"] = rebalance_trades logger.info("Requesting %d rebalance trades", len(rebalance_trades)) return rebalance_trades