All-time high on Base beta
Momentum strategy buying tokens breaching their all-time highs
Source code
The source code of the All-time high on Base strategy
"""Base ATH strategy, version 2.0.
Check universe and indicators:
trade-executor \
check-universe \
--strategy-file=strategy/base-ath-v2.py \
--trading-strategy-api-key=$TRADING_STRATEGY_API_KEY
Run backtest:
trade-executor \
backtest \
--strategy-file=strategy/base-ath.py \
--trading-strategy-api-key=$TRADING_STRATEGY_API_KEY
Perform test trade:
docker compose run \
base-ath \
perform-test-trade \
--pair "(base, uniswap-v2, KEYCAT, WETH, 0.003)" \
--simulate \
--amount=5.0
"""
import datetime
import logging
import pandas as pd
import pandas_ta
from eth_defi.token import USDC_NATIVE_TOKEN
from eth_defi.token import WRAPPED_NATIVE_TOKEN
from tradeexecutor.state.trade import TradeExecution
from tradeexecutor.state.types import USDollarAmount
from tradeexecutor.strategy.alpha_model import AlphaModel
from tradeexecutor.strategy.cycle import CycleDuration
from tradeexecutor.strategy.default_routing_options import TradeRouting
from tradeexecutor.strategy.execution_context import ExecutionContext
from tradeexecutor.strategy.pandas_trader.indicator import IndicatorDependencyResolver
from tradeexecutor.strategy.pandas_trader.indicator import IndicatorSource
from tradeexecutor.strategy.pandas_trader.indicator_decorator import IndicatorRegistry
from tradeexecutor.strategy.pandas_trader.strategy_input import StrategyInput
from tradeexecutor.strategy.parameters import StrategyParameters
from tradeexecutor.strategy.tag import StrategyTag
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse
from tradeexecutor.strategy.trading_strategy_universe import (
load_partial_data)
from tradeexecutor.strategy.tvl_size_risk import USDTVLSizeRiskModel
from tradeexecutor.strategy.universe_model import UniverseOptions
from tradeexecutor.strategy.weighting import weight_equal, weight_passthrouh
from tradingstrategy.chain import ChainId
from tradingstrategy.client import Client
from tradingstrategy.pair import PandasPairUniverse
from tradingstrategy.timebucket import TimeBucket
from tradingstrategy.transport.cache import OHLCVCandleType
from tradingstrategy.utils.forward_fill import forward_fill
from tradingstrategy.utils.groupeduniverse import resample_candles
from tradingstrategy.utils.liquidity_filter import prefilter_pairs_with_tvl
from tradingstrategy.utils.token_filter import add_base_quote_address_columns
from tradingstrategy.utils.token_filter import filter_for_exchange_slugs
from tradingstrategy.utils.token_filter import filter_pairs_default
from tradingstrategy.utils.token_extra_data import load_token_metadata
from tradingstrategy.utils.token_filter import filter_by_token_sniffer_score
from tradingstrategy.utils.token_filter import deduplicate_pairs_by_volume
from tradingstrategy.lending import LendingProtocolType
from tradeexecutor.state.identifier import TradingPairIdentifier
from tradeexecutor.utils.dedent import dedent_any
logger = logging.getLogger(__name__)
#
# Strategy parameters
#
trading_strategy_engine_version = "0.5"
class Parameters:
id = "base-ath"
# We trade 1h candle
candle_time_bucket = TimeBucket.h1
cycle_duration = CycleDuration.cycle_2h
# Coingecko categories to include.
#
# See list here: TODO
#
chain_id = ChainId.base
exchanges = {"uniswap-v2", "uniswap-v3"}
#
# Basket construction and rebalance parameters
#
min_asset_universe = 1 # How many assets we need in the asset universe to start running the index
max_assets_in_portfolio = 10 # How many assets our basket can hold at once
allocation = 0.95 # Allocate all cash to volatile pairs
# min_rebalance_trade_threshold_pct = 0.05 # % of portfolio composition must change before triggering rebalance
individual_rebalance_min_threshold_usd = 250.0 # Don't make buys less than this amount
sell_rebalance_min_threshold = 250.0
per_position_cap_of_pool = 0.01 # Never own more than this % of the lit liquidity of the trading pool
max_concentration = 0.10 # How large a % one asset can be of the portfolio at once
min_portfolio_weight = 0.0050 # Close position / do not open if weight is less than 50 BPS
# ATH indicator parameters
ath_delay_bars = 144
ath_window_bars = 360
ath_threshold = 1.10
ath_span = 72
# RSI filter parameters
daily_rsi_bars = 90
daily_rsi_threshold = 25
# Window lengths for the trailing volume and volatility used in the inclusion criteria
rolling_volume_bars = pd.Timedelta("7d") // candle_time_bucket.to_timedelta()
rolling_volatility_bars = pd.Timedelta("7d") // candle_time_bucket.to_timedelta()
tvl_ewm_span = 7 * 24 # Smooth the TVL inclusion criteria
min_volume = 25_000 # USD
min_tvl_prefilter = 175_000 # USD - to reduce the number of trading pairs, for backtesting purposes only
min_tvl = 175_000 # USD - set to the same as above if you want to avoid any survivorship bias
min_token_sniffer_score = 20 # 20 = AAVE
#
# Yield on cash
#
use_aave = True
credit_flow_dust_threshold = 5.0 # Min deposit USD to Aave
#
#
# Backtesting only
# Limiting factor: Aave v3 on Base launched at the end of December 2023
#
backtest_start = datetime.datetime(2024, 1, 1)
backtest_end = datetime.datetime(2025, 3, 12)
initial_cash = 100_000
#
# Live only
#
routing = TradeRouting.default
required_history_period = datetime.timedelta(days=daily_rsi_bars + 2)
slippage_tolerance = 0.0060 # 0.6%
assummed_liquidity_when_data_missings = 10_000
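# A quick sanity check of how the bar counts above translate to wall-clock
# time with 1h candles (illustrative arithmetic, not executed):
#
#   ath_delay_bars = 144  -> 144 h = 6 days of lag before the ATH window
#   ath_window_bars = 360 -> 360 h = 15 days of ATH lookback
#   ath_span = 72         -> 72 h = 3 days of EWM smoothing
#   rolling_volume_bars = 7d // 1h = 168 bars of trailing volume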
#: Assets used in routing and buy-and-hold benchmark values for our strategy, but not traded by this strategy.
SUPPORTING_PAIRS = [
(ChainId.base, "uniswap-v2", "WETH", "USDC", 0.0030),
(ChainId.base, "uniswap-v3", "WETH", "USDC", 0.0005),
(ChainId.base, "uniswap-v3", "cbBTC", "WETH", 0.0030), # Only trading since October
]
#: Needed for USDC credit
LENDING_RESERVES = [
(Parameters.chain_id, LendingProtocolType.aave_v3, "USDC"),
]
PREFERRED_STABLECOIN = USDC_NATIVE_TOKEN[Parameters.chain_id.value].lower()
VOL_PAIR = (ChainId.base, "uniswap-v3", "WETH", "USDC", 0.0005)
def create_trading_universe(
timestamp: datetime.datetime,
client: Client,
execution_context: ExecutionContext,
universe_options: UniverseOptions,
) -> TradingStrategyUniverse:
"""Create the trading universe.
- Load the Trading Strategy full pairs dataset
- Load the built-in Coingecko top 1000 dataset
- Get all DEX tokens for a certain Coingecko category
- Load OHLCV data for these pairs
- Load BTC and ETH price data as well, to be used as a benchmark
"""
if execution_context.live_trading:
# Live trading, send strategy universe formation details to logs
debug_printer = logger.info
else:
# Notebook mode
debug_printer = print
chain_id = Parameters.chain_id
debug_printer(f"Preparing trading universe on chain {chain_id.get_name()}")
exchange_universe = client.fetch_exchange_universe()
targeted_exchanges = [exchange_universe.get_by_chain_and_slug(chain_id, slug) for slug in Parameters.exchanges]
# Pull out our benchmark pair ids.
# We need to construct a pair universe object for the symbolic lookup.
# TODO: PandasPairUniverse(build_index=True) - speed this up by skipping index building
all_pairs_df = client.fetch_pair_universe().to_pandas()
all_pairs_df = filter_for_exchange_slugs(all_pairs_df, Parameters.exchanges)
debug_printer("Creating universe for benchmark pair extraction")
pair_universe = PandasPairUniverse(
all_pairs_df,
exchange_universe=exchange_universe,
build_index=False,
)
debug_printer(f"Exchanges {Parameters.exchanges} have total {len(all_pairs_df):,} pairs on chain {Parameters.chain_id.get_name()}")
# Get TVL data for prefiltering
if execution_context.live_trading:
# For live trading, we take TVL data from ~around the start of the strategy until today
tvl_time_bucket = TimeBucket.d1
start = datetime.datetime(2024, 2, 1)
end = tvl_time_bucket.floor(pd.Timestamp(datetime.datetime.utcnow() - tvl_time_bucket.to_timedelta()))
else:
start = Parameters.backtest_start
end = Parameters.backtest_end
#
# Do exchange and TVL prefilter pass for the trading universe
#
min_tvl = Parameters.min_tvl_prefilter
# logging.getLogger().setLevel(logging.INFO)
liquidity_time_bucket = TimeBucket.d1
tvl_df = client.fetch_tvl(
mode="min_tvl_low",
bucket=liquidity_time_bucket,
start_time=start,
end_time=end,
exchange_ids=[exc.exchange_id for exc in targeted_exchanges],
min_tvl=min_tvl,
)
# logging.getLogger().setLevel(logging.WARNING)
debug_printer(f"Fetch TVL, we got {len(tvl_df['pair_id'].unique())} pairs with TVL data for min TVL criteria {min_tvl}")
tvl_filtered_pair_ids = tvl_df["pair_id"].unique()
benchmark_pair_ids = [pair_universe.get_pair_by_human_description(desc).pair_id for desc in SUPPORTING_PAIRS]
needed_pair_ids = set(benchmark_pair_ids) | set(tvl_filtered_pair_ids)
pairs_df = all_pairs_df[all_pairs_df["pair_id"].isin(needed_pair_ids)]
debug_printer(f"After TVL prefilter to {Parameters.min_tvl_prefilter:,} in {Parameters.backtest_start} - {Parameters.backtest_end}, we have {len(pairs_df)} trading pairs")
pairs_df = add_base_quote_address_columns(pairs_df)
# Never deduplicate supporting pairs
supporting_pairs_df = pairs_df[pairs_df["pair_id"].isin(benchmark_pair_ids)]
allowed_quotes = {
PREFERRED_STABLECOIN,
WRAPPED_NATIVE_TOKEN[chain_id.value].lower(),
}
filtered_pairs_df = filter_pairs_default(
pairs_df,
good_quote_token_addresses=allowed_quotes,
verbose_print=print,
)
# Deduplicate trading pairs - keep the pair with the best volume
deduplicated_df = deduplicate_pairs_by_volume(filtered_pairs_df)
# Get our reference pairs back to the dataset
pairs_df = pd.concat([deduplicated_df, supporting_pairs_df]).drop_duplicates(subset='pair_id', keep='first')
debug_printer(f"After deduplication we have {len(pairs_df)} pairs")
# Add benchmark pairs back to the dataset
pairs_df = pd.concat([pairs_df, supporting_pairs_df]).drop_duplicates(subset='pair_id', keep='first')
# Load metadata
debug_printer("Loading metadata")
# logging.getLogger().setLevel(logging.INFO)
pairs_df = load_token_metadata(pairs_df, client, printer=debug_printer)
# logging.getLogger().setLevel(logging.WARNING)
# Scam filter using TokenSniffer
risk_filtered_pairs_df = filter_by_token_sniffer_score(
pairs_df,
risk_score=Parameters.min_token_sniffer_score,
printer=debug_printer,
)
# Check that we did not accidentally get rid of benchmark pairs we need for the strategy
difference = set(benchmark_pair_ids).difference(set(risk_filtered_pairs_df["pair_id"]))
if difference:
first_dropped_id = next(iter(difference))
first_dropped_data = pairs_df.loc[pairs_df.pair_id == first_dropped_id]
assert len(first_dropped_data) == 1, f"Got {len(first_dropped_data)} entries: {first_dropped_data}"
raise AssertionError(f"Benchmark trading pair dropped in filter_by_token_sniffer_score() check: {first_dropped_data.iloc[0]}")
pairs_df = risk_filtered_pairs_df.sort_values("volume", ascending=False)
debug_printer(f"After TokenSniffer risk filter we have {len(pairs_df)} pairs")
# Remove extra pairs from the TVL data,
# so we do not drag extra data and memory usage to the trading universe and liquidity candles
tvl_pair_count_no_filtering = tvl_df["pair_id"].nunique()
tvl_df = tvl_df[tvl_df["pair_id"].isin(pairs_df["pair_id"])]
tvl_filtering_count = tvl_df["pair_id"].nunique()
debug_printer(f"TVL data before risk filtering had {tvl_pair_count_no_filtering} pairs, after filtering we have {tvl_filtering_count} pairs")
uni_v2 = pairs_df.loc[pairs_df["exchange_slug"] == "uniswap-v2"]
uni_v3 = pairs_df.loc[pairs_df["exchange_slug"] == "uniswap-v3"]
other_dex = pairs_df.loc[~pairs_df["exchange_slug"].isin(["uniswap-v2", "uniswap-v3"])]
debug_printer(f"Pairs on Uniswap v2: {len(uni_v2)}, Uniswap v3: {len(uni_v3)}, other DEX: {len(other_dex)}")
dataset = load_partial_data(
client=client,
time_bucket=Parameters.candle_time_bucket,
pairs=pairs_df,
execution_context=execution_context,
universe_options=universe_options,
liquidity_time_bucket=liquidity_time_bucket,
preloaded_tvl_df=tvl_df,
lending_reserves=LENDING_RESERVES,
)
reserve_asset = PREFERRED_STABLECOIN
debug_printer("Creating trading universe")
strategy_universe = TradingStrategyUniverse.create_from_dataset(
dataset,
reserve_asset=reserve_asset,
forward_fill=True, # We get very gappy data from low-liquidity DEX coins
forward_fill_until=timestamp,
)
# Tag benchmark/routing pair tokens so they can be separated from the rest of the tokens
# for the index construction.
strategy_universe.warm_up_data()
for pair_id in benchmark_pair_ids:
pair = strategy_universe.get_pair_by_id(pair_id)
pair.other_data["benchmark"] = False
debug_printer(f"Total {strategy_universe.get_pair_count()}")
return strategy_universe
#
# Strategy logic
#
def decide_trades(
input: StrategyInput
) -> list[TradeExecution]:
"""For each strategy tick, generate the list of trades."""
parameters = input.parameters
position_manager = input.get_position_manager()
state = input.state
timestamp = input.timestamp
indicators = input.indicators
strategy_universe = input.strategy_universe
# Build signals for each pair
alpha_model = AlphaModel(
timestamp,
close_position_weight_epsilon=parameters.min_portfolio_weight, # 50 BPS is our min portfolio weight
)
vol_pair = strategy_universe.get_pair_by_human_description(VOL_PAIR)
volume_included_pair_count = indicators.get_indicator_value(
"volume_included_pair_count",
)
tvl_included_pair_count = indicators.get_indicator_value(
"tvl_included_pair_count",
)
# Get the pairs included in this rebalance cycle.
# This includes pairs that have been pre-cleared in inclusion_criteria()
# with the volume, volatility and TVL filters.
included_pairs = indicators.get_indicator_value(
"inclusion_criteria",
na_conversion=False,
)
if included_pairs is None:
included_pairs = []
# Set signal for each pair
signal_count = 0
for pair_id in included_pairs:
pair = strategy_universe.get_pair_by_id(pair_id)
pair_signal = indicators.get_indicator_value("signal", pair=pair)
if pair_signal is None:
continue
weight = pair_signal
if weight < 0:
continue
alpha_model.set_signal(
pair,
weight,
)
# Diagnostics reporting
signal_count += 1
# Calculate how much dollar value we want each individual position to be on this strategy cycle,
# based on our total available equity
portfolio = position_manager.get_current_portfolio()
portfolio_target_value = portfolio.get_total_equity() * parameters.allocation
# Select the top max_assets_in_portfolio assets in which we are going to invest
# Calculate a weight for each asset in the portfolio by passing through the raw signal
alpha_model.select_top_signals(count=parameters.max_assets_in_portfolio)
alpha_model.assign_weights(method=weight_passthrouh)
# alpha_model.assign_weights(method=weight_by_1_slash_n)
#
# Normalise weights and cap the positions
#
size_risk_model = USDTVLSizeRiskModel(
pricing_model=input.pricing_model,
per_position_cap=parameters.per_position_cap_of_pool, # This is how much % of the pool TVL we can allocate for a position
missing_tvl_placeholder_usd=parameters.assummed_liquidity_when_data_missings, # Placeholder for missing TVL data until we get the data off the chain
)
alpha_model.normalise_weights(
investable_equity=portfolio_target_value,
size_risk_model=size_risk_model,
max_weight=parameters.max_concentration,
)
# Load in old weight for each trading pair signal,
# so we can calculate the adjustment trade size
alpha_model.update_old_weights(
state.portfolio,
ignore_credit=True,
)
alpha_model.calculate_target_positions(position_manager)
# Shift portfolio from current positions to target positions
# determined by the alpha signals (momentum)
# rebalance_threshold_usd = portfolio_target_value * parameters.min_rebalance_trade_threshold_pct
rebalance_threshold_usd = parameters.individual_rebalance_min_threshold_usd
assert rebalance_threshold_usd > 0.1, "Safety check tripped - something is wrong with the strategy code"
trades = alpha_model.generate_rebalance_trades_and_triggers(
position_manager,
min_trade_threshold=rebalance_threshold_usd, # Don't bother with trades under XXXX USD
invidiual_rebalance_min_threshold=parameters.individual_rebalance_min_threshold_usd,
sell_rebalance_min_threshold=parameters.sell_rebalance_min_threshold,
execution_context=input.execution_context,
)
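# A minimal sketch of how the USD thresholds behave, assuming the semantics
# described by the parameter comments above (illustrative numbers only):
#
#   individual_rebalance_min_threshold_usd = 250.0
#   # A position whose target drifts from 1,000 USD to 1,150 USD implies a
#   # 150 USD buy -> below the threshold, so no trade is generated.
#   # A drift from 1,000 USD to 1,400 USD implies a 400 USD buy -> traded.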
# Supply or withdraw cash to Aave if the strategy is set to do so
credit_trades = []
if parameters.use_aave:
credit_deposit_flow = position_manager.calculate_credit_flow_needed(
trades,
parameters.allocation,
)
if abs(credit_deposit_flow) > parameters.credit_flow_dust_threshold:
credit_trades = position_manager.manage_credit_flow(credit_deposit_flow)
trades += credit_trades
else:
credit_deposit_flow = 0
# Add a verbal report about decisions made/not made,
# so it is much easier to diagnose live trade execution.
# This will be readable in Discord/Telegram logging.
if input.is_visualisation_enabled():
try:
top_signal = next(iter(alpha_model.get_signals_sorted_by_weight()))
if top_signal.normalised_weight == 0:
top_signal = None
except StopIteration:
top_signal = None
rebalance_volume = sum(t.get_value() for t in trades)
report = dedent_any(f"""
Cycle: #{input.cycle}
Rebalanced: {'👍' if alpha_model.is_rebalance_triggered() else '👎'}
Open/about to open positions: {len(state.portfolio.open_positions)}
Max position value change: {alpha_model.max_position_adjust_usd:,.2f} USD
Rebalance threshold: {alpha_model.position_adjust_threshold_usd:,.2f} USD
Trades decided: {len(trades)}
Pairs total: {strategy_universe.data_universe.pairs.get_count()}
Pairs meeting inclusion criteria: {len(included_pairs)}
Pairs meeting volume inclusion criteria: {volume_included_pair_count}
Pairs meeting TVL inclusion criteria: {tvl_included_pair_count}
Signals created: {signal_count}
Total equity: {portfolio.get_total_equity():,.2f} USD
Cash: {position_manager.get_current_cash():,.2f} USD
Investable equity: {alpha_model.investable_equity:,.2f} USD
Accepted investable equity: {alpha_model.accepted_investable_equity:,.2f} USD
Allocated to signals: {alpha_model.get_allocated_value():,.2f} USD
Discarded allocation because of lack of lit liquidity: {alpha_model.size_risk_discarded_value:,.2f} USD
Credit deposit flow: {credit_deposit_flow:,.2f} USD
Credit trades: {credit_trades}
Rebalance volume: {rebalance_volume:,.2f} USD
""")
# Most volatility pair signal weight (normalised): {max_vol_signal.normalised_weight * 100 if max_vol_signal else '-'} % (got {max_vol_signal.position_size_risk.get_relative_capped_amount() * 100 if max_vol_signal else '-'} % of asked size)
if top_signal:
assert top_signal.position_size_risk
report += dedent_any(f"""
Top signal pair: {top_signal.pair.get_ticker()}
Top signal value: {top_signal.signal}
Top signal weight: {top_signal.raw_weight}
Top signal weight (normalised): {top_signal.normalised_weight * 100:.2f} % (got {top_signal.position_size_risk.get_relative_capped_amount() * 100:.2f} % of asked size)
""")
for flag, count in alpha_model.get_flag_diagnostics_data().items():
report += f"Signals with flag {flag.name}: {count}\n"
state.visualisation.add_message(
timestamp,
report,
)
state.visualisation.set_discardable_data("alpha_model", alpha_model)
return trades # Return the list of trades we made in this cycle
#
# Indicators
#
indicators = IndicatorRegistry()
empty_series = pd.Series([], index=pd.DatetimeIndex([]), dtype="float64")
@indicators.define()
def ath(
close: pd.Series,
ath_delay_bars: int,
ath_window_bars: int,
) -> pd.Series:
"""All time high indicator.
- Calculate % we are above all time high
:param ath_window_bars:
We look history for this many bars to find ATH
:param ath_delay_bars:
We skip the this most recent entries for ATH.
E.g. ATH in a previous bar is ignored.
:return:
% we are above lagged all time high.
Value 1.1 means we are 10% above of the previous ATH.
"""
shifted = close.shift(ath_delay_bars)
windowed = shifted.rolling(
window=ath_window_bars,
min_periods=ath_window_bars,
).max()
series = (close / windowed)
return series
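# Illustrative sketch of the lagged rolling max above, assuming hourly bars
# and toy parameters ath_delay_bars=2, ath_window_bars=3 (not executed):
#
#   close = pd.Series([10.0, 11.0, 12.0, 13.0, 15.0])
#   lagged_ath = close.shift(2).rolling(3, min_periods=3).max()
#   # -> [NaN, NaN, NaN, NaN, 12.0], so the last indicator value is
#   # 15.0 / 12.0 = 1.25, i.e. 25% above the lagged all-time high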
@indicators.define()
def volatility(close: pd.Series, rolling_volatility_bars: int) -> pd.Series:
"""Calculate the rolling volatility for rebalancing the index for each decision cycle."""
price_diff = close.pct_change()
rolling_std = price_diff.rolling(window=rolling_volatility_bars).std()
return rolling_std
@indicators.define()
def rolling_cumulative_volume(volume: pd.Series, rolling_volume_bars: int) -> pd.Series:
"""Calculate rolling volume of the pair.
- Used in inclusion criteria
"""
rolling_volume = volume.rolling(window=rolling_volume_bars).sum()
return rolling_volume
@indicators.define()
def rolling_liquidity_avg(close: pd.Series, rolling_volume_bars: int) -> pd.Series:
"""Calculate rolling liquidity average
- This is either TVL or XY liquidity (one sided) depending on the trading pair DEX type
- Used in inclusion criteria
"""
rolling_liquidity_close = close.rolling(window=rolling_volume_bars).mean()
return rolling_liquidity_close
@indicators.define(dependencies=(rolling_cumulative_volume,), source=IndicatorSource.strategy_universe)
def volume_inclusion_criteria(
strategy_universe: TradingStrategyUniverse,
min_volume: USDollarAmount,
rolling_volume_bars: int,
dependency_resolver: IndicatorDependencyResolver,
) -> pd.Series:
"""Calculate pair volume inclusion criteria.
- Avoid including illiquid / broken pairs in the set: a pair is included when it has enough volume
TODO: Add liquidity check later
:return:
Series where each timestamp is a list of pair ids meeting the criteria at that timestamp
"""
series = dependency_resolver.get_indicator_data_pairs_combined(
rolling_cumulative_volume,
parameters={"rolling_volume_bars": rolling_volume_bars},
)
# Get mask for the bars when the rolling volume meets our criteria
mask = series >= min_volume
mask_true_values_only = mask[mask]
# Turn to a series of lists
series = mask_true_values_only.groupby(level='timestamp').apply(lambda x: x.index.get_level_values('pair_id').tolist())
return series
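# Illustrative sketch of the mask-to-lists transformation above, assuming a
# (timestamp, pair_id) MultiIndex and hypothetical pair ids (not executed):
#
#   rolling volume mask:                after the groupby/apply:
#     2024-01-01 00:00, 101  True         2024-01-01 00:00 -> [101]
#     2024-01-01 00:00, 202  False        2024-01-01 01:00 -> [101, 202]
#     2024-01-01 01:00, 101  True
#     2024-01-01 01:00, 202  True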
@indicators.define(source=IndicatorSource.tvl)
def tvl(
close: pd.Series,
execution_context: ExecutionContext,
timestamp: pd.Timestamp,
) -> pd.Series:
"""Get TVL series for a pair.
- Because TVL data is 1d and we use 1h everywhere else, we need to forward fill
- Use the previous daily close as the value for each hour
"""
if execution_context.live_trading:
# TVL is daily data.
# We need to forward fill until the current hour.
# Use our special ff function.
assert isinstance(timestamp, pd.Timestamp), f"Live trading needs forward-fill end time, we got {timestamp}"
df = pd.DataFrame({"close": close})
df_ff = forward_fill(
df,
Parameters.candle_time_bucket.to_frequency(),
columns=("close",),
forward_fill_until=timestamp,
)
series = df_ff["close"]
return series
else:
return close.resample("1h").ffill()
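# Illustrative sketch of the daily-to-hourly upsampling above, using
# hypothetical TVL values (not executed):
#
#   daily = pd.Series(
#       [200_000.0, 250_000.0],
#       index=pd.to_datetime(["2024-01-01", "2024-01-02"]),
#   )
#   daily.resample("1h").ffill()
#   # -> 25 hourly rows; 00:00-23:00 on Jan 1 all hold 200,000.0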
@indicators.define(dependencies=(tvl,), source=IndicatorSource.dependencies_only_per_pair)
def tvl_ewm(
pair: TradingPairIdentifier,
tvl_ewm_span: float,
dependency_resolver: IndicatorDependencyResolver,
) -> pd.Series:
"""Get smoothed TVL series for a pair.
- Interpretation: with span=5, for example, the ewm function computes an exponential moving average where the weight of the most recent observation is about 33.3% (since α = 2 / (5 + 1) ≈ 0.333), and this weight decreases exponentially for older observations.
- We forward fill gaps, so there is no missing data in decide_trades()
- Currently unused in the strategy itself
"""
tvl_ff = dependency_resolver.get_indicator_data(
tvl,
pair=pair,
)
return tvl_ff.ewm(span=tvl_ewm_span).mean()
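# The smoothing weight implied by the span parameter, assuming the standard
# pandas convention alpha = 2 / (span + 1) (illustrative arithmetic):
#
#   tvl_ewm_span = 7 * 24 = 168 -> alpha = 2 / 169 ≈ 0.0118,
#   # i.e. each new hourly TVL observation contributes ~1.2% of the smoothed
#   # value, giving roughly a one-week smoothing horizon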
@indicators.define(dependencies=(tvl_ewm, tvl), source=IndicatorSource.dependencies_only_universe)
def tvl_inclusion_criteria(
min_tvl: USDollarAmount,
dependency_resolver: IndicatorDependencyResolver,
) -> pd.Series:
"""The pair must have min XX,XXX USD one-sided TVL to be included.
- If the Uniswap pool does not have enough ETH or USDC deposited, skip the pair as a scam
:return:
Series where each timestamp is a list of pair ids meeting the criteria at that timestamp
"""
series = dependency_resolver.get_indicator_data_pairs_combined(tvl)
mask = series >= min_tvl
mask_true_values_only = mask[mask]
# Turn to a series of lists
series = mask_true_values_only.groupby(level='timestamp').apply(lambda x: x.index.get_level_values('pair_id').tolist())
return series
@indicators.define(
source=IndicatorSource.strategy_universe
)
def trading_availability_criteria(
strategy_universe: TradingStrategyUniverse,
) -> pd.Series:
"""Is pair tradeable at each hour.
- The pair has a price candle at that
- Mitigates very corner case issues that TVL/liquidity data is per-day whileas price data is natively per 1h
and the strategy inclusion criteria may include pair too early hour based on TVL only,
leading to a failed attempt to rebalance in a backtest
- Only relevant for backtesting issues if we make an unlucky trade on the starting date
of trading pair listing
:return:
Series with with index (timestamp) and values (list of pair ids trading at that hour)
"""
# A trading pair is available if there is an open candle in the index for it.
# Because candle data is forward filled, we should not have any gaps in the index.
candle_series = strategy_universe.data_universe.candles.df["open"]
pairs_per_timestamp = candle_series.groupby(level='timestamp').apply(lambda x: x.index.get_level_values('pair_id').tolist())
return pairs_per_timestamp
@indicators.define(
dependencies=[
volume_inclusion_criteria,
tvl_inclusion_criteria,
trading_availability_criteria
],
source=IndicatorSource.strategy_universe
)
def inclusion_criteria(
strategy_universe: TradingStrategyUniverse,
min_volume: USDollarAmount,
rolling_volume_bars: int,
min_tvl: USDollarAmount,
dependency_resolver: IndicatorDependencyResolver
) -> pd.Series:
"""Pairs meeting all of our inclusion criteria.
- Gives the tradeable pair set for each timestamp
:return:
Series where index is timestamp and each cell is a list of pair ids matching our inclusion criteria at that moment
"""
# Filter out benchmark pairs like WETH from the tradeable pair set
benchmark_pair_ids = set(strategy_universe.get_pair_by_human_description(desc).internal_id for desc in SUPPORTING_PAIRS)
volume_series = dependency_resolver.get_indicator_data(
volume_inclusion_criteria,
parameters={
"min_volume": min_volume,
"rolling_volume_bars": rolling_volume_bars,
},
)
tvl_series = dependency_resolver.get_indicator_data(
tvl_inclusion_criteria,
parameters={
"min_tvl": min_tvl,
},
)
trading_availability_series = dependency_resolver.get_indicator_data(trading_availability_criteria)
#
# Process all pair ids as sets; the final inclusion
# criteria is the intersection of all sub-criteria
#
df = pd.DataFrame({
"tvl_pair_ids": tvl_series,
"volume_pair_ids": volume_series,
"trading_availability_pair_ids": trading_availability_series,
})
# https://stackoverflow.com/questions/33199193/how-to-fill-dataframe-nan-values-with-empty-list-in-pandas
df = df.fillna("").apply(list)
def _combine_criteria(row):
final_set = set(row["volume_pair_ids"]) & \
set(row["tvl_pair_ids"]) & \
set(row["trading_availability_pair_ids"])
return final_set - benchmark_pair_ids
union_criteria = df.apply(_combine_criteria, axis=1)
# Inclusion criteria data can be spotty at the beginning when there are only 0 or 1 pairs trading,
# so we need to fill the gaps with empty lists
full_index = pd.date_range(
start=union_criteria.index.min(),
end=union_criteria.index.max(),
freq=Parameters.candle_time_bucket.to_frequency(),
)
reindexed = union_criteria.reindex(full_index, fill_value=[])
return reindexed
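# Illustrative sketch of _combine_criteria() on a single row, using
# hypothetical pair ids (not executed):
#
#   volume_pair_ids               = [101, 202, 303]
#   tvl_pair_ids                  = [202, 303, 404]
#   trading_availability_pair_ids = [202, 303]
#   benchmark_pair_ids            = {303}
#   # intersection {202, 303} minus benchmarks -> {202}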
@indicators.define(dependencies=(volume_inclusion_criteria,), source=IndicatorSource.dependencies_only_universe)
def volume_included_pair_count(
min_volume: USDollarAmount,
rolling_volume_bars: int,
dependency_resolver: IndicatorDependencyResolver
) -> pd.Series:
series = dependency_resolver.get_indicator_data(
volume_inclusion_criteria,
parameters={"min_volume": min_volume, "rolling_volume_bars": rolling_volume_bars},
)
return series.apply(len)
@indicators.define(dependencies=(tvl_inclusion_criteria,), source=IndicatorSource.dependencies_only_universe)
def tvl_included_pair_count(
min_tvl: USDollarAmount,
dependency_resolver: IndicatorDependencyResolver
) -> pd.Series:
"""Calculate number of pairs in meeting volatility criteria on each timestamp"""
series = dependency_resolver.get_indicator_data(
tvl_inclusion_criteria,
parameters={"min_tvl": min_tvl},
)
series = series.apply(len)
# TVL data can be spotty at the beginning when there are only 0 or 1 pairs trading,
# so we need to fill the gaps with 0
full_index = pd.date_range(
start=series.index.min(),
end=series.index.max(),
freq=Parameters.candle_time_bucket.to_frequency(),
)
# Reindex and fill NaN with zeros
reindexed = series.reindex(full_index, fill_value=0)
return reindexed
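# Illustrative sketch of the gap filling above (not executed): if the count
# series only has values at 01:00 and 04:00, reindexing against the full
# hourly range inserts 02:00 and 03:00 with a count of 0.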
@indicators.define(dependencies=(inclusion_criteria,), source=IndicatorSource.dependencies_only_universe)
def all_criteria_included_pair_count(
min_volume: USDollarAmount,
min_tvl: USDollarAmount,
rolling_volume_bars: int,
dependency_resolver: IndicatorDependencyResolver
) -> pd.Series:
"""Series where each timestamp is the list of pairs meeting all inclusion criteria.
:return:
Series with pair count for each timestamp
"""
series = dependency_resolver.get_indicator_data(
"inclusion_criteria",
parameters={
"min_volume": min_volume,
"min_tvl": min_tvl,
"rolling_volume_bars": rolling_volume_bars,
},
)
return series.apply(len)
@indicators.define(source=IndicatorSource.strategy_universe)
def trading_pair_count(
strategy_universe: TradingStrategyUniverse,
) -> pd.Series:
"""Get number of pairs that trade at each timestamp.
- A pair must have had at least one candle before the timestamp to be included
- Exclude benchmark pairs we do not trade
:return:
Series with pair count for each timestamp
"""
benchmark_pair_ids = {strategy_universe.get_pair_by_human_description(desc).internal_id for desc in SUPPORTING_PAIRS}
# Get pair_id, timestamp -> timestamp, pair_id index
series = strategy_universe.data_universe.candles.df["open"]
swap_index = series.index.swaplevel(0, 1)
seen_pairs = set()
seen_data = {}
for timestamp, pair_id in swap_index:
if pair_id in benchmark_pair_ids:
continue
seen_pairs.add(pair_id)
seen_data[timestamp] = len(seen_pairs)
series = pd.Series(seen_data.values(), index=list(seen_data.keys()))
return series
@indicators.define(dependencies=(volatility,), source=IndicatorSource.strategy_universe)
def avg_volatility(
strategy_universe,
rolling_volatility_bars: int,
dependency_resolver: IndicatorDependencyResolver
) -> pd.Series:
"""Calculate index avg volatility across all trading pairs.
:return:
Series with pair count for each timestamp
"""
volatility = dependency_resolver.get_indicator_data_pairs_combined(
"volatility",
parameters={"rolling_volatility_bars": rolling_volatility_bars},
)
n_std = 3
def remove_outliers_group(group):
mean = group.mean()
std = group.std()
lower_bound = mean - n_std * std
upper_bound = mean + n_std * std
return group[(group >= lower_bound) & (group <= upper_bound)]
cleaned = volatility.groupby(level='timestamp').apply(remove_outliers_group)
# Group by timestamp, remove outliers within each group, then calculate mean
cleaned_volatility = cleaned.groupby(level=0).mean()
return cleaned_volatility
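# Illustrative sketch of the 3-sigma outlier removal above, using a
# hypothetical cross-section (not executed): with ~20 pairs showing hourly
# volatility near 0.01 and one stray reading of 1.0, the stray value falls
# beyond mean + 3 * std for that timestamp and is dropped before the
# cross-sectional mean is taken.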
@indicators.define(source=IndicatorSource.ohlcv)
def daily_price(open, high, low, close, execution_context) -> pd.DataFrame:
"""Resample finer granularity price feed to daily for ADX filtering."""
original_df = pd.DataFrame({
"open": open,
"high": high,
"low": low,
"close": close,
})
daily_df = resample_candles(original_df, pd.Timedelta(days=1))
return daily_df
@indicators.define(source=IndicatorSource.dependencies_only_per_pair, dependencies=[daily_price])
def daily_rsi(
daily_rsi_bars,
pair: TradingPairIdentifier,
dependency_resolver: IndicatorDependencyResolver,
):
daily_close = dependency_resolver.get_indicator_data(
"daily_price",
pair=pair,
column="close",
)
rsi_series = pandas_ta.rsi(
daily_close,
length=daily_rsi_bars,
)
if rsi_series is None:
return empty_series
return rsi_series
@indicators.define(dependencies=[ath, daily_rsi])
def signal(
close: pd.Series,
ath_delay_bars: int,
ath_window_bars: int,
ath_threshold: float,
ath_span: int,
daily_rsi_bars: int,
daily_rsi_threshold: float,
pair: TradingPairIdentifier,
dependency_resolver: IndicatorDependencyResolver
) -> pd.Series:
"""Combine ATH signal with daily RSI filter.
- We use the daily RSI filter to get rid of crap pairs like FTC
"""
rsi = dependency_resolver.get_indicator_data(
"daily_rsi",
parameters={"daily_rsi_bars": daily_rsi_bars},
pair=pair,
)
# Set default RSI value so we include pairs
# without enough history
rsi = rsi.fillna(50).infer_objects(copy=False)
ath = dependency_resolver.get_indicator_data(
"ath",
parameters={
"ath_delay_bars": ath_delay_bars,
"ath_window_bars": ath_window_bars,
},
pair=pair,
)
ath_core = ath.ewm(span=ath_span).mean() - ath_threshold
# Use daily RSI as a trash filter
# See FTC: https://tradingstrategy.ai/trading-view/binance/pancakeswap-v2/ftc-usdt
df = pd.DataFrame({
"rsi": rsi,
"ath_core": ath_core,
})
# forward-fill from daily to hourly
# SHOULD handle forward fill until the live timestamp
df["rsi"] = df["rsi"].infer_objects(copy=False).ffill()
mask = df["rsi"] >= daily_rsi_threshold
df['ath_thresholded'] = 0.0 # Initialize with zeros
df.loc[mask, 'ath_thresholded'] = df.loc[mask, 'ath_core'] # Copy ATH indicator values from non-masked timestamps
return df["ath_thresholded"]
@indicators.define(dependencies=(signal,), source=IndicatorSource.dependencies_only_universe)
def avg_signal(
ath_delay_bars: int,
ath_window_bars: int,
ath_threshold: float,
ath_span: int,
daily_rsi_bars: int,
daily_rsi_threshold: int,
dependency_resolver: IndicatorDependencyResolver
) -> pd.Series:
"""Calculate our "signal" across all pairs.
- Use median - mean is skewed by outliers
:return:
Series with pair count for each timestamp
"""
signal = dependency_resolver.get_indicator_data_pairs_combined(
"signal",
parameters={
"ath_window_bars": ath_window_bars,
"ath_delay_bars": ath_delay_bars,
"ath_threshold": ath_threshold,
"ath_span": ath_span,
"daily_rsi_bars": daily_rsi_bars,
"daily_rsi_threshold": daily_rsi_threshold,
},
)
n_std = 3
def remove_outliers_group(group):
mean = group.mean()
std = group.std()
lower_bound = mean - n_std * std
upper_bound = mean + n_std * std
return group[(group >= lower_bound) & (group <= upper_bound)]
cleaned = signal.groupby(level='timestamp').apply(remove_outliers_group)
# Group by timestamp, remove outliers within each group, then calculate mean
cleaned_avg_signal = cleaned.groupby(level=0).median()
return cleaned_avg_signal
def create_indicators(
timestamp: datetime.datetime,
parameters: StrategyParameters,
strategy_universe: TradingStrategyUniverse,
execution_context: ExecutionContext
):
return indicators.create_indicators(
timestamp=timestamp,
parameters=parameters,
strategy_universe=strategy_universe,
execution_context=execution_context,
)
#
# Strategy metadata/UI data.
#
tags = {StrategyTag.beta, StrategyTag.live}
name = "All-time high on Base"
short_description = "Momentum strategy buying tokens breaching their all-time highs"
icon = ""
long_description = """
# Strategy description
This is a [portfolio construction](https://tradingstrategy.ai/glossary/portfolio-construction) and
[momentum](https://tradingstrategy.ai/glossary/momentum) strategy trading tokens on the Base blockchain.
A [clearer and more accessible explanation of this strategy can be found in this blog post](https://tradingstrategy.ai/blog/cryptocurrency-trading-strategy-buying-all-time-high-tokens).
## Benefits
- Get directional exposure to exciting small-cap onchain token markets
- No need to pick tokens yourself; the strategy will buy all of them
- Small cap memecoins are often uncorrelated with the rest of cryptocurrency markets
## Trading rules summary
The trading rules are designed so that the strategy captures the upward momentum of the markets while otherwise trying to stay out of the market and minimise [drawdowns](https://tradingstrategy.ai/glossary/maximum-drawdown).
- Rebalance every two hours
- Buy tokens that are above their local all-time highs
- Dynamically weight between opened positions
- Trade on Uniswap v2 and v3 on Base
- Have minimum TVL and volume criteria to be included in the trading universe
- Use TokenSniffer as a token quality source to filter scams
- Adjust position size based on lit Uniswap liquidity ([TVL](https://tradingstrategy.ai/glossary/total-value-locked-tvl)), to prevent taking oversized positions on limited liquidity tokens
- Earn interest on uninvested cash by depositing it in the [Aave lending protocol](https://tradingstrategy.ai/glossary/aave)
The strategy is fully open: for further details, see the strategy source code.
## Expected performance
The strategy reflects the performance of small-cap onchain DEX markets, with limited downside.
See [backtesting](./backtesting) page for historical estimated performance.
Past performance is no guarantee of future results.
Because the traded onchain markets were set up only recently, there is no backtesting data covering market downturns. To check for overfitting, the same trading rules were tested across different markets with longer histories, with varying but not significantly degraded performance.
## Risks
**This strategy is ultra-high-risk**. Only allocate small amounts of capital that you can afford to lose.
Before investing large amounts of capital, speak with the strategy developers.
This strategy, paired with onchain execution, is the first of its kind in the world. Because of this novelty, multiple risks are involved. Consider it beta-quality software.
**Trading risk**: The small-cap tokens traded are highly volatile,
and the markets may include rug pulls and other questionable activities.
Despite the strategy's automated token filtering, some of the traded tokens may go to zero; on these onchain markets, many tokens are likely to do so at some point.
There is little liquidity for these tokens and trading costs are high, so prices may move abruptly.
**Technical risk**: The strategy relies on the ERC-7540 (Asynchronous ERC-4626 Tokenized Vaults) standard.
This standard is new and not yet time-proven. The onchain trade execution contains unaudited smart contracts. There are centralised price oracles. The redemption process relies on a centralised valuation committee.
The strategy includes risk mitigations such as:
- Third-party security services are used to check the reputation of traded tokens
- The strategy limits position sizes based on the available lit liquidity
- The strategy limits position sizes based on an overall maximum % of the portfolio
- A DAO and/or security committee holding multisignature keys can take manual action in the case of unexpected issues: the strategy can be halted and manually wound down
The technical risks will be mitigated in the future by working with other protocols,
and by increasing the quality of onchain trading venues, data and decentralisation.
## Further information
- Any questions are welcome in [the Discord community chat](https://tradingstrategy.ai/community)
"""