Source code for tradeexecutor.cli.commands.console

"""console command.

- Open interactive IPython session within the trade-executor

- Can be used as a part of Docker image

To start a console in shell with `docker-compose.yml` set up:

.. code-block:: shell

     docker-compose run $YOUR_CONTAINER_NAME console

"""
import datetime
import itertools
from decimal import Decimal
from pathlib import Path
from typing import Optional

import typer

from IPython import embed
import pandas as pd

from eth_defi.hotwallet import HotWallet
from tradingstrategy.chain import ChainId
from tradingstrategy.client import Client
from tradingstrategy.timebucket import TimeBucket
from . import shared_options

from .app import app
from ..bootstrap import prepare_executor_id, prepare_cache, create_web3_config, create_execution_and_sync_model, \
    create_state_store, create_client
from ..log import setup_logging
from ..version_info import VersionInfo
from ...monkeypatch.web3 import construct_sign_and_send_raw_middleware
from ...state.state import State
from ...statistics.in_memory_statistics import refresh_run_state
from ...strategy.approval import UncheckedApprovalModel
from ...strategy.bootstrap import make_factory_from_strategy_mod
from ...strategy.description import StrategyExecutionDescription
from ...strategy.execution_context import ExecutionContext, ExecutionMode
from ...strategy.execution_model import AssetManagementMode
from ...strategy.pandas_trader.indicator import DiskIndicatorStorage, MemoryIndicatorStorage, prepare_indicators, calculate_indicators
from ...strategy.pandas_trader.strategy_input import StrategyInputIndicators
from ...strategy.run_state import RunState
from ...strategy.strategy_module import read_strategy_module
from ...strategy.trading_strategy_universe import TradingStrategyUniverseModel
from ...strategy.universe_model import UniverseOptions
from ...utils.timer import timed_task


[docs]def launch_console(bindings: dict): """Start IPython session. Assume line length of 130. """ print('') print('Following classes and objects are added to the interactive interpreter without import:') for var, val in bindings.items(): str_value = str(val) str_value = str_value[0:100] + "..." if len(str_value) > 60 else str_value line = "{key:25}: {value}".format( key=var, value=str_value.replace('\n', ' ').replace('\r', ' ') ) print(line) print('') embed(user_ns=bindings, colors="Linux")
[docs]@app.command() def console( id: str = typer.Option(None, envvar="EXECUTOR_ID", help="Executor id used when programmatically referring to this instance. If not given, take the base of --strategy-file."), # State state_file: Optional[Path] = shared_options.state_file, strategy_file: Path = shared_options.strategy_file, private_key: str = shared_options.private_key, trading_strategy_api_key: str = shared_options.trading_strategy_api_key, cache_path: Optional[Path] = shared_options.cache_path, # Get minimum gas balance from the env minimum_gas_balance: Optional[float] = typer.Option(0.1, envvar="MINUMUM_GAS_BALANCE", help="What is the minimum balance of gas token you need to have in your wallet. If the balance falls below this, abort by crashing and do not attempt to create transactions. Expressed in the native token e.g. ETH."), # Web3 connection options json_rpc_binance: Optional[str] = shared_options.json_rpc_binance, json_rpc_polygon: Optional[str] = shared_options.json_rpc_polygon, json_rpc_ethereum: Optional[str] = shared_options.json_rpc_ethereum, json_rpc_avalanche: Optional[str] = shared_options.json_rpc_avalanche, json_rpc_arbitrum: Optional[str] = shared_options.json_rpc_arbitrum, json_rpc_anvil: Optional[str] = shared_options.json_rpc_anvil, # Live trading or backtest asset_management_mode: AssetManagementMode = shared_options.asset_management_mode, vault_address: Optional[str] = shared_options.vault_address, vault_adapter_address: Optional[str] = shared_options.vault_adapter_address, vault_payment_forwarder_address: Optional[str] = shared_options.vault_payment_forwarder, log_level: str = shared_options.log_level, unit_testing: bool = shared_options.unit_testing, ): """Open interactive IPython console to explore state. Open an interactive Python prompt where you can inspect and debug the current trade executor state. Strategy, state and execution state are loaded to the memory for debugging. Assumes you have a strategy deployed as a Docker container, environment variabels and such are set up, then you want to diagnose or modify the strategy environment after it has been taken offline. For an example console command you can read the hot wallet balance by: >>> web3.eth.get_balance(hot_wallet.address) / 10**18 """ global logger id = prepare_executor_id(id, strategy_file) logger = setup_logging(log_level) version_info = VersionInfo.read_docker_version() logger.info(f"Docker image version: {version_info.tag}") logger.info(f"Commit hash: {version_info.commit_hash}") logger.info(f"Commit message: {version_info.commit_message}") logger.info("") mod = read_strategy_module(strategy_file) cache_path = prepare_cache(id, cache_path) execution_context = ExecutionContext( mode=ExecutionMode.real_trading, timed_task_context_manager=timed_task, engine_version=mod.trading_strategy_engine_version, ) web3config = create_web3_config( json_rpc_binance=json_rpc_binance, json_rpc_polygon=json_rpc_polygon, json_rpc_avalanche=json_rpc_avalanche, json_rpc_ethereum=json_rpc_ethereum, json_rpc_anvil=json_rpc_anvil, json_rpc_arbitrum=json_rpc_arbitrum, ) assert web3config, "No RPC endpoints given. A working JSON-RPC connection is needed for check-wallet" hot_wallet = HotWallet.from_private_key(private_key) # Check that we are connected to the chain strategy assumes web3config.set_default_chain(mod.get_default_chain_id()) web3config.check_default_chain_id() if hot_wallet: # Add to Python console singing web3config.add_hot_wallet_signing(hot_wallet) run_state = RunState() run_state.version = VersionInfo.read_docker_version() run_state.executor_id = id client, routing_model = create_client( mod=mod, web3config=web3config, trading_strategy_api_key=trading_strategy_api_key, cache_path=cache_path, clear_caches=False, test_evm_uniswap_v2_factory=None, test_evm_uniswap_v2_router=None, test_evm_uniswap_v2_init_code_hash=None, ) assert client is not None, "You need to give details for TradingStrategy.ai client" execution_model, sync_model, valuation_model_factory, pricing_model_factory = create_execution_and_sync_model( asset_management_mode=asset_management_mode, private_key=private_key, web3config=web3config, confirmation_timeout=datetime.timedelta(seconds=5*60), confirmation_block_count=5, max_slippage=0.02, min_gas_balance=0, vault_address=vault_address, vault_adapter_address=vault_adapter_address, vault_payment_forwarder_address=vault_payment_forwarder_address, routing_hint=mod.trade_routing, ) logger.info("Valuation model factory is %s, pricing model factory is %s", valuation_model_factory, pricing_model_factory) # Set up the strategy engine factory = make_factory_from_strategy_mod(mod) run_description: StrategyExecutionDescription = factory( execution_model=execution_model, execution_context=execution_context, timed_task_context_manager=execution_context.timed_task_context_manager, sync_model=sync_model, valuation_model_factory=valuation_model_factory, pricing_model_factory=pricing_model_factory, approval_model=UncheckedApprovalModel(), client=client, routing_model=routing_model, run_state=RunState(), ) run_state.source_code = run_description.source_code # We construct the trading universe to know what's our reserve asset universe_model: TradingStrategyUniverseModel = run_description.universe_model ts = datetime.datetime.utcnow() universe = universe_model.construct_universe( ts, ExecutionMode.preflight_check, UniverseOptions()) # Get all tokens from the universe reserve_assets = universe.reserve_assets web3 = web3config.get_default() logger.info("RPC details") # Check the chain is online logger.info(f" Chain id is {web3.eth.chain_id:,}") logger.info(f" Latest block is {web3.eth.block_number:,}") # Check balances logger.info("Balance details") logger.info(" Hot wallet is %s", hot_wallet.address) gas_balance = web3.eth.get_balance(hot_wallet.address) / 10**18 logger.info(" We have %f tokens for gas left", gas_balance) if not state_file: state_file = f"state/{id}.json" store = create_state_store(Path(state_file)) if store.is_pristine(): state = store.create(id) else: state = store.load() logger.info("State details") logger.info(" Number of positions: %s", len(list(state.portfolio.get_all_positions()))) logger.info(" Number of trades: %s", len(list(state.portfolio.get_all_trades()))) runner = run_description.runner routing_state, pricing_model, valuation_model = runner.setup_routing(universe) # TODO: Make construction of routing model cleaner if routing_model is None: routing_model = runner.routing_model # Expose the previous backtest run to in the console as well backtest_result = Path(f"state/{id}-backtest.json") if backtest_result.exists(): backtested_state = State.read_json_file(backtest_result) else: backtested_state = None refresh_run_state( run_state, state, execution_context, visualisation=True, universe=universe, sync_model=sync_model, backtested_state=backtested_state, ) if mod.create_indicators: # If the strategy uses indicators calculate and expose them to the console # assert mod.parameters, "You need to have Parameters class if create_indicators is specified" indicator_storage = MemoryIndicatorStorage(universe_key=universe.get_cache_key()) indicator_set = prepare_indicators( mod.create_indicators, mod.parameters, universe, execution_context, timestamp=datetime.datetime.utcnow() ) indicators_needed = set(indicator_set.generate_combinations(universe)) indicator_result_map = calculate_indicators( universe, indicator_storage, indicator_set, execution_context=execution_context, remaining=indicators_needed, max_workers=1, # Max_workers=1 to enable easier debug ) indicators = StrategyInputIndicators( strategy_universe=universe, available_indicators=indicator_set, indicator_results=indicator_result_map, timestamp=None, ) else: indicator_storage = indicator_set = indicator_result_map = indicators = None # Set up the default objects # availalbe in the interactive session bindings = { "web3": web3, "client": client, "state": state, "strategy_universe": universe, "store": store, "hot_wallet": hot_wallet, "routing_state": routing_state, "pricing_model": pricing_model, "valuation_model": valuation_model, "routing_model": routing_model, "runner": runner, "sync_model": sync_model, "pd": pd, "cache_path": cache_path.absolute(), "datetime": datetime, "Decimal": Decimal, "ExecutionMode": ExecutionMode, "ChainId": ChainId, "TimeBucket": TimeBucket, "strategy_module": mod, "run_state": run_state, "backtested_state": backtested_state, "indicator_storage": indicator_storage, "indicator_set": indicator_set, "indicator_result_map": indicator_result_map, "indicators": indicators, } # Expose pairs to console as well for pair in itertools.islice(universe.iterate_pairs(), 5): name = pair.get_ticker().lower().replace("-", "_") bindings[name] = pair if not unit_testing: launch_console(bindings)