"""Route trades to different Uniswap v3 like exchanges."""
import logging
from _decimal import Decimal
from typing import Dict, Optional, List
import ipdb
from eth_typing import HexAddress
from hexbytes import HexBytes
from eth_defi.token import fetch_erc20_details
from eth_defi.trade import TradeSuccess
from eth_defi.tx import AssetDelta
from eth_defi.uniswap_v3.analysis import analyse_trade_by_receipt
from tradeexecutor.ethereum.swap import get_swap_transactions, report_failure
from tradeexecutor.state.state import State
from tradeexecutor.state.trade import TradeExecution
from tradeexecutor.state.types import Percent
from tradeexecutor.utils.blockchain import get_block_timestamp
from tradingstrategy.chain import ChainId
from web3 import Web3
from eth_defi.uniswap_v3.deployment import UniswapV3Deployment, fetch_deployment, mock_partial_deployment_for_analysis
from eth_defi.uniswap_v3.swap import swap_with_slippage_protection
from web3.exceptions import ContractLogicError
from tradeexecutor.ethereum.tx import HotWalletTransactionBuilder
from tradeexecutor.state.identifier import TradingPairIdentifier, AssetIdentifier
from tradeexecutor.state.blockhain_transaction import BlockchainTransaction
from tradingstrategy.pair import PandasPairUniverse
from tradeexecutor.strategy.universe_model import StrategyExecutionUniverse
from tradeexecutor.ethereum.routing_state import (
EthereumRoutingState,
route_tokens, # don't remove, forwarded import
OutOfBalance, # don't remove, forwarded import
get_base_quote,
get_base_quote_intermediary
)
from tradeexecutor.ethereum.routing_model import EthereumRoutingModel
from tradeexecutor.utils.slippage import get_slippage_in_bps
#: Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class UniswapV3RoutingState(EthereumRoutingState):
    """Routing state that builds Uniswap v3 swap transactions.

    Construction is delegated to :py:class:`EthereumRoutingState`.
    This class adds the Uniswap v3 specific swap preparation for
    two-way (single pool) and three-way (one intermediary hop) trades
    and hands the bound swap call to ``create_signed_transaction()``.
    """

    def __init__(
        self,
        pair_universe: PandasPairUniverse,
        tx_builder: Optional[HotWalletTransactionBuilder] = None,
        swap_gas_limit=None,
        approve_gas_limit=None,
    ):
        """Pass all arguments through to the parent routing state.

        :param pair_universe:
            Trading pairs available for routing.

        :param tx_builder:
            Transaction builder used to construct the transactions.

        :param swap_gas_limit:
            Gas limit passed to ``create_signed_transaction()`` for swaps.

        :param approve_gas_limit:
            Forwarded to the parent class.
        """
        super().__init__(
            pair_universe,
            tx_builder=tx_builder,
            swap_gas_limit=swap_gas_limit,
            approve_gas_limit=approve_gas_limit,
        )

    def __repr__(self):
        return f"<UniswapV3RoutingState Tx builder: {self.tx_builder} web3: {self.web3}>"

    def get_uniswap_for_pair(self, address_map: dict, target_pair: TradingPairIdentifier) -> UniswapV3Deployment:
        """Get a router for a trading pair.

        Thin wrapper around the module-level :py:func:`get_uniswap_for_pair`
        using this state's ``web3`` connection.
        """
        return get_uniswap_for_pair(self.web3, address_map, target_pair)

    def trade_on_router_two_way(
        self,
        uniswap: UniswapV3Deployment,
        target_pair: TradingPairIdentifier,
        reserve_asset: AssetIdentifier,
        reserve_amount: int,
        max_slippage: Percent,
        check_balances: bool = False,
        asset_deltas: Optional[List[AssetDelta]] = None,
        notes="",
    ):
        """Prepare the actual swap through a single pool.

        :param uniswap:
            Resolved Uniswap v3 deployment whose ``swap_router`` is called.

        :param target_pair:
            The pair we trade. Its ``fee`` selects the pool.

        :param reserve_asset:
            The token we use as input (source) for this trade.

        :param reserve_amount:
            Amount in, passed as is to the swap function.

        :param max_slippage:
            Slippage tolerance of the trade. Converted with
            ``get_slippage_in_bps()`` before being given to the swap function.

        :param check_balances:
            Check on-chain balances that the account has enough tokens
            and raise exception if not.

        :param asset_deltas:
            Forwarded to ``create_signed_transaction()``.

        :param notes:
            Forwarded to ``create_signed_transaction()``.

        :return:
            Whatever ``create_signed_transaction()`` returns.
        """
        base_token, quote_token = get_base_quote(self.web3, target_pair, reserve_asset)

        if check_balances:
            self.check_has_enough_tokens(quote_token, reserve_amount)

        # eth_defi takes the pool fee as a raw integer (fee * 1_000_000)
        raw_fee = int(target_pair.fee * 1_000_000)

        logger.info(
            "Creating a trade for %s, slippage tolerance %f, trade reserve %s, amount in %d",
            target_pair,
            max_slippage,
            reserve_asset,
            reserve_amount,
        )

        bound_swap_func = swap_with_slippage_protection(
            uniswap,
            recipient_address=self.tx_builder.get_token_delivery_address(),
            base_token=base_token,
            quote_token=quote_token,
            amount_in=reserve_amount,
            max_slippage=get_slippage_in_bps(max_slippage),
            pool_fees=[raw_fee],
        )

        # If the receiver (vault) has its own security policy for slippage tolerance check it here
        receiver_slippage_tolerance = self.tx_builder.get_internal_slippage_tolerance()

        if receiver_slippage_tolerance is not None:
            # TODO: TxBuilder configures slippage tolerance as opposite of the trades
            receiver_slippage_tolerance = 1 - receiver_slippage_tolerance
            trade_slippage_tolerance = max_slippage
            assert receiver_slippage_tolerance > trade_slippage_tolerance, f"Receiver (vault) slippage tolerance tighter than the trade slippage tolerance.\nReceiver: {receiver_slippage_tolerance}, trade: {trade_slippage_tolerance}"

        return self.create_signed_transaction(
            uniswap.swap_router,
            bound_swap_func,
            self.swap_gas_limit,
            asset_deltas,
            notes=notes,
        )

    def trade_on_router_three_way(
        self,
        uniswap: UniswapV3Deployment,
        target_pair: TradingPairIdentifier,
        intermediary_pair: TradingPairIdentifier,
        reserve_asset: AssetIdentifier,
        reserve_amount: int,
        max_slippage: float,
        check_balances: bool = False,
        asset_deltas: Optional[List[AssetDelta]] = None,
        notes="",
    ):
        """Prepare the actual swap for three way trade.

        :param uniswap:
            Resolved Uniswap v3 deployment whose ``swap_router`` is called.

        :param target_pair:
            The pair holding the token we want to buy or sell.

        :param intermediary_pair:
            The pair used as the hop between reserve and target pair.

        :param reserve_asset:
            The token we use as input (source) for this trade

        :param reserve_amount:
            Amount in, passed as is to the swap function.

        :param max_slippage:
            Slippage tolerance of the trade. Converted with
            ``get_slippage_in_bps()`` before being given to the swap function.

        :param check_balances:
            Check on-chain balances that the account has enough tokens
            and raise exception if not.

        :param asset_deltas:
            Forwarded to ``create_signed_transaction()``.

        :param notes:
            Forwarded to ``create_signed_transaction()``.

        :return:
            Whatever ``create_signed_transaction()`` returns.
        """
        self.validate_pairs(target_pair, intermediary_pair)
        self.validate_exchange(target_pair, intermediary_pair)

        base_token, quote_token, intermediary_token = get_base_quote_intermediary(
            self.web3,
            target_pair,
            intermediary_pair,
            reserve_asset,
        )

        if check_balances:
            self.check_has_enough_tokens(quote_token, reserve_amount)

        # eth_defi uses raw_fees. Listed in buy order: intermediary hop first.
        raw_pool_fees = [int(intermediary_pair.fee * 1_000_000), int(target_pair.fee * 1_000_000)]

        if target_pair.base == reserve_asset:
            # We are doing sell, reverse fee order
            raw_pool_fees.reverse()

        # Buy and sell only differ by the fee order above
        # NOTE(review): unlike the two-way trade, the receiver (vault)
        # slippage tolerance is not checked here - confirm this is intended.
        bound_swap_func = swap_with_slippage_protection(
            uniswap,
            recipient_address=self.tx_builder.get_token_delivery_address(),
            base_token=base_token,
            quote_token=quote_token,
            pool_fees=raw_pool_fees,
            amount_in=reserve_amount,
            max_slippage=get_slippage_in_bps(max_slippage),
            intermediate_token=intermediary_token,
        )

        return self.create_signed_transaction(
            uniswap.swap_router,
            bound_swap_func,
            self.swap_gas_limit,
            asset_deltas,
            notes=notes,
        )
class UniswapV3Routing(EthereumRoutingModel):
    """A simple router that does not optimise the trade execution cost. Designed for Uniswap v3 style exchanges.

    - Able to trade on multiple exchanges

    - Able to three-way trades through predefined intermediary hops,
      either on the exchange itself or some outside exchange
    """

    def __init__(
        self,
        address_map: Dict[str, HexAddress],
        allowed_intermediary_pairs: Dict[str, str],
        reserve_token_address: str,
        chain_id: Optional[ChainId] = None,
    ):
        """
        :param address_map:
            Defines router smart contracts to be used with each DEX.
            Address map is a dict of factory, router, position_manager,
            and quoter addresses

        :param allowed_intermediary_pairs:

            Quote token address -> pair smart contract address mapping.

            Because we hold our reserves only in one currency e.g. BUSD
            and we want to trade e.g. Cake/BNB pairs, we need to whitelist
            BNB as an allowed intermediary token.
            This makes it possible to do BUSD -> BNB -> Cake trade.
            This set is the list of pair smart contract addresses that
            are allowed to be used as a hop.

        :param reserve_token_address:
            Token address of our reserve currency.
            Relevant for buy/sell routing.
            Lowercase.

        :param chain_id:
            Store the chain id for which these routes were generated for.
        """
        super().__init__(allowed_intermediary_pairs, reserve_token_address, chain_id)

        assert isinstance(address_map, dict), f"address_map must be a dict, got {type(address_map)}"
        self.address_map = self.convert_address_dict_to_lower(address_map)

        logger.info(
            "Initialised %s\nfactory_router_map: %s\nallowed_intermediary_pairs: %s\nReserve token: %s",
            self,
            self.address_map,
            self.allowed_intermediary_pairs,
            reserve_token_address,
        )

    def create_routing_state(
        self,
        universe: StrategyExecutionUniverse,
        execution_details: dict,
    ) -> UniswapV3RoutingState:
        """Create a new routing state for this cycle.

        - Connect routing to web3 and hot wallet

        - Read on-chain data on what gas fee we are going to use

        - Setup transaction builder based on this information

        The work is done by the parent class, which is told to
        instantiate :py:class:`UniswapV3RoutingState`.
        """
        return super().create_routing_state(universe, execution_details, UniswapV3RoutingState)

    def make_direct_trade(
        self,
        routing_state: EthereumRoutingState,
        target_pair: TradingPairIdentifier,
        reserve_asset: AssetIdentifier,
        reserve_amount: int,
        max_slippage: float,
        check_balances=False,
        asset_deltas: Optional[List[AssetDelta]] = None,
        notes="",
    ) -> list[BlockchainTransaction]:
        """Prepare a trade that goes directly through the target pair.

        Delegates to the parent implementation, adding this router's
        ``address_map``. All other arguments are forwarded unchanged.
        """
        return super().make_direct_trade(
            routing_state,
            target_pair,
            reserve_asset,
            reserve_amount,
            max_slippage,
            self.address_map,
            check_balances,
            asset_deltas=asset_deltas,
            # Forward the caller's notes instead of discarding them
            notes=notes,
        )

    def make_multihop_trade(
        self,
        routing_state: EthereumRoutingState,
        target_pair: TradingPairIdentifier,
        intermediary_pair: TradingPairIdentifier,
        reserve_asset: AssetIdentifier,
        reserve_amount: int,
        max_slippage: float,
        check_balances=False,
        asset_deltas: Optional[List[AssetDelta]] = None,
        notes="",
    ) -> list[BlockchainTransaction]:
        """Prepare a trade that hops through an intermediary pair.

        Delegates to the parent implementation, adding this router's
        ``address_map``. All other arguments are forwarded unchanged.
        """
        return super().make_multihop_trade(
            routing_state,
            target_pair,
            intermediary_pair,
            reserve_asset,
            reserve_amount,
            max_slippage,
            self.address_map,
            check_balances,
            asset_deltas=asset_deltas,
            # Forward the caller's notes instead of discarding them
            notes=notes,
        )

    def settle_trade(
        self,
        web3: Web3,
        state: State,
        trade: TradeExecution,
        receipts: Dict[HexBytes, dict],
        stop_on_execution_failure=False,
    ):
        """Analyse the swap receipt of a trade and record the outcome in the state.

        On success the executed price, amounts, LP fees and gas cost are
        stored with ``state.mark_trade_success()``. Otherwise the trade is
        passed to ``report_failure()``.

        :param web3:
            Connection used to read token details and the block timestamp.

        :param state:
            Strategy state to update.

        :param trade:
            The trade whose swap transaction is settled.

        :param receipts:
            Transaction receipts. Looked up with
            ``HexBytes(swap_tx.tx_hash)`` as the key.

        :param stop_on_execution_failure:
            Forwarded to ``report_failure()`` when the trade failed.

        :raise KeyError:
            If there is no receipt for the swap transaction.
        """
        logger.info("Settling Uniswap v3 trade: #%s", trade.trade_id)

        base_token_details = fetch_erc20_details(web3, trade.pair.base.checksum_address)
        # NOTE: the result is not used below; the call is kept so the
        # sequence of on-chain reads stays as it was.
        quote_token_details = fetch_erc20_details(web3, trade.pair.quote.checksum_address)
        reserve = trade.reserve_currency

        swap_tx = get_swap_transactions(trade)
        uniswap = self.mock_partial_deployment_for_analysis(web3, swap_tx.contract_address)
        tx_dict = swap_tx.get_transaction()

        try:
            receipt = receipts[HexBytes(swap_tx.tx_hash)]
        except KeyError as e:
            raise KeyError(f"Could not find hash: {swap_tx.tx_hash} in {receipts}") from e

        input_args = swap_tx.get_actual_function_input_args()

        result = analyse_trade_by_receipt(
            web3,
            uniswap=uniswap,
            tx=tx_dict,
            tx_hash=swap_tx.tx_hash,
            tx_receipt=receipt,
            input_args=input_args,
        )

        ts = get_block_timestamp(web3, receipt["blockNumber"])

        if isinstance(result, TradeSuccess):

            # v3 path includes fee (int) as well, keep only the addresses
            path = [a.lower() for a in result.path if isinstance(a, str)]

            if trade.is_buy():
                assert path[0] == reserve.address, f"Was expecting the route path to start with reserve token {reserve}, got path {result.path}"

                # Reserve goes in, base token comes out
                executed_reserve = result.amount_in / Decimal(10 ** reserve.decimals)
                executed_amount = result.amount_out / Decimal(10 ** base_token_details.decimals)
                price = executed_reserve / executed_amount

                # lp fee is already in terms of quote token
                lp_fee_paid = result.lp_fee_paid
            else:
                # Ordered other way around
                assert path[0] == base_token_details.address.lower(), f"Path is {path}, base token is {base_token_details}"
                assert path[-1] == reserve.address

                # Base token goes in, so the executed amount is negative
                executed_amount = -result.amount_in / Decimal(10 ** base_token_details.decimals)
                executed_reserve = result.amount_out / Decimal(10 ** reserve.decimals)
                price = -executed_reserve / executed_amount

                # convert lp fee to be in terms of quote token
                lp_fee_paid = result.lp_fee_paid * float(price) if result.lp_fee_paid else None

            assert (executed_reserve > 0) and (executed_amount != 0) and (price > 0), f"Executed amount {executed_amount}, executed_reserve: {executed_reserve}, price: {price}"

            logger.info(
                "Executed: %s %s, %s %s",
                executed_amount,
                trade.pair.base.token_symbol,
                executed_reserve,
                trade.pair.quote.token_symbol,
            )

            # Mark as success
            state.mark_trade_success(
                ts,
                trade,
                executed_price=float(price),
                executed_amount=executed_amount,
                executed_reserve=executed_reserve,
                lp_fees=lp_fee_paid,
                native_token_price=0,  # won't fix
                cost_of_gas=result.get_cost_of_gas(),
            )
        else:
            # Trade failed
            report_failure(ts, state, trade, stop_on_execution_failure)

    def mock_partial_deployment_for_analysis(
        self,
        web3: Web3,
        router_address: str,
    ) -> UniswapV3Deployment:
        """Build the deployment object used for receipt analysis.

        Calls the module-level ``mock_partial_deployment_for_analysis()``
        imported from ``eth_defi.uniswap_v3.deployment``.
        """
        return mock_partial_deployment_for_analysis(web3, router_address)
def get_uniswap_for_pair(web3: Web3, address_map: dict, target_pair: TradingPairIdentifier) -> UniswapV3Deployment:
    """Get Uniswap v3 resolved contracts for a pair.

    Resolved deployments are remembered in the module-level
    ``_uniswap_v3_cache``, keyed by the checksummed factory address and
    the identity of the ``web3`` object.

    :param web3:
        Connection passed to ``fetch_deployment()``.

    :param address_map:
        Must contain ``factory``, ``router``, ``position_manager`` and
        ``quoter`` addresses. ``quoter_v2`` and ``router_v2`` are optional.

    :param target_pair:
        Pair whose ``exchange_address`` must match ``address_map["factory"]``.

    :raise RuntimeError:
        If ``fetch_deployment()`` fails with ``ContractLogicError``.
    """
    assert target_pair.exchange_address, f"Exchange address missing for {target_pair}"

    factory_address = Web3.to_checksum_address(target_pair.exchange_address)
    assert factory_address == Web3.to_checksum_address(address_map["factory"]), \
        "address_map[\"factory\"] and target_pair.exchange_address should be equal\n" \
        f"Got {factory_address} and {address_map['factory']} on pair {target_pair}"

    # Serve an earlier resolution for the same factory and web3 object
    key = (factory_address, id(web3))
    hit = _uniswap_v3_cache.get(key)
    if hit:
        return hit

    # Mandatory addresses, normalised in the same order they are passed on
    router_address, position_manager, quoter = (
        Web3.to_checksum_address(address_map[name])
        for name in ("router", "position_manager", "quoter")
    )

    # Optional v2 contracts, None when not configured
    optional_v2 = {
        "quoter_v2": address_map.get("quoter_v2"),
        "router_v2": address_map.get("router_v2"),
    }

    try:
        deployment = fetch_deployment(
            web3,
            factory_address,
            router_address,
            position_manager,
            quoter,
            **optional_v2,
        )
    except ContractLogicError as e:
        raise RuntimeError(f"Could not fetch deployment data for router address {router_address} (factory {factory_address}) - data is likely wrong") from e
    else:
        _uniswap_v3_cache[key] = deployment
        return deployment
#: Cache of Uniswap v3 deployments resolved by get_uniswap_for_pair(),
#: keyed by ``(factory address, id(web3))``.
#:
#: TODO: Web3 instances stay around here forever.
#: But only matters for unit tests.
_uniswap_v3_cache: Dict[tuple, UniswapV3Deployment] = {}