ETH Memecoin Social Trend Spotter (beta)
Social-data driven memecoin trading strategy on Ethereum
Source code
The source code of the ETH Memecoin Social Trend Spotter strategy
"""Memecoin strategy based on market sentiment. Long only.
To backtest this strategy module locally:
.. code-block:: console
source scripts/set-latest-tag-gcp.sh
docker-compose run ethereum-memecoin-swing backtest
Or:
.. code-block:: console
trade-executor \
backtest \
--strategy-file=strategy/ethereum-memecoin-swing.py \
--trading-strategy-api-key=$TRADING_STRATEGY_API_KEY
# Run a single cycle in simulation mode
export TRADE_EXECUTOR_VERSION=v828
docker compose run ethereum-memecoin-swing start --run-single-cycle --simulate
docker compose run ethereum-memecoin-swing backtest --backtest-start 2024-09-01 --backtest-end 2024-12-01
"""
import datetime
import json
import logging
import os
import re
import pandas as pd
import cachetools
from tradingstrategy.candle import GroupedCandleUniverse
from tradingstrategy.chain import ChainId
from tradingstrategy.client import Client
from tradingstrategy.liquidity import GroupedLiquidityUniverse
from tradingstrategy.pair import (
HumanReadableTradingPairDescription,
PandasPairUniverse,
StablecoinFilteringMode,
filter_for_quote_tokens,
filter_for_stablecoins,
)
from tradingstrategy.timebucket import TimeBucket
from tradingstrategy.universe import Universe
from tradingstrategy.utils.token_filter import (
deduplicate_pairs_by_volume,
add_base_quote_address_columns,
)
from tradingstrategy.utils.token_extra_data import load_extra_metadata
from tradeexecutor.state.identifier import TradingPairIdentifier
from tradeexecutor.state.trade import TradeExecution
from tradeexecutor.state.visualisation import PlotKind
from tradeexecutor.strategy.cycle import CycleDuration
from tradeexecutor.strategy.default_routing_options import TradeRouting
from tradeexecutor.strategy.execution_context import ExecutionContext, ExecutionMode
from tradeexecutor.strategy.pandas_trader.alternative_market_data import (
resample_multi_pair,
)
from tradeexecutor.strategy.pandas_trader.indicator import IndicatorSet, IndicatorSource
from tradeexecutor.strategy.pandas_trader.strategy_input import (
IndicatorDataNotFoundWithinDataTolerance,
StrategyInput,
StrategyInputIndicators,
)
from tradeexecutor.strategy.parameters import StrategyParameters
from tradeexecutor.strategy.tag import StrategyTag
from tradeexecutor.strategy.trading_strategy_universe import (
TradingStrategyUniverse,
load_partial_data,
translate_token,
)
from tradeexecutor.strategy.universe_model import UniverseOptions
from tradeexecutor.utils.crossover import contains_cross_over, contains_cross_under
from tradeexecutor.strategy.tvl_size_risk import USDTVLSizeRiskModel
# Logging helpers
def strip_except_newlines(text):
# Split by newlines, strip each line, then rejoin
return "\n".join(line.strip() for line in text.splitlines())
def dedent_any(text: str) -> str:
"""Dedent variable indents of the text"""
return re.sub(r"^\s+", "", strip_except_newlines(text), flags=re.MULTILINE)
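# Example (illustrative): dedent_any("  Trades decided: 1\n    Cash: 5,000 USD")
# returns "Trades decided: 1\nCash: 5,000 USD" - each line is left-stripped while
# newlines are preserved, so multi-line report templates render flush left.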
logger = logging.getLogger(__name__)
trading_strategy_engine_version = "0.5"
class Parameters:
"""Parameteres for this strategy.
- Collect parameters used for this strategy here
- Both live trading and backtesting parameters
"""
id = "ethereum-memecoin-swing" # Used in cache paths
cycle_duration = CycleDuration.d1 # Daily rebalance
candle_time_bucket = TimeBucket.d1
allocation = 0.142
max_assets = 7
#
# Liquidity risk analysis and data quality
#
min_price = 0.00000000000000000001
max_price = 1_000_000
min_liquidity_trade_threshold = 0.05
min_liquidity_threshold = 25000
min_volume = 30000
# If the pair does not have enough real time quote token TVL, skip trades smaller than this
min_trade_size_usd = 5.00
# Only do trades where we are less than 1% of the pool quote token TVL
per_position_cap_of_pool = 0.01
# Trigger
# Safety Guards
minimum_mometum_threshold = 0.35
momentum_lookback_bars = 12
social_ema_short_length = 2
social_ema_long_length = 30
cross_over_period = 2
social_ma_min = 10
stop_loss_pct = 0.95
trailing_stop_loss_pct = 0.85
trailing_stop_loss_activation_level = 1.19
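# Worked example of the stop parameters above: a position opened at $1.00 gets an
# initial stop loss at 0.95 x entry = $0.95. Once the close reaches
# 1.19 x entry = $1.19, decide_trades() switches the position to a trailing stop
# of 0.85, i.e. the position is closed if price falls roughly 15% from its peak.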
# Trade execution parameters
slippage_tolerance = 0.01
max_buy_tax = 0.06
max_sell_tax = 0.06
token_risk_threshold = 60
#
# Live trading only
#
chain_id = ChainId.ethereum
routing = TradeRouting.default
required_history_period = datetime.timedelta(days=30 + 1)
#
# Backtesting only
#
backtest_start = datetime.datetime(2024, 1, 1)
backtest_end = datetime.datetime(2024, 10, 1)
initial_cash = 10_000
initial_deposit = 10_000
stop_loss_time_bucket = TimeBucket.h1
def join_market_trend_data(df_trend, market_data):
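"""Left-join daily market data onto the social trend frame.

The frames are merged on (coin_id, date) and the result is indexed by
(coin_id, coin, date) so downstream code can work per coin.
"""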
df_trend.reset_index(inplace=True)
market_data.reset_index(inplace=True)
# Ensure "date" in df_trend_new is also in datetime format (if not already done)
df_trend["date"] = pd.to_datetime(df_trend["date"])
market_data["date"] = pd.to_datetime(market_data["date"])
market_data["date"] = market_data["date"].dt.tz_localize(None)
# Perform the merge
merged_df = pd.merge(df_trend, market_data, how="left", on=["coin_id", "date"])
merged_df.set_index(["coin_id", "coin", "date"], inplace=True)
return merged_df
def get_google_storage_credentials():
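"""Build GCS storage options from the GOOGLE_APPLICATION_CREDENTIALS environment variable.

Note: the variable is expected to hold the service account JSON *content*
itself (not a file path), because the value is parsed with json.loads()
and passed to pandas/gcsfs as a token.
"""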
credentials_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
if not credentials_json:
raise RuntimeError("GOOGLE_APPLICATION_CREDENTIALS environment variable is not set")
credentials_dict = json.loads(credentials_json)
return {"token": credentials_dict}
def get_ethereum_trend_data() -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
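"""Load TrendMoon social trend data and CoinGecko market data from Google Cloud Storage.

:return:
Tuple of (meme-token trend data joined with market data,
Ethereum trend data, stablecoin category rows used for filtering).
"""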
logger.info("Getting Ethereum trend data")
storage_options = get_google_storage_credentials()
df_platforms = pd.read_csv(
"gs://taraxa-research/coingecko_data_pipeline/df_platforms.csv",
storage_options=storage_options,
)
df_categories = pd.read_csv(
"gs://taraxa-research/coingecko_data_pipeline/df_categories.csv",
storage_options=storage_options,
)
df_stablecoins = df_categories[df_categories["category_name"] == "Stablecoins"]
df_markets_data = pd.read_parquet(
"gs://taraxa-research/coingecko_data_pipeline/df_market_data_historical.parquet",
storage_options=storage_options,
)
df_trend = pd.read_parquet(
"gs://taraxa-research/trend_spotting/trends_v1.parquet",
storage_options=storage_options,
)
df_trend.reset_index(inplace=True)
df_trend_eth = df_trend.merge(
df_platforms[df_platforms["platform_name"] == "Ethereum"],
on="coin_id",
how="inner",
)
df_trend = df_trend_eth.merge(
df_categories[df_categories["category_name"] == "Meme"],
on="coin_id",
how="inner",
)
df_trend_w_market = join_market_trend_data(df_trend, df_markets_data)
df_trend_w_market.reset_index(inplace=True)
df_trend_w_market = df_trend_w_market.rename(
columns={"coin": "symbol", "coin_id_count": "social_mentions"}
)
df_trend_eth = df_trend_eth.rename(
columns={"coin": "symbol", "coin_id_count": "social_mentions"}
)
return df_trend_w_market, df_trend_eth, df_stablecoins
def filter_pairs_by_risk(
pairs_df: pd.DataFrame,
risk_threshold: int = 60,
max_buy_tax: float = 6.0,
max_sell_tax: float = 6.0,
risk_traits: dict = None,
) -> pd.DataFrame:
"""Filter pairs DataFrame based on tax rates, TokenSniffer risk score, and specific risk traits.
Args:
pairs_df (pd.DataFrame): DataFrame containing trading pair information
risk_threshold (int): Minimum acceptable TokenSniffer risk score (0-100, higher is better)
max_buy_tax (float): Maximum allowed buy tax percentage (default 6.0)
max_sell_tax (float): Maximum allowed sell tax percentage (default 6.0)
risk_traits (dict): Dictionary of risk traits to filter on. If None, only tax and risk score are checked
Returns:
pd.DataFrame: Filtered pairs DataFrame containing only pairs meeting all criteria
Example Risk Traits Dictionary:
```python
# Complete risk traits dictionary
risk_traits = {
# Contract-level risks
'has_mint': False, # Can new tokens be minted
'has_fee_modifier': False, # Can fees be modified after deployment
'has_max_transaction_amount': False, # Presence of max transaction limits
'has_blocklist': False, # Can addresses be blacklisted
'has_proxy': False, # Is it a proxy contract (upgradeable)
'has_pausable': False, # Can trading be paused
# Ownership and control risks
'is_ownership_renounced': True, # Ownership should be renounced
'is_source_verified': True, # Contract should be verified
# Trading risks
'is_sellable': True, # Token can be sold
'has_high_buy_fee': False, # High buy fees present
'has_high_sell_fee': False, # High sell fees present
'has_extreme_fee': False, # Extremely high fees (>30%)
# Liquidity risks
'has_inadequate_liquidity': False, # Insufficient liquidity
'has_inadequate_initial_liquidity': False, # Started with low liquidity
# Token distribution risks
'has_high_creator_balance': False, # Creator holds large portion
'has_high_owner_balance': False, # Owner holds large portion
'has_high_wallet_balance': False, # Any wallet holds too much
'has_burned_exceeds_supply': False, # Burned amount > supply (impossible)
# Additional safety checks
'is_flagged': False, # Token is flagged for issues
'is_honeypot': False, # Known honeypot
'has_restore_ownership': False, # Can ownership be restored
'has_non_standard_erc20': False # Non-standard ERC20 implementation
}
```
Example Risk Profiles:
```python
# Conservative (strict) settings
conservative_risk_traits = {
'has_mint': False,
'has_fee_modifier': False,
'has_blocklist': False,
'has_proxy': False,
'has_pausable': False,
'is_ownership_renounced': True,
'is_source_verified': True,
'is_sellable': True,
'has_high_buy_fee': False,
'has_high_sell_fee': False,
'is_flagged': False,
'is_honeypot': False
}
# Moderate settings
moderate_risk_traits = {
'has_mint': False,
'is_source_verified': True,
'is_sellable': True,
'has_extreme_fee': False,
'is_honeypot': False,
'is_flagged': False
}
# Aggressive settings
aggressive_risk_traits = {
'is_sellable': True,
'is_honeypot': False,
'is_flagged': False
}
```
Usage:
```python
# Using conservative settings with custom tax limits
filtered_df = filter_pairs_by_risk(
pairs_df,
risk_threshold=60,
max_buy_tax=5.0,
max_sell_tax=5.0,
risk_traits=conservative_risk_traits
)
# Custom risk profile
custom_risk_traits = {
'is_sellable': True,
'is_honeypot': False,
'has_mint': False,
'has_extreme_fee': False,
'is_source_verified': True
}
filtered_df = filter_pairs_by_risk(
pairs_df,
risk_threshold=70,
max_buy_tax=3.0,
max_sell_tax=3.0,
risk_traits=custom_risk_traits
)
```
"""
# Create a copy to avoid modifying original
filtered_df = pairs_df.copy()
initial_count = len(filtered_df)
# Replace NaN values with 0 for buy_tax and sell_tax
filtered_df["buy_tax"] = filtered_df["buy_tax"].fillna(0)
filtered_df["sell_tax"] = filtered_df["sell_tax"].fillna(0)
# Filter for pairs meeting tax thresholds
filtered_df = filtered_df[
(filtered_df["buy_tax"] <= max_buy_tax)
& (filtered_df["sell_tax"] <= max_sell_tax)
]
after_tax_count = len(filtered_df)
def check_token_risk(row):
try:
# Extract TokenSniffer data from the nested structure
token_data = row["other_data"]["top_pair_data"].token_sniffer_data
if token_data is None:
return False
# Check risk score threshold
if token_data.get("score", 0) < risk_threshold:
return False
# Access the contract data
contract_data = token_data.get("contract", {})
# Check each specified risk trait
if risk_traits:
for trait, desired_value in risk_traits.items():
# Only check traits that are present in the contract data
if trait in contract_data and contract_data[trait] != desired_value:
logger.debug(
"Trait %s with value %s does not match desired value %s",
trait,
contract_data[trait],
desired_value,
)
return False
return True
except (KeyError, AttributeError) as e:
logger.error(f"Error processing row: {e}")
return False
# Apply TokenSniffer filters if risk_traits provided
filtered_df = filtered_df[filtered_df.apply(check_token_risk, axis=1)]
final_count = len(filtered_df)
logger.info(
"Filtering results: Initial pairs: %d, after tax filters: %d, after risk filters: %d",
initial_count,
after_tax_count,
final_count,
)
return filtered_df
def prefilter_data(client: Client, execution_context: ExecutionContext) -> tuple:
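"""Download and prefilter pair, liquidity and OHLCV data for this strategy.

- Keeps only pairs on the configured chain with enough historical liquidity
- Cross-sections pairs against the token list from the TrendMoon social data
- Applies TokenSniffer risk, tax, stablecoin and quote token filters
- Returns a tuple of (pairs_df, price_df, liquidity_df, exchange_universe)
"""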
# If the pair does not have this liquidity, skip
min_prefilter_liquidity = 10_000
chain_id = Parameters.chain_id
time_bucket = Parameters.candle_time_bucket
# We need pair metadata to know which pairs belong to Ethereum
logger.info("Downloading/opening pairs dataset")
pairs_df = client.fetch_pair_universe().to_pandas()
our_chain_pair_ids = pairs_df[pairs_df.chain_id == chain_id.value][
"pair_id"
].unique()
logger.info(
f"We have data for {len(our_chain_pair_ids)} trading pairs on {chain_id.name}"
)
# Download all liquidity data, extract
# trading pairs that exceed our prefiltering threshold
logger.info("Downloading/opening liquidity dataset")
liquidity_df = client.fetch_all_liquidity_samples(time_bucket).to_pandas()
logger.info(f"Filtering out liquidity for chain {chain_id.name}")
liquidity_df = liquidity_df.loc[liquidity_df.pair_id.isin(our_chain_pair_ids)]
liquidity_df = liquidity_df.loc[
liquidity_df.timestamp > datetime.datetime(2024, 1, 1)
]
liquidity_per_pair = liquidity_df.groupby(liquidity_df.pair_id)
logger.info(
f"Chain {chain_id.name} has liquidity data for {len(liquidity_per_pair.groups)} pairs"
)
passed_pair_ids = set()
liquidity_output_chunks = []
for pair_id, pair_df in liquidity_per_pair:
if pair_df["high"].max() > min_prefilter_liquidity:
liquidity_output_chunks.append(pair_df)
passed_pair_ids.add(pair_id)
logger.info(
f"After filtering for {min_prefilter_liquidity:,} USD min liquidity we have {len(passed_pair_ids)} pairs"
)
liquidity_df = pd.concat(liquidity_output_chunks)
# liquidity_out_df.to_parquet(liquidity_output_fname)
# logger.info(
# f"Wrote {liquidity_output_fname}, {liquidity_output_fname.stat().st_size:,} bytes"
# )
logger.info("Downloading/opening OHLCV dataset")
price_df = client.fetch_all_candles(time_bucket).to_pandas()
price_df = price_df.loc[price_df.pair_id.isin(passed_pair_ids)]
logger.info(f"Price df len before index: {len(price_df)}")
logger.info(f"Price df columns: {price_df.columns} and index: {price_df.index}")
price_df.index = pd.DatetimeIndex(price_df.timestamp)
logger.info(f"type of index after conversion: {type(price_df.index)}")
logger.info(f"Price df len: {len(price_df)}")
# price_df.to_parquet('price_output_fname')
# logger.info(
# f"Wrote {price_output_fname}, {price_output_fname.stat().st_size:,} bytes"
# )
# FILTER MORE
NUM_TOKENS = 50 # TODO change this back to 200
custom_data_df, df_trend, df_stablecoins = get_ethereum_trend_data()
custom_data_df.index = pd.DatetimeIndex(custom_data_df["date"])
custom_data_df["contract_address"] = custom_data_df[
"contract_address"
].str.lower() # Framework operates with lowercased addresses everywehre
df_trend["contract_address"] = df_trend[
"contract_address"
].str.lower() # Framework operates with lowercased addresses everywehre
custom_data_df.sort_index(ascending=True)
custom_data_no_dups = custom_data_df.drop_duplicates(
subset=["coin_id", "symbol"], keep="last"
)[["coin_id", "symbol", "contract_address", "platform_name", "total_volume"]]
start = custom_data_df.index[0]
end = custom_data_df.index[-1]
csv_token_list = list(custom_data_df.contract_address.unique())
logger.info(
f"CSV contains data for {len(csv_token_list)} tokens, time range {start} - {end}"
)
# Remove Stablecoins
custom_data_no_dups = custom_data_no_dups[
~custom_data_no_dups.coin_id.isin(df_stablecoins.coin_id.unique())
]
csv_token_list_backtest = custom_data_no_dups.sort_values(
by="total_volume", ascending=False
)[["symbol", "contract_address"]].iloc[:NUM_TOKENS]
logger.info(
f"Pre-selecting the following tokens and contract addresses to backtest {csv_token_list_backtest}"
)
erc20_addresses_avoid = ["0xA3c322Ad15218fBFAEd26bA7f616249f7705D945".lower()]
ethereum_erc20_address_list = csv_token_list_backtest["contract_address"].tolist()
logger.info(
f"Before filtering for avoid addresses we have {len(ethereum_erc20_address_list)} addresses"
)
desired_trading_addresses = {
address
for address in ethereum_erc20_address_list
if address.lower() not in erc20_addresses_avoid
}
# Move logic from create_trading_universe here
SUPPORTED_DEXES = {"uniswap-v3", "uniswap-v2", "sushi"}
exchange_universe = client.fetch_exchange_universe()
exchange_universe = exchange_universe.limit_to_chains(
{Parameters.chain_id}
).limit_to_slugs(SUPPORTED_DEXES)
pairs_df = client.fetch_pair_universe().to_pandas()
logger.info(
f"Prefilter data contains {len(liquidity_df):,} liquidity samples and {len(price_df):,} OHLCV candles"
)
if execution_context.mode.is_live_trading():
# for live trading we only need the last 60 days
now = datetime.datetime.utcnow()
start_at = now - datetime.timedelta(days=60)
end_at = now
price_df = price_df.loc[price_df.timestamp >= start_at]
liquidity_df = liquidity_df.loc[liquidity_df.timestamp >= start_at]
elif execution_context.mode.is_backtesting():
start_at = Parameters.backtest_start
end_at = Parameters.backtest_end
price_df = price_df.loc[
(price_df.timestamp >= start_at) & (price_df.timestamp <= end_at)
]
liquidity_df = liquidity_df.loc[
(liquidity_df.timestamp >= start_at) & (liquidity_df.timestamp <= end_at)
]
else:
raise RuntimeError(f"Unknown execution mode {execution_context.mode}")
# Prefilter for more liquidity conditions
liquidity_per_pair = liquidity_df.groupby(liquidity_df.pair_id)
logger.info(
f"Chain {chain_id.name} has liquidity data for {len(liquidity_per_pair.groups)} pairs"
)
# Get the date 30 days before the end of the data window
thirty_days_ago = end_at - datetime.timedelta(days=30)
# Create a subset of the data for the last 30 days, in live trading this makes more sense
liquidity_last_30_days = liquidity_df[liquidity_df.timestamp > thirty_days_ago]
liquidity_per_pair_last_30d = liquidity_last_30_days.groupby("pair_id")
passed_pair_ids = set()
for pair_id, pair_df in liquidity_per_pair_last_30d:
# Check the maximum high liquidity in the last 30 days
if pair_df["high"].max() > Parameters.min_liquidity_threshold:
passed_pair_ids.add(pair_id)
pairs_df = pairs_df.loc[pairs_df.pair_id.isin(passed_pair_ids)]
logger.info(
f"After liquidity filter {Parameters.min_liquidity_threshold:,} USD we have {len(pairs_df)} trading pairs"
)
logger.info(f"Columns of pairs df: {pairs_df.columns}")
allowed_exchange_ids = set(exchange_universe.exchanges.keys())
pairs_df = pairs_df.loc[pairs_df.exchange_id.isin(allowed_exchange_ids)]
logger.info(f"After DEX filter we have {len(pairs_df)} trading pairs")
# Store reference USDC ETH pair so we have an example pair with USDC as quote token for reserve asset
# Get ETH-USDC pairs from all major DEXes
eth_usdc_addresses = [
"0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc", # Uniswap V2
"0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640", # Uniswap V3
"0x397ff1542f962076d0bfe58ea045ffa2d347aca0", # Sushiswap
]
logger.info(f"ETH-USDC pairs: {eth_usdc_addresses}")
ref_usdc_pairs = pairs_df[
pairs_df["address"].isin([addr.lower() for addr in eth_usdc_addresses])
].copy()
logger.info(f"Ref USDC pairs: {ref_usdc_pairs.head()}")
# logger.info(f"len ref usdc pairs: {len(ref_usdc_pairs)}")
# Pairs pre-processing
pairs_df = add_base_quote_address_columns(pairs_df)
logger.info(
f"before filtering for desired trading addresses we have {len(pairs_df)} trading pairs"
)
logger.info(
f"Trying to filter for {len(desired_trading_addresses)} desired trading addresses"
)
pairs_df = pairs_df.loc[
(pairs_df["base_token_address"].isin(desired_trading_addresses))
& (pairs_df["chain_id"] == chain_id)
]
logger.info(
f"After desired trading addresses filter we have {len(pairs_df)} trading pairs"
)
# Retrofit TokenSniffer data
pairs_df = load_extra_metadata(
pairs_df,
client=client,
)
pairs_df = filter_pairs_by_risk(
pairs_df,
risk_threshold=Parameters.token_risk_threshold,
max_buy_tax=Parameters.max_buy_tax,
max_sell_tax=Parameters.max_sell_tax,
risk_traits=None,
)
logger.info(
f"Pairs found in pairs_df after risk filter: {pairs_df['base_token_symbol'].unique()}"
)
logger.info(
f"Pairs found after the risk filter: {pairs_df[['base_token_symbol', 'quote_token_symbol', 'address']].values}"
)
# Do cross-section of tokens from custom data
pairs_df = filter_for_stablecoins(
pairs_df, StablecoinFilteringMode.only_volatile_pairs
)
logger.info(
f"After custom data ERC-20 token address cross section filter we have {len(pairs_df)} trading pairs"
)
# We want to keep only USDC or WETH quoted pairs
pairs_df = filter_for_quote_tokens(
pairs_df,
{
"0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48".lower(),
"0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2".lower(),
},
)
logger.info(f"After quote token filter we have {len(pairs_df)} trading pairs")
# Deduplicate pairs by volume, then add back the reference ETH-USDC pairs
logger.info(f"Before deduplication we have {len(pairs_df)} trading pairs")
pairs_df = deduplicate_pairs_by_volume(pairs_df)
logger.info(f"After deduplication we have {len(pairs_df)} trading pairs")
pairs_df = pd.concat([pairs_df, ref_usdc_pairs]).drop_duplicates(subset=["pair_id"])
logger.info(f"After adding ref USDC pairs we have {len(pairs_df)} trading pairs")
logger.info(
f"Pairs found in pairs_df after adding reference pairs: {pairs_df['base_token_symbol'].unique()}"
)
logger.info(
f"Pairs passed to the trading universe: {pairs_df[['base_token_symbol', 'quote_token_symbol', 'address']].values}"
)
return pairs_df, price_df, liquidity_df, exchange_universe
def create_trading_universe(
timestamp: datetime.datetime,
client: Client,
execution_context: ExecutionContext,
universe_options: UniverseOptions,
) -> TradingStrategyUniverse:
"""Create the trading universe."""
USDC = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48".lower()
pairs_df, price_df, liquidity_df, exchange_universe = prefilter_data(
client, execution_context
)
if execution_context.mode.is_backtesting():
# Resample strategy decision candles to daily
price_df["timestamp"] = pd.to_datetime(price_df["timestamp"])
daily_candles = resample_multi_pair(price_df, Parameters.candle_time_bucket)
daily_candles["timestamp"] = daily_candles.index
logger.info(
f"After downsampling we have {len(daily_candles)} OHLCV candles and {len(liquidity_df)} liquidity samples"
)
candle_universe = GroupedCandleUniverse(
daily_candles,
time_bucket=Parameters.candle_time_bucket,
forward_fill=True,  # Forward fill should make sure we can always calculate RSI and other indicators
)
liquidity_universe = GroupedLiquidityUniverse(liquidity_df)
# The final trading pair universe contains metadata only for pairs that passed
# our filters
pairs_universe = PandasPairUniverse(
pairs_df,
exchange_universe=exchange_universe,
)
stop_loss_candle_universe = GroupedCandleUniverse(price_df)
data_universe = Universe(
time_bucket=Parameters.candle_time_bucket,
liquidity_time_bucket=Parameters.candle_time_bucket,
exchange_universe=exchange_universe,
pairs=pairs_universe,
candles=candle_universe,
liquidity=liquidity_universe,
chains={Parameters.chain_id},
forward_filled=True,
)
reserve_asset = translate_token(pairs_universe.get_token(USDC))
strategy_universe = TradingStrategyUniverse(
data_universe=data_universe,
backtest_stop_loss_time_bucket=Parameters.stop_loss_time_bucket,
backtest_stop_loss_candles=stop_loss_candle_universe,
reserve_assets=[reserve_asset],
)
elif execution_context.mode.is_live_trading():
dataset = load_partial_data(
client,
execution_context=execution_context,
time_bucket=Parameters.candle_time_bucket,
pairs=pairs_df,
universe_options=universe_options,
stop_loss_time_bucket=Parameters.stop_loss_time_bucket,
# lending_reserves=lending_reserves,
required_history_period=Parameters.required_history_period,
liquidity=True,
)
strategy_universe = TradingStrategyUniverse.create_from_dataset(
dataset,
forward_fill=True,
reserve_asset=USDC,
)
return strategy_universe
@cachetools.cached(
cache=cachetools.TTLCache(ttl=60 * 60 * 2, maxsize=1000),
key=lambda s, p: cachetools.keys.hashkey("get_broken_pairs"),
)
def get_broken_pairs(strategy_universe: TradingStrategyUniverse, parameters) -> set:
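"""Detect trading pairs with broken or suspicious price data.

Combines a manual avoid-list with the framework's sanity checks
(minimum candle count, price range). The result is cached for two hours.
"""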
# Run some extra sanity check for small cap tokens
broken_trading_pairs = set()
pairs_to_avoid = [87449]
for pair in strategy_universe.iterate_pairs():
logger.debug(f"Readable format pair {pair.get_human_description()}")
logger.debug(
f"Checking pair: {pair.base.token_symbol} with address {pair.base.address} and quote token {pair.quote.token_symbol} with address {pair.quote.address}"
)
reason = strategy_universe.get_trading_broken_reason(
pair,
min_candles_required=10,
min_price=parameters.min_price,
max_price=parameters.max_price,
)
if pair.internal_id in pairs_to_avoid:
broken_trading_pairs.add(pair)
if reason:
logger.debug(
f"FAIL: {pair} with base token {pair.base.address} may be problematic: {reason}"
)
broken_trading_pairs.add(pair)
else:
logger.debug(f"OK: {pair} included in the backtest")
logger.info(
f"Total {len(broken_trading_pairs)} broken trading pairs detected, having {strategy_universe.get_pair_count() - len(broken_trading_pairs)} good pairs left to trade"
)
return broken_trading_pairs
def is_acceptable(
indicators: StrategyInputIndicators,
parameters: StrategyParameters,
pair: TradingPairIdentifier,
) -> bool:
"""Check the pair for risk acceptance
:return:
True if we should trade this pair
"""
broken_trading_pairs = get_broken_pairs(indicators.strategy_universe, parameters)
if pair in broken_trading_pairs:
# Don't even bother to try trade this
return False
avoid_backtesting_tokens = {
# Trading just stops (though there is liquidity left)
# https://tradingstrategy.ai/trading-view/ethereum/uniswap-v3/id-usdc-fee-30
"PEOPLE",
"WBTC",
"USDT",
"USDC",
}
if pair.base.token_symbol in avoid_backtesting_tokens:
# Manually blacklisted token for this backtest
return False
# Pair does not qualify yet due to low liquidity
liquidity = indicators.get_tvl(pair=pair)
if liquidity is None or liquidity <= parameters.min_liquidity_threshold:
return False
volume = indicators.get_price(pair, column="volume")
close_price = indicators.get_price(pair=pair)
if close_price is None or volume is None:
return False
volume_adjusted = volume / close_price
logger.debug(f"Volume {volume} and fixed volume is {volume_adjusted}")
if volume_adjusted < parameters.min_volume:
return False
return True
@cachetools.cached(cache=cachetools.TTLCache(ttl=60 * 60 * 2, maxsize=1000))
def get_custom_data_group():
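"""Load TrendMoon social trend data from GCS and group it per contract address.

Rows are filtered to Ethereum tokens and keyed by lowercased contract address.
The result is cached for two hours to avoid repeated downloads.
"""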
storage_options = get_google_storage_credentials()
logger.info("Getting custom data group")
df_platforms = pd.read_csv(
"gs://taraxa-research/coingecko_data_pipeline/df_platforms.csv",
storage_options=storage_options,
)
df_trend = pd.read_parquet(
"gs://taraxa-research/trend_spotting/trends_v1.parquet",
storage_options=storage_options,
)
df_trend.reset_index(inplace=True)
df_trend_eth = df_trend.merge(
df_platforms[df_platforms["platform_name"] == "Ethereum"],
on="coin_id",
how="inner",
)
df_trend_eth = df_trend_eth.rename(
columns={"coin": "symbol", "coin_id_count": "social_mentions"}
)
df_trend_eth["contract_address"] = df_trend_eth[
"contract_address"
].str.lower() # Framework operates with lowercased addresses everywehre
# Create per-pair DataFrame group by
custom_data_group = (
df_trend_eth.set_index("date").sort_index().groupby("contract_address")
)
return custom_data_group
def add_metric(pair: TradingPairIdentifier, metric_name: str) -> pd.Series:
"""Add a specific metric to the dataset."""
custom_data_group = get_custom_data_group()
try:
contract_address = pair.base.address
per_pair = custom_data_group.get_group(contract_address)
logger.info(f"Per pair: {per_pair.tail()}")
per_pair_series = per_pair[metric_name]
metric_series = per_pair_series[
~per_pair_series.index.duplicated(keep="last")
].rename(metric_name)
full_date_range = pd.date_range(
start=metric_series.index.min(), end=metric_series.index.max(), freq="D"
)
metric_series = metric_series.reindex(full_date_range)
metric_series = metric_series.ffill()
metric_series.sort_index(inplace=True)
return metric_series
except Exception as e:
logger.info(f"Error adding metric {metric_name} for pair {pair}: {str(e)}")
return pd.Series(dtype="float64", index=pd.DatetimeIndex([]))
def calculate_metric_emas(
pair: TradingPairIdentifier, metric_name: str, short_length: int, long_length: int
) -> pd.DataFrame:
"""Calculate short and long EMAs for a specific metric."""
metric_series = add_metric(pair, metric_name)
# logger.info(f"Metric series: {metric_series.tail()}")
metric_series = metric_series.interpolate(
method="linear", limit_direction="forward"
)
logger.info(f"Metric series after interpolation: {metric_series.tail()}")
ema_short = (
metric_series.ewm(span=short_length, adjust=False)
.mean()
.rename(f"{metric_name}_ema_short")
)
# logger.info(f"EMA short: {ema_short.tail()}")
ema_long = (
metric_series.ewm(span=long_length, adjust=False)
.mean()
.rename(f"{metric_name}_ema_long")
)
emas_df = pd.concat([ema_short, ema_long], axis=1)
# logger.info(f"EMA df: {emas_df.tail()}")
return emas_df
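# The short (2-day) EMA crossing above the long (30-day) EMA of social mentions is
# the primary entry trigger evaluated in decide_trades() via contains_cross_over().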
def momentum(close, momentum_lookback_bars) -> pd.Series:
"""Calculate momentum series to be used as a signal.
This indicator is later processed in decide_trades() to a weighted alpha signal.
:param momentum_lookback_bars:
Calculate returns based on this many bars looked back
"""
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.shift.html#pandas.DataFrame.shift
start_close = close.shift(momentum_lookback_bars)
momentum = (close - start_close) / start_close
return momentum
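# Worked example: with momentum_lookback_bars = 12, a close of 1.40 against a close
# of 1.00 twelve bars earlier gives momentum = (1.40 - 1.00) / 1.00 = 0.40, which
# clears the minimum_mometum_threshold of 0.35 used in decide_trades().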
def create_indicators(
timestamp: datetime.datetime | None,
parameters: StrategyParameters,
strategy_universe: TradingStrategyUniverse,
execution_context: ExecutionContext,
):
logger.info("Creating indicators")
indicators = IndicatorSet()
indicators.add(
"momentum",
momentum,
{"momentum_lookback_bars": parameters.momentum_lookback_bars},
IndicatorSource.close_price,
)
# Social Metrics
# Add original value indicators
social_metrics = ["social_mentions"]
for metric in social_metrics:
logger.info(f"Adding metric: {metric}")
indicators.add(
metric,
add_metric,
{"metric_name": metric},
IndicatorSource.external_per_pair,
)
# Add EMA indicators
indicators.add(
f"{metric}_emas",
calculate_metric_emas,
{
"metric_name": metric,
"short_length": parameters.social_ema_short_length,
"long_length": parameters.social_ema_long_length,
},
IndicatorSource.external_per_pair,
)
return indicators
def decide_trades(
input: StrategyInput,
) -> list[TradeExecution]:
logger.info("Starting decide_trades")
#
# Decision cycle setup.
# Read all variables we are going to use for the decisions.
#
parameters = input.parameters
position_manager = input.get_position_manager()
state = input.state
timestamp = input.timestamp
indicators = input.indicators
strategy_universe = input.strategy_universe
# Initiate logging vars
equity_before = state.portfolio.get_total_equity()
trade_decision_report = {}
num_signals_accepted = 0
num_signals_rejected = 0
signals_created = []
signals_rejected = []
rejected_trades_lack_of_liquidity = []
cash = position_manager.get_current_cash()
total_equity = state.portfolio.get_total_equity()
if total_equity > 10_000_000:
position_valuations = "\n".join(
[
f"{p} (token {p.pair.base.address}): {p.get_value()}"
for p in state.portfolio.open_positions.values()
]
)
raise RuntimeError(
f"Portfolio total equity exceeded 1,000,000 USD. Some broken math likely happened. Total equity is {total_equity} USD.\nOpen positions:\n{position_valuations}"
)
#
# Trading logic
#
# We do some extra checks here as we are trading low quality
# low cap tokens which often have outright malicious data for trading.
#
trades = []
# Enable trailing stop loss after we reach the profit taking level
#
for position in state.portfolio.open_positions.values():
if position.trailing_stop_loss_pct is None:
close_price = indicators.get_price(pair=position.pair)
if (
close_price
and close_price
>= position.get_opening_price()
* parameters.trailing_stop_loss_activation_level
):
position.trailing_stop_loss_pct = parameters.trailing_stop_loss_pct
elif position.stop_loss is None:
position.stop_loss = parameters.stop_loss_pct
size_risk_model = USDTVLSizeRiskModel(
pricing_model=input.pricing_model,
per_position_cap=parameters.per_position_cap_of_pool,  # What fraction of the pool quote token TVL we can allocate to a single position
missing_tvl_placeholder_usd=100_000,  # Placeholder for missing TVL data until we can read the TVL from the chain
)
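# Worked example: with per_position_cap_of_pool = 0.01, a pair whose pool quote token
# TVL is 500,000 USD is capped to a position of roughly 5,000 USD, even if
# cash * allocation would ask for more.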
# # Prefetch TVLs for all pairs in a single go
# if input.execution_context.mode.is_live_trading():
# all_pairs = strategy_universe.iterate_pairs()
# pair_tvls = fetch_quote_token_tvls(
# input.web3,
# input.strategy_universe,
# all_pairs,
# )
# else:
# pair_tvls = None
for pair in strategy_universe.iterate_pairs():
if not is_acceptable(indicators, parameters, pair):
# Skip this pair for the risk management
continue
position_for_pair = state.portfolio.get_open_position_for_pair(pair)
# Extract Social indicators here
try:
social_mentions = indicators.get_indicator_value(
"social_mentions",
pair=pair,
index=-2,
data_delay_tolerance=pd.Timedelta(days=15),
)
logger.info(
f"Social mentions for pair {pair.base.token_symbol}: {social_mentions}"
)
except IndicatorDataNotFoundWithinDataTolerance:
logger.warning(
f"Social mentions data not found within tolerance for pair {pair}. "
"Skipping this asset."
)
continue
try:
ema_short = indicators.get_indicator_value(
"social_mentions_emas",
pair=pair,
column=f"social_mentions_ema_short",
index=-2,
data_delay_tolerance=pd.Timedelta(days=15),
)
except IndicatorDataNotFoundWithinDataTolerance:
logger.warning(
f"EMA short data not found within tolerance for pair {pair}. "
"Skipping this asset."
)
continue
try:
ema_long = indicators.get_indicator_value(
"social_mentions_emas",
pair=pair,
column=f"social_mentions_ema_long",
index=-2,
data_delay_tolerance=pd.Timedelta(days=15),
)
except IndicatorDataNotFoundWithinDataTolerance:
logger.warning(
f"EMA long data not found within tolerance for pair {pair}. "
"Skipping this asset."
)
continue
ema_short_series = indicators.get_indicator_series(
"social_mentions_emas", pair=pair, column=f"social_mentions_ema_short"
)
ema_long_series = indicators.get_indicator_series(
"social_mentions_emas", pair=pair, column=f"social_mentions_ema_long"
)
# Volume Based Metrics
volume = indicators.get_price(column="volume", pair=pair)
last_close_price = indicators.get_price(
pair=pair
) # , data_lag_tolerance=pd.Timedelta(days=360))
momentum = indicators.get_indicator_value("momentum", pair=pair)
tvl = indicators.get_tvl(pair=pair)
crossover_occurred = False
try:
crossover, crossover_index = contains_cross_over(
ema_short_series,
ema_long_series,
lookback_period=parameters.cross_over_period,
must_return_index=True,
)
crossover_occurred = crossover and (
crossover_index >= -parameters.cross_over_period
)
except Exception as e:
crossover = None
crossover_occurred = False
logger.trade("Cross over did not occur due to exception: %s", e)
#
# Visualisations
#
if input.is_visualisation_enabled():
visualisation = (
state.visualisation
) # Helper class to visualise strategy output
visualisation.plot_indicator(
timestamp,
f"Social mentions {pair.base}",
PlotKind.technical_indicator_detached,
social_mentions,
pair=pair,
)
visualisation.plot_indicator(
timestamp,
f"Social mentions EMA {pair.base}",
PlotKind.technical_indicator_detached,
ema_short,
pair=pair,
)
visualisation.plot_indicator(
timestamp,
f"Social mentions Long {pair.base}",
PlotKind.technical_indicator_overlay_on_detached,
ema_long,
pair=pair,
detached_overlay_name=f"Social mentions EMA {pair.base}",
)
visualisation.plot_indicator(
timestamp,
f"Momentum {pair.base}",
PlotKind.technical_indicator_detached,
momentum,
pair=pair,
)
# Add enhanced logging
trade_decision_report[pair.base.token_symbol] = {
"pair": pair.base.token_symbol,
"momentum": momentum,
"social_mentions": social_mentions,
"crossover_occurred": crossover_occurred,
"momentum_above_threshold": (
momentum >= parameters.minimum_mometum_threshold
if momentum is not None
else False
),
"social_mention_min_satisfied": (
ema_short >= parameters.social_ma_min
if ema_short is not None
else False
),
"all_conditions_met": (
crossover_occurred
and momentum is not None
and momentum >= parameters.minimum_mometum_threshold
and ema_short is not None
and ema_short >= parameters.social_ma_min
),
}
if trade_decision_report[pair.base.token_symbol]["all_conditions_met"]:
logger.info(
f"Accepted signal for {pair.base.token_symbol} current num_signals_accepted: {num_signals_accepted}"
)
num_signals_accepted += 1
signals_created.append(trade_decision_report[pair.base.token_symbol])
else:
logger.info(
f"Rejected signal for {pair.base.token_symbol} current num_signals_rejected: {num_signals_rejected}"
)
num_signals_rejected += 1
signals_rejected.append(trade_decision_report[pair.base.token_symbol])
# Check if we are too early in the backtesting to have enough data to calculate indicators
# if None in (volume, bb_upper_interactions_social, bb_upper_interactions_social, bb_upper_sentiment_social, social_mentions, interactions, sentiment, sma):
if None in (volume, social_mentions, tvl): # , momentum):
continue
# Make sure you don't trade the same base token in current traded positions
open_positions = state.portfolio.open_positions.values()
base_token_address = pair.base.address
quote_token_address = pair.quote.address
# Check if there's already an open position with the same base token
existing_position_with_base = any(
pos.pair.base.address == base_token_address for pos in open_positions
)
# If there's already an open position with the same base token, skip this pair
if existing_position_with_base:
continue
if (
len(state.portfolio.open_positions) < parameters.max_assets
and state.portfolio.get_open_position_for_pair(pair) is None
):
should_open_position = False
if momentum is None:
if crossover_occurred and ema_short >= parameters.social_ma_min:
should_open_position = True
elif (
crossover_occurred
and (momentum >= parameters.minimum_mometum_threshold)
and ema_short >= parameters.social_ma_min
):
should_open_position = True
# there is signal to open a position
if should_open_position:
size_risk = size_risk_model.get_acceptable_size_for_position(
timestamp=timestamp,
pair=pair,
asked_value=cash * parameters.allocation,
)
buy_amount = size_risk.accepted_size
logger.trade(
"Position size risk, pair: %s, asked: %s, accepted: %s, diagnostics: %s",
pair,
size_risk.asked_size,
buy_amount,
size_risk.diagnostics_data,
)
# if pair_tvls:
# tvl = pair_tvls[pair]
# pool_capped_usd = tvl * parameters.per_position_cap_of_pool
# if buy_amount > pool_capped_usd:
# logger.trade(
# "Pool quote token TVL %f USD, our cap %f USD, capping trade amount, cap percent %f",
# tvl,
# pool_capped_usd,
# parameters.per_position_cap_of_pool,
# )
# buy_amount = pool_capped_usd
if buy_amount >= parameters.min_trade_size_usd:
logger.trade(
"Opening position for %s with %f USDC", pair, buy_amount
)
trades += position_manager.open_spot(
pair,
value=buy_amount,
stop_loss_pct=parameters.stop_loss_pct,
)
else:
rejected_trades_lack_of_liquidity.append(
f"Skipped {trade_decision_report[pair.base.token_symbol]} due to trade size {buy_amount} USD, because it is below our minimum threshold {parameters.min_trade_size_usd} USD"
)
logger.trade(
"Skipping trade size %f USD, because it is below our minimum threshold %f USD",
buy_amount,
parameters.min_trade_size_usd,
)
equity_after = state.portfolio.get_total_equity()
report = dedent_any(
f"""
Trades decided: {len(trades)}
Pairs total: {strategy_universe.data_universe.pairs.get_count()}
Num Signals accepted: {num_signals_accepted}
Num Signals rejected: {num_signals_rejected}
Rejected trades due to lack of liquidity: {rejected_trades_lack_of_liquidity}
Total equity before: {equity_before:,.2f} USD
Total equity after: {equity_after:,.2f} USD
Cash: {position_manager.get_current_cash():,.2f} USD
Positions: {state.portfolio.open_positions.values()}
Allocated to signals: {equity_after - equity_before:,.2f} USD
Signals created: {signals_created}
Signals rejected: {signals_rejected}
"""
)
state.visualisation.add_message(timestamp, report)
return trades
#
# Strategy metadata.
#
# Displayed in the user interface.
#
tags = {StrategyTag.beta, StrategyTag.live}
name = "ETH Memecoin Social Trend Spotter"
short_description = "Social-data driven memecoin trading strategy on Ethereum"
icon = ""
long_description = """
# Strategy description
This strategy leverages TrendMoon social data and market metrics to identify and trade emerging memecoin opportunities on Ethereum. By aggregating social attention around each individual token, the strategy combines data on social mentions, mindshare, and sentiment to detect new trends forming in the market. This approach works particularly well for social tokens and memecoins, as their value is heavily driven by social activity.
The strategy uses a combination of social metrics and additional market indicators, such as trading volume and momentum, to determine optimal entry points. By identifying periods of rising social interest and confirming momentum through market data, it captures upside potential while implementing risk management measures to protect against downside risks.
- Analyzes social mentions, mindshare, and sentiment to identify tokens gaining traction.
- Confirms entry signals with volume and price momentum indicators to validate trends.
- Implements robust risk management with stop-losses and trailing stops to secure profits and limit losses.
- Targets high-potential memecoins with sufficient liquidity and trading volume.
- Makes trading decisions on a daily basis, with automated execution and risk safeguards.
This strategy is designed to perform well during periods of bullish Bitcoin momentum and even in sideways markets when memecoins show positive trends. It is less effective in downturns, but risk management measures help mitigate losses.
**Past performance is not indicative of future results**.
## Assets and trading venues
- Trades only on the spot market.
- Focuses on memecoin tokens on Ethereum.
- Maintains reserves in USDC stablecoin.
- Trading is executed on major DEXes (e.g., Uniswap, Sushiswap).
- Strategy decision cycle involves daily rebalancing.
## Risk
Key risk management features:
- No leverage used.
- Strict liquidity requirements.
- Multi-layer risk assessment to closely monitor:
- Technical (price-based stops).
- On-chain (token contract risk scoring).
- Tax analysis (buy/sell tax limits).
- Position size limits and dynamic position sizing according to liquidity.
- Automated stop-loss monitoring on hourly timeframes.
The strategy performs best in bull markets or sideways markets with active memecoin trading. Performance may be reduced in bear markets, though risk management features help protect capital.
## Backtesting
The backtesting was performed using historical memecoin data from Ethereum DEXes between 2023-2024.
- [See backtesting results](./backtest)
- [Read more about what is backtesting](https://tradingstrategy.ai/glossary/backtest).
- On-chain risk scoring was not included in backtesting results.
The backtesting period covered both bull and bear markets. The backtesting period saw one bull market memecoin rally from bear market lows that is unlikely to repeat in the same magnitude for the assets we trade. As such, past performance should not be used as a predictor of future returns. Backtest results have inherent limitations, and there will be variance in live performance.
## Profit
The backtested results indicate **684.39%** estimated yearly profit ([CAGR](https://tradingstrategy.ai/glossary/compound-annual-growth-rate-cagr)).
This return is based on the strategy's ability to capture upside from emerging memecoins while mitigating severe drawdowns compared to simple buy and hold.
## Benchmark
For the same backtesting period, here are some benchmark performance comparisons:
| | CAGR | Maximum drawdown | Sharpe |
| ---------------------------- | ------- | ---------------- | ------ |
| This strategy | 684.39% | -23.07% | 2.11 |
| SP500 (20 years) | 11% | -33% | 0.72 |
| Bitcoin (backtesting period) | 76% | -76% | 1.17 |
| Ether (backtesting period) | 29.61% | -45.27% | 0.95 |
Sources:
- [Our strategy](./backtest)
- [Buy and hold BTC](./backtest)
- [Buy and hold ETH](./backtest)
- [SP500 stock index](https://curvo.eu/backtest/en/portfolio/s-p-500--NoIgygZACgBArABgSANMUBJAokgQnXAWQCUEAOAdlQEYBdeoA?config=%7B%22periodStart%22%3A%222004-02%22%7D)
## Trading frequency
This is a relatively low-frequency strategy compared to other memecoin traders.
- Rebalances daily.
- Average duration of winning positions: **9 days 8 hours**.
- Average duration of losing positions: **1 day 22 hours**.
- Average positions per day: **0.33**.
## Portfolio management
- Holds a maximum of 7 concurrent positions.
- Allocates **14.2%** of the portfolio per position.
- Positions are sized based on available liquidity and overall market conditions.
- Maintains reserves in USDC as a safety net for opportunities or in case of risk events.
## Robustness
This strategy has not undergone extensive robustness testing across different market cycles, but it has been stress tested against recent data with significant market volatility, from the start of 2023.
## Updates
This strategy is one of the early social-data-driven memecoin strategies on Ethereum.
As the market evolves, it is expected that newer versions will be developed. Stay updated by [following us for updates](https://tradingstrategy.ai/community) to make sure you are trading with the latest and most optimized strategy.
## Further information
- Strategy uses **TrendMoon social data integration** for social trend detection.
- Join our [Discord community chat](https://discord.gg/cU66eMd8) for questions or more information.
- Check out our [Telegram channel](https://t.me/TrendMoon) for the latest discussions and insights.
- Visit our [website](https://trendmoon.ai) to learn more about TrendMoon.
- For all our resources, check our [link tree](https://t.co/7FJ4N8VNtB).
- Automated execution with no manual intervention required.
- Strict filters for excluding high-risk tokens.
"""