Base Sentimeme AI Trading Strategy beta
Social-data driven memecoin trading strategy on Base
Source code
The source code of the Base Sentimeme AI Trading Strategy strategy
"""Memecoin strategy based on market sentiment. Long only.
To backtest this strategy module locally:
.. code-block:: console
source scripts/set-latest-tag-gcp.sh
docker-compose run ethereum-memecoin-swing backtest
Or:
.. code-block:: console
trade-executor \
backtest \
--strategy-file=strategy/ethereum-memecoin-swing.py \
--trading-strategy-api-key=$TRADING_STRATEGY_API_KEY
trade-executor \
backtest \
--strategy-file=strategies/prod_strategies/ethereum-memecoin-swing.py \
--trading-strategy-api-key=$TRADING_STRATEGY_API_KEY
"""
import datetime
import json
import logging
import os
import re
from pathlib import Path
import pandas as pd
import pandas_ta
from google.oauth2 import service_account
import cachetools
from tradeexecutor.state.identifier import TradingPairIdentifier
from tradeexecutor.state.trade import TradeExecution
from tradeexecutor.state.visualisation import PlotKind
from tradeexecutor.strategy.cycle import CycleDuration
from tradeexecutor.strategy.default_routing_options import TradeRouting
from tradeexecutor.strategy.execution_context import ExecutionContext, ExecutionMode
from tradeexecutor.strategy.pandas_trader.alternative_market_data import (
resample_multi_pair,
)
from tradeexecutor.strategy.pandas_trader.indicator import IndicatorSet, IndicatorSource
from tradeexecutor.strategy.pandas_trader.strategy_input import (
IndicatorDataNotFoundWithinDataTolerance,
StrategyInput,
StrategyInputIndicators,
)
from tradeexecutor.strategy.parameters import StrategyParameters
from tradeexecutor.strategy.tag import StrategyTag
from tradeexecutor.strategy.trading_strategy_universe import (
TradingStrategyUniverse,
load_partial_data,
translate_token,
)
from tradeexecutor.strategy.universe_model import UniverseOptions
from tradeexecutor.utils.crossover import contains_cross_over, contains_cross_under
from tradingstrategy.candle import GroupedCandleUniverse
from tradingstrategy.chain import ChainId
from tradingstrategy.client import Client
from tradingstrategy.liquidity import GroupedLiquidityUniverse
from tradingstrategy.pair import (
HumanReadableTradingPairDescription,
PandasPairUniverse,
StablecoinFilteringMode,
filter_for_base_tokens,
filter_for_stablecoins,
filter_for_quote_tokens,
)
from tradingstrategy.timebucket import TimeBucket
from tradingstrategy.universe import Universe
from tradingstrategy.utils.token_filter import (
deduplicate_pairs_by_volume,
add_base_quote_address_columns,
)
from tradingstrategy.utils.token_extra_data import load_extra_metadata
from tradeexecutor.strategy.tvl_size_risk import USDTVLSizeRiskModel
logger = logging.getLogger(__name__)
trading_strategy_engine_version = "0.5"
USDC = "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913".lower()
WETH = "0x4200000000000000000000000000000000000006".lower()
# Logging helpers
def strip_except_newlines(text):
# Split by newlines, strip each line, then rejoin
return "\n".join(line.strip() for line in text.splitlines())
def dedent_any(text: str) -> str:
"""Dedent variable indents of the text"""
return re.sub(r"^\s+", "", strip_except_newlines(text), flags=re.MULTILINE)
class Parameters:
"""Parameteres for this strategy.
- Collect parameters used for this strategy here
- Both live trading and backtesting parameters
"""
id = "base-sentimeme" # Used in cache paths
cycle_duration = CycleDuration.d1 # Daily rebalance
candle_time_bucket = TimeBucket.d1
allocation = 0.142
max_assets = 7
#
# Liquidity risk analysis and data quality
#
min_price = 0.00000000000000000001
max_price = 1_000_000
min_liquidity_trade_threshold = 0.05
min_liquidity_threshold = 25000
min_volume = 30000
# Trigger
# Safety Guards
minimum_mometum_threshold = 0.1
momentum_lookback_bars = 9
sma_length = 12
social_ema_short_length = 6
social_ema_long_length = 11
cross_over_period = 2
social_ma_min = 10
stop_loss_pct = 0.92
trailing_stop_loss_pct = 0.86
trailing_stop_loss_activation_level = 1.3
# Trade execution parameters
slippage_tolerance = 0.06
max_buy_tax = 0.06
max_sell_tax = 0.06
token_risk_threshold = 50
# If the pair does not have enough real time quote token TVL, skip trades smaller than this
min_trade_size_usd = 5.00
# Only do trades where we are less than 1% of the pool quote token TVL
per_position_cap_of_pool = 0.01
#
# Live trading only
#
chain_id = ChainId.base
routing = TradeRouting.default
required_history_period = datetime.timedelta(days=30 + 1)
#
# Backtesting only
#
backtest_start = datetime.datetime(2024, 9, 1)
backtest_end = datetime.datetime(2024, 12, 31)
initial_cash = 10_000
initial_deposit = 10_000
stop_loss_time_bucket = TimeBucket.h1
def join_market_trend_data(df_trend, market_data):
df_trend.reset_index(inplace=True)
market_data.reset_index(inplace=True)
# Ensure "date" in df_trend_new is also in datetime format (if not already done)
df_trend["date"] = pd.to_datetime(df_trend["date"])
market_data["date"] = pd.to_datetime(market_data["date"])
market_data["date"] = market_data["date"].dt.tz_localize(None)
# Perform the merge
merged_df = pd.merge(df_trend, market_data, how="left", on=["coin_id", "date"])
merged_df.set_index(["coin_id", "coin", "date"], inplace=True)
return merged_df
def get_google_storage_credentials():
credentials_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
credentials_dict = json.loads(credentials_json)
return {"token": credentials_dict}
def get_trend_data() -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
storage_options = get_google_storage_credentials()
df_platforms = pd.read_csv(
"gs://taraxa-research/coingecko_data_pipeline/df_platforms.csv",
storage_options=storage_options,
)
df_categories = pd.read_csv(
"gs://taraxa-research/coingecko_data_pipeline/df_categories.csv",
storage_options=storage_options,
)
df_stablecoins = df_categories[df_categories["category_name"] == "Stablecoins"]
df_markets_data = pd.read_parquet(
"gs://taraxa-research/coingecko_data_pipeline/df_market_data_historical.parquet",
storage_options=storage_options,
)
df_trend = pd.read_parquet(
"gs://taraxa-research/trend_spotting/trends_v1.parquet",
storage_options=storage_options,
)
df_trend.reset_index(inplace=True)
df_trend_eth = df_trend.merge(
df_platforms[df_platforms["platform_name"] == "Base"],
on="coin_id",
how="inner",
)
df_trend = df_trend_eth.merge(
df_categories[df_categories["category_name"] == "Meme"],
on="coin_id",
how="inner",
)
df_trend_w_market = join_market_trend_data(df_trend, df_markets_data)
df_trend_w_market.reset_index(inplace=True)
df_trend_w_market = df_trend_w_market.rename(
columns={"coin": "symbol", "coin_id_count": "social_mentions"}
)
df_trend_eth = df_trend_eth.rename(
columns={"coin": "symbol", "coin_id_count": "social_mentions"}
)
return df_trend_w_market, df_trend_eth, df_stablecoins
def filter_pairs_by_risk(
pairs_df: pd.DataFrame,
risk_threshold: int = 50,
max_buy_tax: float = 6.0,
max_sell_tax: float = 6.0,
risk_traits: dict = None,
) -> pd.DataFrame:
"""Filter pairs DataFrame based on tax rates, TokenSniffer risk score, and specific risk traits.
Args:
pairs_df (pd.DataFrame): DataFrame containing trading pair information
risk_threshold (int): Minimum acceptable TokenSniffer risk score (0-100, higher is better)
max_buy_tax (float): Maximum allowed buy tax percentage (default 6.0)
max_sell_tax (float): Maximum allowed sell tax percentage (default 6.0)
risk_traits (dict): Dictionary of risk traits to filter on. If None, only tax and risk score are checked
Returns:
pd.DataFrame: Filtered pairs DataFrame containing only pairs meeting all criteria
Example Risk Traits Dictionary:
```python
# Complete risk traits dictionary
risk_traits = {
# Contract-level risks
'has_mint': False, # Can new tokens be minted
'has_fee_modifier': False, # Can fees be modified after deployment
'has_max_transaction_amount': False, # Presence of max transaction limits
'has_blocklist': False, # Can addresses be blacklisted
'has_proxy': False, # Is it a proxy contract (upgradeable)
'has_pausable': False, # Can trading be paused
# Ownership and control risks
'is_ownership_renounced': True, # Ownership should be renounced
'is_source_verified': True, # Contract should be verified
# Trading risks
'is_sellable': True, # Token can be sold
'has_high_buy_fee': False, # High buy fees present
'has_high_sell_fee': False, # High sell fees present
'has_extreme_fee': False, # Extremely high fees (>30%)
# Liquidity risks
'has_inadequate_liquidity': False, # Insufficient liquidity
'has_inadequate_initial_liquidity': False, # Started with low liquidity
# Token distribution risks
'has_high_creator_balance': False, # Creator holds large portion
'has_high_owner_balance': False, # Owner holds large portion
'has_high_wallet_balance': False, # Any wallet holds too much
'has_burned_exceeds_supply': False, # Burned amount > supply (impossible)
# Additional safety checks
'is_flagged': False, # Token is flagged for issues
'is_honeypot': False, # Known honeypot
'has_restore_ownership': False, # Can ownership be restored
'has_non_standard_erc20': False # Non-standard ERC20 implementation
}
```
Example Risk Profiles:
```python
# Conservative (strict) settings
conservative_risk_traits = {
'has_mint': False,
'has_fee_modifier': False,
'has_blocklist': False,
'has_proxy': False,
'has_pausable': False,
'is_ownership_renounced': True,
'is_source_verified': True,
'is_sellable': True,
'has_high_buy_fee': False,
'has_high_sell_fee': False,
'is_flagged': False,
'is_honeypot': False
}
# Moderate settings
moderate_risk_traits = {
'has_mint': False,
'is_source_verified': True,
'is_sellable': True,
'has_extreme_fee': False,
'is_honeypot': False,
'is_flagged': False
}
# Aggressive settings
aggressive_risk_traits = {
'is_sellable': True,
'is_honeypot': False,
'is_flagged': False
}
```
Usage:
```python
# Using conservative settings with custom tax limits
filtered_df = filter_pairs_by_risk(
pairs_df,
risk_threshold=60,
max_buy_tax=5.0,
max_sell_tax=5.0,
risk_traits=conservative_risk_traits
)
# Custom risk profile
custom_risk_traits = {
'is_sellable': True,
'is_honeypot': False,
'has_mint': False,
'has_extreme_fee': False,
'is_source_verified': True
}
filtered_df = filter_pairs_by_risk(
pairs_df,
risk_threshold=70,
max_buy_tax=3.0,
max_sell_tax=3.0,
risk_traits=custom_risk_traits
)
```
"""
# Create a copy to avoid modifying original
filtered_df = pairs_df.copy()
initial_count = len(filtered_df)
# Replace NaN values with 0 for buy_tax and sell_tax
filtered_df["buy_tax"] = filtered_df["buy_tax"].fillna(0)
filtered_df["sell_tax"] = filtered_df["sell_tax"].fillna(0)
# Filter for pairs meeting tax thresholds
filtered_df = filtered_df[
(filtered_df["buy_tax"] <= max_buy_tax)
& (filtered_df["sell_tax"] <= max_sell_tax)
]
after_tax_count = len(filtered_df)
def check_token_risk(row):
try:
# Extract TokenSniffer data from the nested structure
token_data = row["other_data"]["top_pair_data"].token_sniffer_data
if token_data is None:
return False
print(f"Token data: {token_data}")
# Check risk score threshold
if token_data.get("riskScore", 0) < risk_threshold:
return False
# Check each specified risk trait if provided
if risk_traits:
for trait, desired_value in risk_traits.items():
if token_data.get(trait, not desired_value) != desired_value:
return False
return True
except (KeyError, AttributeError) as e:
print(f"Error processing row: {e}")
return False
# Apply TokenSniffer filters if risk_traits provided
if risk_traits is not None:
filtered_df = filtered_df[filtered_df.apply(check_token_risk, axis=1)]
final_count = len(filtered_df)
print(
"Filtering results: Initial pairs: %d, after tax filters: %d, after risk filters: %d",
initial_count,
after_tax_count,
final_count,
)
return filtered_df
def prefilter_data(client: Client, execution_context) -> pd.DataFrame:
# If the pair does not have this liquidity, skip
min_prefilter_liquidity = 10_000
chain_id = Parameters.chain_id
time_bucket = Parameters.candle_time_bucket
# We need pair metadata to know which pairs belong to Polygon
logger.info("Downloading/opening pairs dataset")
pairs_df = client.fetch_pair_universe().to_pandas()
our_chain_pair_ids = pairs_df[pairs_df.chain_id == chain_id.value][
"pair_id"
].unique()
logger.info(
f"We have data for {len(our_chain_pair_ids)} trading pairs on {chain_id.name}"
)
# Download all liquidity data, extract
# trading pairs that exceed our prefiltering threshold
logger.info("Downloading/opening liquidity dataset")
liquidity_df = client.fetch_all_liquidity_samples(time_bucket).to_pandas()
logger.info(f"Filtering out liquidity for chain {chain_id.name}")
liquidity_df = liquidity_df.loc[liquidity_df.pair_id.isin(our_chain_pair_ids)]
liquidity_df = liquidity_df.loc[liquidity_df.timestamp > Parameters.backtest_start]
liquidity_per_pair = liquidity_df.groupby(liquidity_df.pair_id)
logger.info(
f"Chain {chain_id.name} has liquidity data for {len(liquidity_per_pair.groups)}"
)
passed_pair_ids = set()
liquidity_output_chunks = []
for pair_id, pair_df in liquidity_per_pair:
if pair_df["high"].max() > min_prefilter_liquidity:
liquidity_output_chunks.append(pair_df)
passed_pair_ids.add(pair_id)
logger.info(
f"After filtering for {min_prefilter_liquidity:,} USD min liquidity we have {len(passed_pair_ids)} pairs"
)
liquidity_df = pd.concat(liquidity_output_chunks)
logger.info("Downloading/opening OHLCV dataset")
price_df = client.fetch_all_candles(time_bucket).to_pandas()
price_df = price_df.loc[price_df.pair_id.isin(passed_pair_ids)]
price_df.index = pd.DatetimeIndex(price_df.timestamp)
# FILTER MORE
NUM_TOKENS = 250
custom_data_df, df_trend, df_stablecoins = get_trend_data()
custom_data_df.index = pd.DatetimeIndex(custom_data_df["date"])
custom_data_df["contract_address"] = custom_data_df["contract_address"].str.lower()
df_trend["contract_address"] = df_trend["contract_address"].str.lower()
custom_data_df.sort_index(ascending=True)
custom_data_no_dups = custom_data_df.drop_duplicates(
subset=["coin_id", "symbol"], keep="last"
)[["coin_id", "symbol", "contract_address", "platform_name", "total_volume"]]
start = custom_data_df.index[0]
end = custom_data_df.index[-1]
csv_token_list = list(custom_data_df.contract_address.unique())
logger.info(
f"CSV contains data for {len(csv_token_list)} tokens, time range {start} - {end}"
)
# Remove Stablecoins
custom_data_no_dups = custom_data_no_dups[
~custom_data_no_dups.coin_id.isin(df_stablecoins.coin_id.unique())
]
csv_token_list_backtest = custom_data_no_dups.sort_values(
by="total_volume", ascending=False
)[["symbol", "contract_address"]].iloc[:NUM_TOKENS]
logger.info(
f"Pre-selecting the following tokens and contract addresses to backtest {csv_token_list_backtest}"
)
base_erc20_address_list = []
erc20_addresses_avoid = ["0xA3c322Ad15218fBFAEd26bA7f616249f7705D945".lower()]
base_erc20_address_list += csv_token_list_backtest["contract_address"].tolist()
base_erc20_address_list = [
address
for address in set(base_erc20_address_list)
if address.lower() not in erc20_addresses_avoid
]
# Move logic from create_trading_universe here
SUPPORTED_DEXES = {"uniswap-v3", "uniswap-v2", "sushi"}
# Get the token list of everything in the CSV + hardcoded WETH
desired_trading_addresses = set(base_erc20_address_list)
exchange_universe = client.fetch_exchange_universe()
exchange_universe = exchange_universe.limit_to_chains(
{Parameters.chain_id}
).limit_to_slugs(SUPPORTED_DEXES)
pairs_df = client.fetch_pair_universe().to_pandas()
logger.info(
f"Prefilter data contains {len(liquidity_df):,} liquidity samples dn {len(price_df):,} OHLCV candles"
)
if (
execution_context.live_trading
or execution_context.mode == ExecutionMode.preflight_check
):
# for live trading we only need the last 60 days
now = datetime.datetime.utcnow()
start_at = now - datetime.timedelta(days=60)
end_at = now
price_df = price_df.loc[price_df.timestamp >= start_at]
liquidity_df = liquidity_df.loc[liquidity_df.timestamp >= start_at]
elif execution_context.mode == ExecutionMode.backtesting:
start_at = Parameters.backtest_start
end_at = Parameters.backtest_end
price_df = price_df.loc[
(price_df.timestamp >= start_at) & (price_df.timestamp <= end_at)
]
liquidity_df = liquidity_df.loc[
(liquidity_df.timestamp >= start_at) & (liquidity_df.timestamp <= end_at)
]
else:
raise RuntimeError(f"Unknown execution mode {execution_context.mode}")
# Prefilter for more liquidity conditions
liquidity_per_pair = liquidity_df.groupby(liquidity_df.pair_id)
logger.info(
f"Chain {chain_id.name} has liquidity data for {len(liquidity_per_pair.groups)}"
)
# Get the date 30 days ago
thirty_days_ago = end_at - datetime.timedelta(days=30)
# Create a subset of the data for the last 30 days, in live trading this makes more sense
liquidity_last_30_days = liquidity_df[liquidity_df.timestamp > thirty_days_ago]
liquidity_per_pair_last_30d = liquidity_last_30_days.groupby("pair_id")
passed_pair_ids = set()
for pair_id, pair_df in liquidity_per_pair_last_30d:
# Check the maximum high liquidity in the last 30 days
if pair_df["high"].max() > Parameters.min_liquidity_threshold:
passed_pair_ids.add(pair_id)
pairs_df = pairs_df.loc[pairs_df.pair_id.isin(passed_pair_ids)]
logger.info(
f"After liquidity filter {Parameters.min_liquidity_threshold:,} USD we have {len(pairs_df)} trading pairs"
)
allowed_exchange_ids = set(exchange_universe.exchanges.keys())
pairs_df = pairs_df.loc[pairs_df.exchange_id.isin(allowed_exchange_ids)]
logger.info(f"After DEX filter we have {len(pairs_df)} trading pairs")
# Store reference USDC ETH pair so we have an example pair with USDC as quote token for reserve asset
eth_usdc_addresses = [
"0x88a43bbdf9d098eec7bceda4e2494615dfd9bb9c", # Uniswap V2,
"0xd0b53d9277642d899df5c87a3966a349a798f224", # Uniswap V3
]
ref_usdc_pairs = pairs_df[
pairs_df["address"].isin([addr.lower() for addr in eth_usdc_addresses])
].copy()
# Pairs pre-processing
pairs_df = add_base_quote_address_columns(pairs_df)
pairs_df = pairs_df.loc[(pairs_df["chain_id"] == chain_id)]
logger.info(f"Before filter_for_base_tokens we have {len(pairs_df)} trading pairs")
pairs_df = filter_for_base_tokens(pairs_df, list(desired_trading_addresses))
logger.info(f"After filter_for_base_tokens we have {len(pairs_df)} trading pairs")
# Retrofit TokenSniffer data
pairs_df = load_extra_metadata(
pairs_df,
client=client,
)
pairs_df = filter_pairs_by_risk(
pairs_df,
risk_threshold=Parameters.token_risk_threshold,
max_buy_tax=Parameters.max_buy_tax,
max_sell_tax=Parameters.max_sell_tax,
risk_traits=None,
)
# Do cross-section of tokens from custom data
pairs_df = filter_for_stablecoins(
pairs_df, StablecoinFilteringMode.only_volatile_pairs
)
logger.info(
f"After custom data ERC-20 token address cross section filter we have {len(pairs_df)} trading pairs"
)
# We want to keep only USDC or WETH quoted pairs
pairs_df = filter_for_quote_tokens(
pairs_df,
{
USDC,
WETH,
},
)
logger.info(f"After quote token filter we have {len(pairs_df)} trading pairs")
logger.info(f"Before deduplication we have {len(pairs_df)} trading pairs")
pairs_df = deduplicate_pairs_by_volume(pairs_df)
logger.info(f"After deduplication we have {len(pairs_df)} trading pairs")
# Add back USDC pair that have ref pair in the pairs universe
ref_usdc_pairs = ref_usdc_pairs[ref_usdc_pairs["pair_id"].isin(passed_pair_ids)]
pairs_df = pd.concat([pairs_df, ref_usdc_pairs]).drop_duplicates(subset=["pair_id"])
logger.info(f"Before deduplication we have {len(pairs_df)} trading pairs")
logger.info(f"After deduplication we have {len(pairs_df)} trading pairs")
return pairs_df, price_df, liquidity_df, exchange_universe
def create_trading_universe(
timestamp: datetime.datetime,
client: Client,
execution_context: ExecutionContext,
universe_options: UniverseOptions,
) -> TradingStrategyUniverse:
"""Create the trading universe."""
pairs_df, price_df, liquidity_df, exchange_universe = prefilter_data(
client, execution_context
)
if execution_context.mode == ExecutionMode.backtesting:
# Resample strategy decision candles to daily
price_df["timestamp"] = pd.to_datetime(price_df["timestamp"])
daily_candles = resample_multi_pair(price_df, Parameters.candle_time_bucket)
daily_candles["timestamp"] = daily_candles.index
logger.info(
f"After downsampling we have {len(daily_candles)} OHLCV candles and {len(liquidity_df)} liquidity samples"
)
candle_universe = GroupedCandleUniverse(
daily_candles,
time_bucket=Parameters.candle_time_bucket,
forward_fill=True,
)
liquidity_universe = GroupedLiquidityUniverse(liquidity_df)
# The final trading pair universe contains metadata only for pairs that passed
# our filters
pairs_universe = PandasPairUniverse(
pairs_df,
exchange_universe=exchange_universe,
)
stop_loss_candle_universe = GroupedCandleUniverse(price_df)
data_universe = Universe(
time_bucket=Parameters.candle_time_bucket,
liquidity_time_bucket=Parameters.candle_time_bucket,
exchange_universe=exchange_universe,
pairs=pairs_universe,
candles=candle_universe,
liquidity=liquidity_universe,
chains={Parameters.chain_id},
forward_filled=True,
)
reserve_asset = translate_token(pairs_universe.get_token(USDC))
strategy_universe = TradingStrategyUniverse(
data_universe=data_universe,
backtest_stop_loss_time_bucket=Parameters.stop_loss_time_bucket,
backtest_stop_loss_candles=stop_loss_candle_universe,
reserve_assets=[reserve_asset],
)
elif execution_context.live_trading:
dataset = load_partial_data(
client,
execution_context=execution_context,
time_bucket=Parameters.candle_time_bucket,
pairs=pairs_df,
universe_options=universe_options,
stop_loss_time_bucket=Parameters.stop_loss_time_bucket,
# lending_reserves=lending_reserves,
required_history_period=Parameters.required_history_period,
liquidity=True,
)
strategy_universe = TradingStrategyUniverse.create_from_dataset(
dataset,
forward_fill=True,
reserve_asset=USDC,
# Because some pairs may have died during real-time trading, we still
# need to forward fill until today, not the last day they had a trade
forward_fill_until=timestamp,
)
return strategy_universe
@cachetools.cached(
cache=cachetools.TTLCache(ttl=60 * 60 * 2, maxsize=1000),
key=lambda s, p: cachetools.keys.hashkey("get_broken_pairs"),
)
def get_broken_pairs(strategy_universe: TradingStrategyUniverse, parameters) -> set:
# Run some extra sanity check for small cap tokens
broken_trading_pairs = set()
pairs_to_avoid = [87449]
for pair in strategy_universe.iterate_pairs():
logger.debug(f"Readable format pair {pair.get_human_description()}")
logger.debug(
f"Checking pair: {pair.base.token_symbol} with address {pair.base.address} and quote token {pair.quote.token_symbol} with address {pair.quote.address}"
)
reason = strategy_universe.get_trading_broken_reason(
pair,
min_candles_required=10,
min_price=parameters.min_price,
max_price=parameters.max_price,
)
if pair.internal_id in pairs_to_avoid:
broken_trading_pairs.add(pair)
if reason:
logger.debug(
f"FAIL: {pair} with base token {pair.base.address} may be problematic: {reason}"
)
broken_trading_pairs.add(pair)
else:
logger.debug(f"OK: {pair} included in the backtest")
logger.info(
f"Total {len(broken_trading_pairs)} broken trading pairs detected, having {strategy_universe.get_pair_count() - len(broken_trading_pairs)} good pairs left to trade"
)
return broken_trading_pairs
def is_acceptable(
indicators: StrategyInputIndicators,
parameters: StrategyParameters,
pair: TradingPairIdentifier,
) -> bool:
"""Check the pair for risk acceptance
:return:
True if we should trade this pair
"""
broken_trading_pairs = get_broken_pairs(indicators.strategy_universe, parameters)
if pair in broken_trading_pairs:
# Don't even bother to try trade this
return False
avoid_backtesting_tokens = {
# Trading jsut stops (though there is liq left)
# https://tradingstrategy.ai/trading-view/ethereum/uniswap-v3/id-usdc-fee-30
"PEOPLE",
"WBTC",
}
if pair.base.token_symbol in avoid_backtesting_tokens:
# Manually blacklisted toen for this backtest
return False
# Pair does not quality yet due to low liquidity
liquidity = indicators.get_tvl(pair=pair)
if liquidity is None or liquidity <= parameters.min_liquidity_threshold:
return False
volume = indicators.get_price(pair, column="volume")
close_price = indicators.get_price(pair=pair)
if (volume is not None) and (close_price is not None):
volume_adjusted = volume / close_price
if volume_adjusted < parameters.min_volume:
return False
return True
return False
@cachetools.cached(cache=cachetools.TTLCache(ttl=60 * 60 * 2, maxsize=1000))
def get_custom_data_group():
storage_options = get_google_storage_credentials()
logger.info("Getting custom data group")
df_platforms = pd.read_csv(
"gs://taraxa-research/coingecko_data_pipeline/df_platforms.csv",
storage_options=storage_options,
)
df_trend = pd.read_parquet(
"gs://taraxa-research/trend_spotting/trends_v1.parquet",
storage_options=storage_options,
)
df_trend.reset_index(inplace=True)
df_trend_eth = df_trend.merge(
df_platforms[df_platforms["platform_name"] == "Base"],
on="coin_id",
how="inner",
)
df_trend_eth = df_trend_eth.rename(
columns={"coin": "symbol", "coin_id_count": "social_mentions"}
)
df_trend_eth["contract_address"] = df_trend_eth["contract_address"].str.lower()
# Create per-pair DataFrame group by
custom_data_group = (
df_trend_eth.set_index("date").sort_index().groupby("contract_address")
)
return custom_data_group
custom_data_group = None
def add_metric(pair: TradingPairIdentifier, metric_name: str) -> pd.Series:
"""Add a specific metric to the dataset."""
custom_data_group = get_custom_data_group()
try:
contract_address = pair.base.address
per_pair = custom_data_group.get_group(contract_address)
logger.info(f"Per pair: {per_pair.tail()}")
per_pair_series = per_pair[metric_name]
metric_series = per_pair_series[
~per_pair_series.index.duplicated(keep="last")
].rename(metric_name)
full_date_range = pd.date_range(
start=metric_series.index.min(), end=metric_series.index.max(), freq="D"
)
metric_series = metric_series.reindex(full_date_range)
metric_series = metric_series.ffill()
metric_series.sort_index(inplace=True)
return metric_series
except Exception as e:
logger.info(f"Error adding metric {metric_name} for pair {pair}: {str(e)}")
return pd.Series(dtype="float64", index=pd.DatetimeIndex([]))
def calculate_metric_emas(
pair: TradingPairIdentifier, metric_name: str, short_length: int, long_length: int
) -> pd.DataFrame:
"""Calculate short and long EMAs for a specific metric."""
metric_series = add_metric(pair, metric_name)
metric_series = metric_series.interpolate(
method="linear", limit_direction="forward"
)
ema_short = (
metric_series.ewm(span=short_length, adjust=False)
.mean()
.rename(f"{metric_name}_ema_short")
)
ema_long = (
metric_series.ewm(span=long_length, adjust=False)
.mean()
.rename(f"{metric_name}_ema_long")
)
emas_df = pd.concat([ema_short, ema_long], axis=1)
return emas_df
def momentum(close, momentum_lookback_bars) -> pd.Series:
"""Calculate momentum series to be used as a signal.
This indicator is later processed in decide_trades() to a weighted alpha signal.
:param momentum_lookback_bars:
Calculate returns based on this many bars looked back
"""
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.shift.html#pandas.DataFrame.shift
start_close = close.shift(momentum_lookback_bars)
momentum = (close - start_close) / start_close
return momentum
def create_indicators(
timestamp: datetime.datetime | None,
parameters: StrategyParameters,
strategy_universe: TradingStrategyUniverse,
execution_context: ExecutionContext,
):
indicators = IndicatorSet()
indicators.add(
"momentum",
momentum,
{"momentum_lookback_bars": parameters.momentum_lookback_bars},
IndicatorSource.close_price,
)
social_metrics = ["social_mentions"]
for metric in social_metrics:
indicators.add(
metric,
add_metric,
{"metric_name": metric},
IndicatorSource.external_per_pair,
)
# Add EMA indicators
indicators.add(
f"{metric}_emas",
calculate_metric_emas,
{
"metric_name": metric,
"short_length": parameters.social_ema_short_length,
"long_length": parameters.social_ema_long_length,
},
IndicatorSource.external_per_pair,
)
return indicators
def decide_trades(
input: StrategyInput,
) -> list[TradeExecution]:
#
# Decision cycle setup.
# Read all variables we are going to use for the decisions.
#
parameters = input.parameters
position_manager = input.get_position_manager()
state = input.state
timestamp = input.timestamp
indicators = input.indicators
strategy_universe = input.strategy_universe
# Initiate logging vars
equity_before = state.portfolio.get_total_equity()
trade_decision_report = {}
num_signals_accepted = 0
num_signals_rejected = 0
signals_created = []
signals_rejected = []
rejected_trades_lack_of_liquidity = []
cash = position_manager.get_current_cash()
available_cash = cash - position_manager.get_pending_redemptions()
total_equity = state.portfolio.get_total_equity()
if total_equity > 10_000_000:
position_valuations = "\n".join(
[
f"{p} (token {p.pair.base.address}): {p.get_value()}"
for p in state.portfolio.open_positions.values()
]
)
raise RuntimeError(
f"Portfolio total equity exceeded 1,000,000 USD. Some broken math likely happened. Total equity is {total_equity} USD.\nOpen positions:\n{position_valuations}"
)
#
# Trading logic
#
# We do some extra checks here as we are trading low quality
# low cap tokens which often have outright malicious data for trading.
#
trades = []
# Enable trailing stop loss after we reach the profit taking level
#
for position in state.portfolio.open_positions.values():
if position.trailing_stop_loss_pct is None:
close_price = indicators.get_price(pair=position.pair)
if (
close_price
and close_price
>= position.get_opening_price()
* parameters.trailing_stop_loss_activation_level
):
position.trailing_stop_loss_pct = parameters.trailing_stop_loss_pct
elif position.stop_loss is None:
position.stop_loss = parameters.stop_loss_pct
size_risk_model = USDTVLSizeRiskModel(
pricing_model=input.pricing_model,
per_position_cap=parameters.per_position_cap_of_pool, # This is how much % by all pool TVL we can allocate for a position
missing_tvl_placeholder_usd=100_000, # Placeholder for missing TVL data until we get the data off the chain
)
for pair in strategy_universe.iterate_pairs():
if not is_acceptable(indicators, parameters, pair):
# Skip this pair for the risk management
continue
position_for_pair = state.portfolio.get_open_position_for_pair(pair)
# Extract Social indicators here
try:
social_mentions = indicators.get_indicator_value(
"social_mentions",
pair=pair,
index=-2,
data_delay_tolerance=pd.Timedelta(days=15),
)
logger.info(
f"Social mentions for pair {pair.base.token_symbol}: {social_mentions}"
)
except IndicatorDataNotFoundWithinDataTolerance:
logger.info(
f"Social mentions data not found within tolerance for pair {pair}. "
"Skipping this asset."
)
continue
try:
ema_short = indicators.get_indicator_value(
"social_mentions_emas",
pair=pair,
column=f"social_mentions_ema_short",
index=-2,
data_delay_tolerance=pd.Timedelta(days=15),
)
except IndicatorDataNotFoundWithinDataTolerance:
logger.info(
f"EMA short data not found within tolerance for pair {pair}. "
"Skipping this asset."
)
continue
try:
ema_long = indicators.get_indicator_value(
"social_mentions_emas",
pair=pair,
column=f"social_mentions_ema_long",
index=-2,
data_delay_tolerance=pd.Timedelta(days=15),
)
except IndicatorDataNotFoundWithinDataTolerance:
logger.info(
f"EMA long data not found within tolerance for pair {pair}. "
"Skipping this asset."
)
continue
ema_short_series = indicators.get_indicator_series(
"social_mentions_emas", pair=pair, column=f"social_mentions_ema_short"
)
ema_long_series = indicators.get_indicator_series(
"social_mentions_emas", pair=pair, column=f"social_mentions_ema_long"
)
# Volume Based Metrics
volume = indicators.get_price(column="volume", pair=pair)
momentum = indicators.get_indicator_value("momentum", pair=pair)
tvl = indicators.get_tvl(pair=pair)
crossover_occurred = False
try:
crossover, crossover_index = contains_cross_over(
ema_short_series,
ema_long_series,
lookback_period=parameters.cross_over_period,
must_return_index=True,
)
crossover_occurred = crossover and (
crossover_index >= -parameters.cross_over_period
)
except Exception as e:
crossover = None
crossover_occurred = False
logger.trade("Cross over did not occur due to exception: %s", e)
#
# Visualisations
#
if input.is_visualisation_enabled():
visualisation = state.visualisation
visualisation.plot_indicator(
timestamp,
f"Social mentions {pair.base}",
PlotKind.technical_indicator_detached,
social_mentions,
pair=pair,
)
visualisation.plot_indicator(
timestamp,
f"Social mentions EMA {pair.base}",
PlotKind.technical_indicator_detached,
ema_short,
pair=pair,
)
visualisation.plot_indicator(
timestamp,
f"Social mentions Long {pair.base}",
PlotKind.technical_indicator_overlay_on_detached,
ema_long,
pair=pair,
detached_overlay_name=f"Social mentions EMA {pair.base}",
)
visualisation.plot_indicator(
timestamp,
f"Momentum {pair.base}",
PlotKind.technical_indicator_detached,
momentum,
pair=pair,
)
trade_decision_report[pair.base.token_symbol] = {
"pair": pair.base.token_symbol,
"momentum": momentum,
"social_mentions": social_mentions,
"crossover_occurred": crossover_occurred,
"momentum_above_threshold": (
momentum >= parameters.minimum_mometum_threshold
if momentum is not None
else False
),
"social_mention_min_satisfied": (
ema_short >= parameters.social_ma_min
if ema_short is not None
else False
),
"all_conditions_met": (
crossover_occurred
and momentum is not None
and momentum >= parameters.minimum_mometum_threshold
and ema_short is not None
and ema_short >= parameters.social_ma_min
),
}
if trade_decision_report[pair.base.token_symbol]["all_conditions_met"]:
logger.info(
f"Accepted signal for {pair.base.token_symbol} current num_signals_accepted: {num_signals_accepted}"
)
num_signals_accepted += 1
signals_created.append(trade_decision_report[pair.base.token_symbol])
else:
logger.info(
f"Rejected signal for {pair.base.token_symbol} current num_signals_rejected: {num_signals_rejected}"
)
num_signals_rejected += 1
signals_rejected.append(trade_decision_report[pair.base.token_symbol])
# Check if we are too early in the backtesting to have enough data to calculate indicators
# if None in (volume, bb_upper_interactions_social, bb_upper_interactions_social, bb_upper_sentiment_social, social_mentions, interactions, sentiment, sma):
if None in (volume, social_mentions, tvl):
continue
# Make sure you don't trade the same base token in current traded positions
open_positions = state.portfolio.open_positions.values()
base_token_address = pair.base.address
quote_token_address = pair.quote.address
# Check if there's already an open position with the same quote token
existing_position_with_quote = any(
pos.pair.base.address == base_token_address for pos in open_positions
)
# If there's already an open position with the same quote token, skip this pair
if existing_position_with_quote:
continue
if (
len(state.portfolio.open_positions) < parameters.max_assets
and state.portfolio.get_open_position_for_pair(pair) is None
):
should_open_position = False
if momentum is None:
if crossover_occurred and ema_short >= parameters.social_ma_min:
should_open_position = True
# if (tvl * parameters.min_liquidity_trade_threshold) <= (
# cash * parameters.allocation
# ):
# buy_amount = tvl * parameters.min_liquidity_trade_threshold
# else:
# buy_amount = cash * parameters.allocation
# logger.trade(
# "Opening position for %s with %s USDC", pair, buy_amount
# )
# trades += position_manager.open_spot(
# pair,
# value=buy_amount,
# stop_loss_pct=parameters.stop_loss_pct,
# )
elif (
crossover_occurred
and (momentum >= parameters.minimum_mometum_threshold)
and ema_short >= parameters.social_ma_min
):
should_open_position = True
if should_open_position:
size_risk = size_risk_model.get_acceptable_size_for_position(
timestamp=timestamp,
pair=pair,
asked_value=available_cash * parameters.allocation,
)
buy_amount = size_risk.accepted_size
logger.trade(
"Position size risk, pair: %s, asked: %s, accepted: %s, diagnostics: %s",
pair,
size_risk.asked_size,
buy_amount,
size_risk.diagnostics_data,
)
if buy_amount >= parameters.min_trade_size_usd:
logger.trade(
"Opening position for %s with %f USDC", pair, buy_amount
)
trades += position_manager.open_spot(
pair,
value=buy_amount,
stop_loss_pct=parameters.stop_loss_pct,
)
else:
rejected_trades_lack_of_liquidity.append(
f"Skipped {trade_decision_report[pair.base.token_symbol]} due to trade size {buy_amount} USD, because it is below our minimum threshold {parameters.min_trade_size_usd} USD"
)
logger.trade(
"Skipping trade size %f USD, because it is below our minimum threshold %f USD",
buy_amount,
parameters.min_trade_size_usd,
)
equity_after = state.portfolio.get_total_equity()
report = dedent_any(
f"""
Trades decided: {len(trades)}
Pairs total: {strategy_universe.data_universe.pairs.get_count()}
Num Signals accepted: {num_signals_accepted}
Num Signals rejected: {num_signals_rejected}
Rejected trades due to lack of liquidity: {rejected_trades_lack_of_liquidity}
Total equity before: {equity_before:,.2f} USD
Total equity after: {equity_after:,.2f} USD
Cash: {position_manager.get_current_cash():,.2f} USD
Redemption queue: {position_manager.get_pending_redemptions():,.2f} USD
Positions: {state.portfolio.open_positions.values()}
Allocated to signals: {equity_after - equity_before:,.2f} USD
Signals created: {signals_created}
Signals rejected: {signals_rejected}
"""
)
state.visualisation.add_message(timestamp, report)
return trades
#
# Strategy metadata.
#
# Displayed in the user interface.
#
tags = {StrategyTag.beta, StrategyTag.live}
name = "Base Sentimeme AI Trading Strategy"
short_description = "Social-data driven memecoin trading strategy on Base"
icon = ""
long_description = """
## Strategy Description
This strategy builds upon the success of TrendMoon's Ethereum memecoin strategy, now optimized for the Base chain. Leveraging TrendMoon's proprietary social data and advanced market metrics, this strategy is designed to identify and trade emerging memecoin opportunities. By aggregating social attention across Telegram—a critical platform for early-stage crypto projects—the strategy incorporates data on social mentions, mindshare, and sentiment to detect trends and capitalize on them effectively.
The strategy combines social metrics with additional market indicators such as trading volume and momentum to determine optimal entry and exit points. This multi-faceted approach captures upside potential while implementing robust risk management to protect against downside risks.
### Core Features:
- **Social Insights**: Analyzes Telegram mentions, sentiment, and mindshare for identifying tokens gaining traction.
- **Market Confirmation**: Uses trading volume and momentum indicators to validate social trends.
- **Risk Management**: Implements stop-losses, trailing stops, and liquidity thresholds to secure profits and limit losses.
- **Daily Execution**: Automatically evaluates trading opportunities and rebalances the portfolio.
- **Non-Custodial**: Operates on decentralized exchanges with USDC as a reserve asset.
### Designed Performance:
The strategy is optimized for bullish or sideways Bitcoin markets where memecoins show strong trends. While performance may decline in downturns, its layered risk management features help mitigate potential losses.
**Past performance is not indicative of future results.**
---
## Assets and Trading Venues
- Trades exclusively on the spot market.
- Focuses on memecoins within the Base ecosystem.
- Maintains reserves in USDC for stability.
- Executes trades on major Base DEXes (e.g., Uniswap, Sushiswap).
- Rebalances positions daily based on market and social metrics.
---
## Risk Management
Key risk management features include:
- **No Leverage**: Limits exposure to high volatility.
- **Liquidity Requirements**: Ensures tokens meet strict trading volume and liquidity thresholds.
- **Multi-Layer Risk Assessment**:
- **Technical Risks**: Stop-losses and trailing stop-losses.
- **On-Chain Risks**: Evaluates token contract safety (e.g., anti-honeypot mechanisms).
- **Social Risks**: Combines sentiment metrics with market data for comprehensive decision-making.
- **Dynamic Sizing**: Adjusts position sizes according to liquidity and portfolio conditions.
- **Hourly Monitoring**: Automated stop-loss checks to minimize downside.
---
## Backtesting Results
The strategy's backtests for the 2024-2025 period on Base show promising performance:
- **CAGR**: 684.39%
- **Sharpe Ratio**: 3.00
- **Sortino Ratio**: 7.59
- **Max Drawdown**: -22%
Backtests demonstrate the strategy's ability to compound smaller returns (20-100%) repeatedly while minimizing catastrophic losses. The low winning percentage reflects the high-risk nature of memecoins but is offset by outsized gains from successful trades.
| Metric | Value |
| ------------- | ------- |
| CAGR | 684.39% |
| Sharpe Ratio | 3.00 |
| Sortino Ratio | 7.59 |
| Max Drawdown | -22% |
---
## Benchmark
For the same backtesting period, here are some benchmark performance comparisons:
| | CAGR | Maximum drawdown | Sharpe |
| ---------------------------- | ------- | ---------------- | ------ |
| This strategy | 684.39% | -23.07% | 2.11 |
| SP500 (20 years) | 11% | -33% | 0.72 |
| Bitcoin (backtesting period) | 140% | -33% | 4.24 |
| Ether (backtesting period) | 77% | -48% | 1.60 |
---
## Trading Frequency
- **Rebalancing**: Daily evaluation and adjustments.
- **Average Position Duration**:
- Winning Trades: ~9 days 8 hours.
- Losing Trades: ~1 day 22 hours.
- **Positions per Day**: ~0.33.
---
## Portfolio Management
- Maximum of 7 concurrent positions.
- Allocates 14.2% of the portfolio per position.
- Uses USDC as a reserve to reduce risk exposure.
- Ensures sufficient liquidity for all trades.
---
## Robustness
This strategy has been stress-tested against significant market volatility from the start of 2023. While not extensively tested across all market cycles, it has shown resilience under various conditions.
---
## Future Updates
TrendMoon's Base Memecoin Strategy represents the next step in automated, social-data-driven trading. As we refine our algorithms and expand our datasets, users can expect:
- **Attention-weighted indices** for emerging narratives.
- **AI-powered agents** leveraging Telegram data to enhance trading decisions.
- **Enhanced tools** for monitoring trends across decentralized ecosystems.
Stay updated with the latest releases and features to ensure you're trading with cutting-edge strategies.
---
## Learn More
- **TrendMoon Social Data Integration**: Powering social trend detection and analysis.
- **Community Resources**:
- Join our [Discord community](https://discord.gg/cU66eMd8) for discussions and updates.
- Visit our [Telegram channel](https://t.me/TrendMoon) for insights and announcements.
- Explore our [website](https://trendmoon.ai) for additional details.
This strategy offers a structured, data-driven alternative to manual memecoin trading, enabling users to participate in the high-risk, high-reward memecoin market with confidence and efficiency.
"""