Source code for tradeexecutor.strategy.interest

"""Functions to refresh accrued interest on credit positions."""
import logging
import datetime
from collections import Counter
from decimal import Decimal
from typing import Tuple, Dict, List, Iterable

from eth_defi.aave_v3.rates import SECONDS_PER_YEAR_INT
from eth_defi.provider.broken_provider import get_almost_latest_block_number
from tradingstrategy.utils.time import ZERO_TIMEDELTA

from tradeexecutor.state.balance_update import BalanceUpdate, BalanceUpdatePositionType, BalanceUpdateCause
from tradeexecutor.state.identifier import AssetIdentifier
from tradeexecutor.state.interest_distribution import InterestDistributionEntry, InterestDistributionOperation, AssetInterestData
from tradeexecutor.state.loan import LoanSide
from tradeexecutor.state.portfolio import Portfolio
from tradeexecutor.state.position import TradingPosition
from tradeexecutor.state.state import State
from tradeexecutor.state.types import USDollarPrice, Percent, BlockNumber
from tradeexecutor.strategy.pricing_model import PricingModel
from tradeexecutor.utils.accuracy import QUANTITY_EPSILON, INTEREST_EPSILON
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse
from tradeexecutor.ethereum.onchain_balance import fetch_address_balances

logger = logging.getLogger(__name__)


[docs]def update_interest( state: State, position: TradingPosition, asset: AssetIdentifier, new_token_amount: Decimal, event_at: datetime.datetime, asset_price: USDollarPrice, block_number: int | None = None, tx_hash: int | None = None, log_index: int | None = None, max_interest_gain: Percent = 0.05, ) -> BalanceUpdate: """Poke leverage position to increase its interest amount. :param position: Trading position to update :param asset: The asset of which we update the events for. aToken for collateral, vToken for debt. :param new_token_amount: The new on-chain value of aToken/vToken tracking the loan. :param asset_price: The latest known price for the underlying asset. Needed to revalue dollar nominated loans. :param event_at: Block mined timestamp :param max_interest_gain: Safety threshold to check that any interest gains are below this value. Terminate execution if bad math detected. """ assert asset is not None assert position.is_open() or position.is_frozen(), f"Cannot update interest for position {position.position_id}\n" \ f"Position details: {position}\n" \ f"Position closed at: {position.closed_at}\n" \ f"Interest event at: {event_at}" assert type(asset_price) == float, f"Got {asset_price.__class__}" assert isinstance(new_token_amount, Decimal), f"Got {new_token_amount.__class__}" loan = position.loan assert loan if asset == loan.collateral.asset: interest = loan.collateral_interest elif loan.borrowed and asset == position.loan.borrowed.asset: interest = loan.borrowed_interest else: raise AssertionError( f"Loan {loan} does not have asset {asset}\n" f"We have\n" f"- {loan.collateral.asset}\n" f"- {loan.borrowed.asset if loan.borrowed else '<no borrow>'}" ) assert interest, f"Position does not have interest tracked set up on {asset.token_symbol}:\n" \ f"{position} \n" \ f"for asset {asset}" portfolio = state.portfolio event_id = portfolio.allocate_balance_update_id() # assert asset.underlying.is_stablecoin(), f"Credit supply is currently supported for stablecoin assets with 1:1 USD price assumption. Got: {asset}" previous_update_at = interest.last_event_at previous_block = interest.last_updated_block_number old_balance = interest.last_token_amount gained_interest = new_token_amount - old_balance usd_value = float(new_token_amount) * asset_price # TODO: Not sure we zero/negative rebalances could happen, but # log them as warning for now if gained_interest < 0: log_level = logging.WARNING else: log_level = logging.INFO logger.log( log_level, f"update_interest(), block {block_number or 0:,}, new token amount: %s, old balance: %s, gained interest tokens: %s, position new USD value: %s, previous update at %s, previous block at %s", new_token_amount, old_balance, gained_interest, usd_value, previous_update_at, f"{previous_block or 0:,}", ) gained_interest_percent = gained_interest / old_balance # assert gained_interest_percent >= 0, f"Negative interest for {asset}: gained interest: {gained_interest} (diff {gained_interest_percent * 100:.2f}%), old quantity: {old_balance}, new quantity: {new_token_amount}" assert abs(gained_interest_percent) < max_interest_gain, f"Unlikely gained_interest for {asset}: {gained_interest} (diff {gained_interest_percent * 100:.2f}%, threshold {max_interest_gain * 100}%), old quantity: {old_balance}, new quantity: {new_token_amount}" evt = BalanceUpdate( balance_update_id=event_id, position_type=BalanceUpdatePositionType.open_position, cause=BalanceUpdateCause.interest, asset=asset, block_mined_at=event_at, strategy_cycle_included_at=None, chain_id=asset.chain_id, old_balance=old_balance, usd_value=usd_value, quantity=gained_interest, owner_address=None, tx_hash=tx_hash, log_index=log_index, position_id=position.position_id, block_number=block_number, previous_update_at=previous_update_at, ) position.add_balance_update_event(evt) # Update interest stats interest.last_accrued_interest = position.calculate_accrued_interest_quantity(asset) interest.last_updated_at = event_at interest.last_event_at = event_at interest.last_updated_block_number = block_number interest.last_token_amount = new_token_amount return evt
[docs]def update_leveraged_position_interest( state: State, position: TradingPosition, new_vtoken_amount: Decimal, new_token_amount: Decimal, event_at: datetime.datetime, vtoken_price: USDollarPrice, atoken_price: USDollarPrice = 1.0, block_number: int | None = None, tx_hash: int | None = None, log_index: int | None = None, max_interest_gain: Percent = 0.05, ) -> Tuple[BalanceUpdate, BalanceUpdate]: """Updates accrued interest on lending protocol leveraged positions. Updates loan interest state for both collateral and debt. :param atoken_price: What is the current price of aToken. Needed to calculate dollar nominated amounts. :param vtoken_price: What is the current price of vToken Needed to calculate dollar nominated amounts. :return: Tuple (vToken update event, aToken update event) """ assert position.is_leverage() pair = position.pair # vToken vevt = update_interest( state, position, pair.base, new_vtoken_amount, event_at, vtoken_price, block_number, tx_hash, log_index, max_interest_gain=max_interest_gain, ) logger.info("Updated leveraged interest %s for %s", pair.base, vevt) # aToken aevt = update_interest( state, position, pair.quote, new_token_amount, event_at, atoken_price, block_number, tx_hash, log_index, max_interest_gain=max_interest_gain, ) logger.info("Updated leveraged interest %s for %s", pair.quote, aevt) return (vevt, aevt)
[docs]def estimate_interest( start_at: datetime.datetime, end_at: datetime.datetime, start_quantity: Decimal, interest_rate: float, year=datetime.timedelta(seconds=SECONDS_PER_YEAR_INT), ) -> Decimal: """Calculate new token amount, assuming fixed interest. :param interest_rate: Yearly interest relative to. 1 = 0%. E.g. 1.02 for 2% yearly gained interest. Always positive. :param start_quantity: Tokens at the start of the period :param year: Year length in Aave. :return: Amount of token quantity with principal + interest after the period. """ # 150x assert interest_rate >= 1 assert end_at >= start_at duration = end_at - start_at multiplier = (end_at - start_at) / year return start_quantity * Decimal(interest_rate ** multiplier)
[docs]def prepare_interest_distribution( start: datetime.datetime, end: datetime.datetime, portfolio: Portfolio, pricing_model: PricingModel ) -> InterestDistributionOperation: """Get all tokens in open positions that accrue interest. - We use this data to sync the accrued interest since the last cycle - See :py:func:`tradeexecutor.strategy.sync_model.SyncModel.sync_interests` :return: Interest bearing assets used in all open positions """ assets = set() totals: Counter[AssetIdentifier, Decimal] = Counter() entries: List[InterestDistributionEntry] = [] asset_interest_data: Dict[str, AssetInterestData] = {} position_count = 0 timestamp = end for p in portfolio.get_open_and_frozen_positions(): for asset in (p.pair.base, p.pair.quote): if not asset.is_interest_accruing(): # One side in spot-credit pair continue side, tracker = p.loan.get_tracked_asset(asset) assert side is not None, f"Got confused with asset {asset} on position {p}" if side == LoanSide.collateral: # Currently supports stablecoin collateral only if not p.is_credit_supply(): assert not p.is_long(), f"Cannot handle position: {p}" underlying = asset.get_pricing_asset() assert underlying.is_stablecoin(), f"Asset is collateral but not stablecoin based: {asset}. Unsupported yet." price = 1.0 else: price_structure = pricing_model.get_sell_price( timestamp, p.pair.get_pricing_pair(), tracker.quantity, ) price = price_structure.price entry = InterestDistributionEntry( side=side, position=p, tracker=tracker, price=price, ) assert entry.quantity > 0, f"Zero-amount entry in the interest distribution: {p}: {tracker}" entries.append(entry) assets.add(asset) # Update totals asset_id = asset.get_identifier() asset_interest = asset_interest_data.get(asset_id, AssetInterestData()) asset_interest.total += entry.quantity asset_interest_data[asset_id] = asset_interest position_count += 1 # Calculate distribution weights for entry in entries: entry.weight = entry.quantity / asset_interest_data[entry.asset.get_identifier()].total logger.info("Preparing interest distribution with %d assets, %d positions, %d ledger entries", len(assets), position_count, len(entries)) return InterestDistributionOperation( start, end, assets, asset_interest_data=asset_interest_data, entries=entries, effective_rate={}, )
[docs]def distribute_to_entry( entry: InterestDistributionEntry, state: State, timestamp: datetime.datetime, block_number: BlockNumber, total_accrued: Decimal, max_interest_gain: Percent, ) -> BalanceUpdate: """Update interest one position, one side of loan.""" position_accrued = total_accrued * entry.weight # Calculate per-position portion of new tokens new_token_amount = entry.tracker.quantity + position_accrued assert entry.price is not None, f"Asset lacks updated price: {entry.asset}" assert new_token_amount > 0 assert new_token_amount >= entry.tracker.quantity evt = update_interest( state, entry.position, entry.asset, new_token_amount=new_token_amount, event_at=timestamp, asset_price=entry.price, block_number=block_number, max_interest_gain=max_interest_gain, ) return evt
[docs]def distribute_interest_for_assets( operation: InterestDistributionOperation, state: State, asset: AssetIdentifier, timestamp: datetime.datetime, block_number: BlockNumber | None, new_amount: Decimal, max_interest_gain: Percent, ) -> Iterable[BalanceUpdate]: """Distribute the accrued interest of an asset across all positions holding this asset. :return: An event """ previous_asset_total = operation.asset_interest_data[asset.get_identifier()].total # Either there has not be really any change over time (too fast refresh rate) # or this is unit test against mainnet fork where we cannot speed up the time. # In both cases we cannot generate any BalanceUpdate events, # because balance update can not be zero. # We also may encounter negative updates < epsilon due to the rounding # errors. interest_accrued_tokens = new_amount - previous_asset_total logger.info( "Interest accrued for asset: %s, interest accrued: %s, new amount: %s, previous asset total: %s", asset, interest_accrued_tokens, new_amount, previous_asset_total ) if abs(interest_accrued_tokens) >= INTEREST_EPSILON: assert interest_accrued_tokens >= 0, f"Interest cannot go negative: {interest_accrued_tokens}, our epsilon is {INTEREST_EPSILON}" for entry in operation.entries: if entry.asset == asset: evt = distribute_to_entry( entry, state, timestamp, block_number, interest_accrued_tokens, max_interest_gain=max_interest_gain, ) yield evt
[docs]def accrue_interest( state: State, on_chain_balances: Dict[AssetIdentifier, Decimal], interest_distribution: InterestDistributionOperation, block_timestamp: datetime.datetime, block_number: BlockNumber | None, max_interest_gain: Percent = 0.05, aave_financial_year=datetime.timedelta(seconds=SECONDS_PER_YEAR_INT), ) -> Iterable[BalanceUpdate]: """Update the internal ledger to match interest accrued on on-chain balances. - Read incoming on-chain balance updates - Distribute it to the trading positions based on our ``interest_distribution`` - Set the interest sync checkpoint :param state: Strategy state. :param on_chain_balances: The current on-chain balances at ``block_number``. :param block_number: The block number when we read the balances. :param block_timestamp: The timestamp of ``block_number``. :param max_interest_gain: Abort if some asset has gained more interest than this threshold. A safety check to abort buggy code. :return: Balance update events applied to all positions. """ if interest_distribution.duration == ZERO_TIMEDELTA: logger.info("accrue_interest(): Interest distribution duration zero, we probably got called twice in a row") return assert interest_distribution.duration > ZERO_TIMEDELTA, f"Tried to distribute interest for negative timespan {interest_distribution.start} - {interest_distribution.end}" block_number_str = f"{block_number,}" if block_number else "<no block>" logger.info(f"accrue_interest(block_timestamp={block_timestamp}, {block_number_str})") part_of_year = interest_distribution.duration / aave_financial_year for asset, new_balance in on_chain_balances.items(): # Track the effective interest for the asset asset_interest_data = interest_distribution.get_interest_data(asset) interest = float((new_balance - asset_interest_data.total) / asset_interest_data.total) / part_of_year asset_interest_data.effective_rate = interest logger.info( "Effective interest is %f (%f %%) for the asset %s, new_balance: %s, previous total: %s", interest, interest * 100, asset, new_balance, asset_interest_data.total ) # We cannot generate interest events for zero updates, # as it breaks math if interest == 0: logger.warning(f"Effective interest is zero for the asset %s", asset) continue yield from distribute_interest_for_assets( interest_distribution, state, asset, block_timestamp, block_number, on_chain_balances[asset], max_interest_gain=max_interest_gain, ) set_interest_checkpoint(state, block_timestamp, block_number, interest_distribution)
# # events = [] # for p in positions: # if p.is_credit_supply(): # assert len(p.trades) <= 2, "This interest calculation does not support increase/reduce position" # # new_amount = p.loan.collateral_interest.last_token_amount + accrued # # # TODO: the collateral is stablecoin so this can be hardcode for now # # but make sure to fetch it from somewhere later # price = 1.0 # # evt = update_interest( # state, # p, # p.pair.base, # new_token_amount=new_amount, # event_at=timestamp, # asset_price=price, # ) # events.append(evt) # # # Make atokens magically appear in the simulated # # backtest wallet. The amount must be updated, or # # otherwise we get errors when closing the position. # self.wallet.update_token_info(p.pair.base) # self.wallet.update_balance(p.pair.base.address, accrued) # elif p.is_leverage() and p.is_short(): # assert len(p.trades) <= 2, "This interest calculation does not support increase/reduce position" # # accrued_collateral_interest = self.calculate_accrued_interest( # universe, # p, # timestamp, # "collateral", # ) # accrued_borrow_interest = self.calculate_accrued_interest( # universe, # p, # timestamp, # "borrow", # ) # # new_atoken_amount = p.loan.collateral_interest.last_token_amount + accrued_collateral_interest # new_vtoken_amount = p.loan.borrowed_interest.last_token_amount + accrued_borrow_interest # # atoken_price = 1.0 # # vtoken_price_structure = pricing_model.get_sell_price( # timestamp, # p.pair.get_pricing_pair(), # p.loan.borrowed.quantity, # ) # vtoken_price = vtoken_price_structure.price # # vevt, aevt = update_leveraged_position_interest( # state, # p, # new_vtoken_amount=new_vtoken_amount, # new_token_amount=new_atoken_amount, # vtoken_price=vtoken_price, # atoken_price=atoken_price, # event_at=timestamp, # ) # events.append(vevt) # events.append(aevt) # return events
[docs]def set_interest_checkpoint( state: State, timestamp: datetime.datetime, block_number: BlockNumber | None, distribution: InterestDistributionOperation | None = None, ): """Set the last updated at flag for rebase interest calcualtions at the internal state.""" assert isinstance(timestamp, datetime.datetime) if block_number is not None: assert type(block_number) == int state.sync.interest.last_sync_at = timestamp state.sync.interest.last_sync_block = block_number # Always save the last distribution if distribution is not None: state.sync.interest.last_distribution = distribution block_number_str = f"{block_number,}" if block_number else "<no block>" logger.info(f"Interest check point set to {timestamp}, block: {block_number_str}")
[docs]def record_interest_rate( state: State, universe: TradingStrategyUniverse, timestamp: datetime.datetime, ): """Record interest rate at the time opening position. - Currently support only credit supply positions - Sets `interest_rate_at_open` if not set yet """ assert isinstance(universe, TradingStrategyUniverse) assert universe.has_lending_data() logger.info("Filling missing interest rate information") for p in state.portfolio.get_open_and_frozen_positions(): if p.is_credit_supply(): loan = p.loan last_interest_rate = universe.get_latest_supply_apr( timestamp=timestamp, tolerance=datetime.timedelta(days=7), ) / 100 assert 0 < last_interest_rate < 1 logger.info("Recording interest rate %f for %s at %s", last_interest_rate, p, timestamp) if not loan.collateral.interest_rate_at_open: loan.collateral.interest_rate_at_open = last_interest_rate loan.collateral.last_interest_rate = last_interest_rate
[docs]def sync_interests( *, web3, wallet_address: str, timestamp: datetime.datetime, state: State, universe: TradingStrategyUniverse, pricing_model: PricingModel, ) -> List[BalanceUpdate]: """Update position's interests on all tokens that receive interests - Credit supply positions: aToken - Short positions: aToken, vToken :param web3: Web3 connection to the active blockchain :param wallet_address: Hot wallet or vault address :param timestamp: Wall clock time :param state: Current strategy state :param universe: Trading universe that must include lending data :param pricing_model: Used to update asset price in loan """ assert isinstance(timestamp, datetime.datetime), f"got {type(timestamp)}" if not universe.has_lending_data(): # sync_interests() is not needed if the strategy isn't dealing with leverage return [] previous_update_at = state.sync.interest.last_sync_at if not previous_update_at: # No interest based positions yet? logger.info(f"Interest sync checkpoint not set at {timestamp}, nothing to sync/cannot sync interest.") return [] duration = timestamp - previous_update_at if duration <= ZERO_TIMEDELTA: logger.error(f"Sync time span must be positive: {previous_update_at} - {timestamp}") return [] logger.info( "Starting interest distribution operation at: %s, previous update %s, syncing %s", timestamp, previous_update_at, duration, ) record_interest_rate(state, universe, timestamp) interest_distribution = prepare_interest_distribution( state.sync.interest.last_sync_at, timestamp, state.portfolio, pricing_model ) # Then sync interest back from the chain block_identifier = get_almost_latest_block_number(web3) onchain_balances = fetch_address_balances( web3, wallet_address, list(interest_distribution.assets), filter_zero=True, block_number=block_identifier, ) balances = { b.asset: b.amount for b in onchain_balances } # Then distribute gained interest (new atokens/vtokens) among positions events_iter = accrue_interest( state, balances, interest_distribution, timestamp, block_number=block_identifier ) events = list(events_iter) return events