# Source code for tradeexecutor.cli.commands.webapi

"""Web API command

Example::

    poetry run trade-executor webapi --strategy-file=strategies/enzyme-polygon-matic-usdc.py
"""

import datetime
import faulthandler
import logging
import os
import time
from decimal import Decimal 
from pathlib import Path
from queue import Queue
from typing import Optional

import typer
try:
    import waitress
    from ...webhook.server import create_pyramid_app
except ImportError:
    waitress = None

from . import shared_options
from .app import app
from ..bootstrap import prepare_executor_id, prepare_cache, create_metadata, create_state_store, create_web3_config, create_client
from ..log import setup_logging, setup_file_logging
from ..version_info import VersionInfo
from ...strategy.execution_model import AssetManagementMode
from ...strategy.run_state import RunState
from ...strategy.strategy_module import read_strategy_module
from ...statistics.in_memory_statistics import refresh_run_state
from ...strategy.execution_context import ExecutionContext, ExecutionMode
from ...strategy.trading_strategy_universe import DefaultTradingStrategyUniverseModel
from ...strategy.pandas_trader.indicator import calculate_and_load_indicators, MemoryIndicatorStorage, call_create_indicators
from ...strategy.pandas_trader.strategy_input import StrategyInputIndicators
from ...utils.timer import timed_task

logger = logging.getLogger(__name__)


@app.command()
def webapi(
    # Strategy assets
    id: str = shared_options.id,
    name: Optional[str] = shared_options.name,
    short_description: Optional[str] = typer.Option(None, envvar="SHORT_DESCRIPTION", help="Short description for metadata"),
    long_description: Optional[str] = typer.Option(None, envvar="LONG_DESCRIPTION", help="Long description for metadata"),
    icon_url: Optional[str] = typer.Option(None, envvar="ICON_URL", help="Strategy icon for web rendering and Discord avatar"),
    strategy_file: Path = shared_options.strategy_file,
    trading_strategy_api_key: str = shared_options.trading_strategy_api_key,

    # Webhook server options
    http_port: int = typer.Option(3456, envvar="HTTP_PORT", help="Which HTTP port to listen. The default is 3456, the default port of Pyramid web server."),
    http_host: str = typer.Option("0.0.0.0", envvar="HTTP_HOST", help="The IP address to bind for the web server. By default listen to all IP addresses available in the run-time environment."),
    http_username: str = typer.Option(None, envvar="HTTP_USERNAME", help="Username for HTTP Basic Auth protection of webhooks"),
    http_password: str = typer.Option(None, envvar="HTTP_PASSWORD", help="Password for HTTP Basic Auth protection of webhooks"),

    # Logging
    file_log_level: Optional[str] = typer.Option("info", envvar="FILE_LOG_LEVEL", help="Log file log level. The default log file is logs/id.log."),
    log_level: str = shared_options.log_level,

    # Various file configurations
    state_file: Optional[Path] = shared_options.state_file,
    cache_path: Optional[Path] = shared_options.cache_path,
):
    """Launch the web API server of a trade executor instance."""
    # NOTE: the docstring above is deliberately a single line, because typer
    # shows the command docstring as CLI help text. Details live in comments.
    #
    # What this command does, in order:
    #
    # 1. Sets up console and file logging.
    # 2. Reads the strategy module and opens the state store.
    # 3. Builds the strategy universe, the chart registry and the indicators
    #    and attaches them to a fresh RunState.
    # 4. If the state store already has content, loads it into the RunState.
    # 5. Creates the Pyramid app and serves it with waitress. This call blocks.

    global logger

    # The web server imports at the top of this module are optional.
    # If they failed, stop here with a clear message instead of crashing
    # on a missing name after the (slow) universe and indicator set up.
    if waitress is None:
        raise RuntimeError(
            "The webapi command needs the web server dependencies "
            "(waitress and tradeexecutor.webhook.server), which could not be imported"
        )

    # Guess id from the strategy file
    id = prepare_executor_id(id, strategy_file)

    # We always need a name: fall back to the strategy file name
    if not name:
        if strategy_file:
            name = os.path.basename(strategy_file)
        else:
            name = "Unnamed backtest"

    if not log_level:
        log_level = logging.INFO

    # setup_logging() returns the logger we use from here on;
    # it replaces the module-level logger (hence the global statement above).
    logger = setup_logging(log_level, in_memory_buffer=True)

    setup_file_logging(
        f"logs/{id}.log",
        file_log_level,
        http_logging=True,
    )

    # Default state file location is derived from the executor id
    if not state_file:
        state_file = f"state/{id}.json"

    cache_path = prepare_cache(id, cache_path, False)

    mod = read_strategy_module(strategy_file)

    # state_file is always set at this point, so the store always exists
    store = create_state_store(Path(state_file))

    fees = dict(
        management_fee=mod.management_fee,
        trading_strategy_protocol_fee=mod.trading_strategy_protocol_fee,
        strategy_developer_fee=mod.strategy_developer_fee,
    )

    metadata = create_metadata(
        name,
        short_description,
        long_description,
        icon_url,
        AssetManagementMode.dummy,
        chain_id=mod.get_default_chain_id(),
        vault=None,
        fees=fees,
    )

    # Start the queue that relays info from the web server to the strategy executor
    command_queue = Queue()

    run_state = RunState()
    run_state.version = VersionInfo.read_docker_version()
    run_state.executor_id = id

    # Create strategy universe and chart_registry to load into the run_state.
    #
    # The timestamp is kept as a naive UTC datetime, exactly what the
    # deprecated datetime.utcnow() used to return.
    ts = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    execution_context = ExecutionContext(
        mode=ExecutionMode.preflight_check,
        timed_task_context_manager=timed_task,
        engine_version=mod.trading_strategy_engine_version,
    )

    # No JSON-RPC endpoint is configured: every chain URL is passed as None
    web3config = create_web3_config(
        json_rpc_binance=None,
        json_rpc_polygon=None,
        json_rpc_avalanche=None,
        json_rpc_ethereum=None,
        json_rpc_base=None,
        json_rpc_anvil=None,
        json_rpc_arbitrum=None,
        unit_testing=False,
    )

    # routing_model is returned by create_client() but not used by this command
    client, routing_model = create_client(
        mod=mod,
        web3config=web3config,
        trading_strategy_api_key=trading_strategy_api_key,
        cache_path=cache_path,
        clear_caches=False,
        asset_management_mode=AssetManagementMode.dummy,
        test_evm_uniswap_v2_factory=None,
        test_evm_uniswap_v2_router=None,
        test_evm_uniswap_v2_init_code_hash=None,
    )
    assert client is not None, "You need to give details for TradingStrategy.ai client"

    universe_model = DefaultTradingStrategyUniverseModel(
        client,
        execution_context,
        mod.create_trading_universe
    )

    universe = universe_model.construct_universe(
        ts,
        ExecutionMode.preflight_check,
        mod.get_universe_options(),
        strategy_parameters=mod.parameters,
        execution_model=None,
    )

    logger.info("Creating chart_registry")
    run_state.chart_registry = mod.create_charts(
        timestamp=ts,
        parameters=mod.parameters,
        strategy_universe=universe,
        execution_context=execution_context,
    )

    logger.info("Creating indicators")
    storage = MemoryIndicatorStorage(universe.get_cache_key())

    indicators = call_create_indicators(
        mod.create_indicators,
        mod.parameters,
        universe,
        execution_context,
        ts,
    )

    indicator_results = calculate_and_load_indicators(
        strategy_universe=universe,
        storage=storage,
        execution_context=execution_context,
        indicators=indicators,
        parameters=mod.parameters,
        strategy_cycle_timestamp=ts,
    )

    strategy_input_indicators = StrategyInputIndicators(
        universe,
        indicator_results=indicator_results,
        available_indicators=indicators,
    )

    # Pass indicator data to the web chart rendering end point
    run_state.latest_indicators = strategy_input_indicators

    # Set up read-only state sync.
    # Only done when the store reports it is not pristine.
    if not store.is_pristine():
        run_state.read_only_state_copy = store.load()
        # The state is loaded a second time here, as in the original code,
        # so refresh_run_state() gets its own object instead of the copy above.
        # NOTE(review): the execution mode here is unit_testing while the rest
        # of the command uses preflight_check - confirm this is intentional.
        refresh_run_state(
            run_state,
            store.load(),
            ExecutionContext(mode=ExecutionMode.unit_testing),
            cycle_duration=mod.parameters.cycle_duration,
        )

    # Named pyramid_app so it does not shadow the module-level typer `app`
    pyramid_app = create_pyramid_app(
        http_username,
        http_password,
        command_queue,
        store,
        metadata,
        run_state,
    )

    # Blocks and serves HTTP requests on the given host and port
    waitress.serve(pyramid_app, host=http_host, port=http_port)