"""Vault data sideloading.
To repackage the vault bundle:
.. code-block:: shell
# Copy scanned vault bundles to Python package data
./scripts/repackage-vault-data.sh
"""
from pathlib import Path
from typing import Iterable
import pandas as pd
import zstandard
from tradingstrategy.chain import ChainId
from tradingstrategy.utils.flexible_pickle import flexible_load, filter_broken_enum_values
from tradingstrategy.exchange import Exchange
from tradingstrategy.types import NonChecksummedAddress
from tradingstrategy.utils.groupeduniverse import resample_candles_multiple_pairs
from tradingstrategy.vault import VaultUniverse, Vault, VaultMetadata, _derive_pair_id_from_address
#: Default URL for the vault metadata JSON blob
#:
#: Generated by eth_defi's ``calculate_lifetime_metrics()`` and uploaded periodically.
VAULT_JSON_BLOB_URL = "https://top-defi-vaults.tradingstrategy.ai/top_vaults_by_chain.json"
#: Path to the bundled vault database
#:
#: To regenerate the bundle: ``zstd -f -o tradingstrategy/data_bundles/vault-metadata-db.pickle.zstd ~/.tradingstrategy/vaults/vault-db.pickle``
DEFAULT_VAULT_BUNDLE = Path(__file__).parent / ".." / "data_bundles" / "vault-metadata-db.pickle.zstd"
#: Path to the example vault price data
DEFAULT_VAULT_PRICE_BUNDLE = Path(__file__).parent / ".." / "data_bundles" / "vault-prices.parquet"
#: Cache of loaded vault universes, keyed by bundle path
_cached_vault_universe: dict[Path, VaultUniverse] = {}
def load_vault_database(
path: Path | None = None,
filter_bad_entries: bool = True,
) -> VaultUniverse:
"""Load pickled vault metadata database generated with an offline script.
- For sideloading vault data
- Normalises vault data in a good documented format
- For the generation `see this tutorial <https://web3-ethereum-defi.readthedocs.io/tutorials/erc-4626-scan-prices.html>`__
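
    Example (a minimal sketch using the default bundled database):

    .. code-block:: python

        vault_universe = load_vault_database()

        # Narrow the universe down to a single vault on Base
        vault_universe = vault_universe.limit_to_single(
            ChainId.base,
            "0x45aa96f0b3188d47a1dafdbefce1db6b37f58216",
        )
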
:param path:
Path to the pickle file.
        If not given, use the default location.
Can be zstd compressed with .zstd suffix.
"""
from eth_defi.vault.vaultdb import VaultDatabase
if path is None:
path = DEFAULT_VAULT_BUNDLE
assert path.exists(), f"No vault file: {path}"
existing = _cached_vault_universe.get(path)
if existing is not None:
return existing
vault_db: VaultDatabase
if path.suffix == ".zstd":
with zstandard.open(path, "rb") as inp:
vault_db = flexible_load(inp)
else:
# Normal pickle
with path.open("rb") as f:
vault_db = flexible_load(f)
vaults = []
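    # Reference: each vault_db entry is a dict roughly of the following shape,
    # produced by the offline vault scanner (see the tutorial linked in the docstring)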
# data = {
# "Symbol": vault.symbol,
# "Name": vault.name,
# "Address": detection.address,
# "Denomination": vault.denomination_token.symbol if vault.denomination_token else None,
# "NAV": total_assets,
# "Protocol": get_vault_protocol_name(detection.features),
# "Mgmt fee": management_fee,
# "Perf fee": performance_fee,
# "Shares": total_supply,
# "First seen": detection.first_seen_at,
# "_detection_data": detection,
# "_denomination_token": denomination_token,
# "_share_token": vault.share_token.export() if vault.share_token else None,
# }
def _safe_get(d: dict, key: str, key2: str, default=None):
try:
return (d.get(key, {}) or {}).get(key2, default)
except Exception:
return default
for address, entry in vault_db.items():
try:
detection: "eth_defi.erc_4626.core.ERC4262VaultDetection" = entry["_detection_data"]
if filter_bad_entries:
if (not entry["Name"]) or (not entry["Denomination"]):
# Skip invalid entries as all other required data is missing
continue
if "unknown" in entry["Name"]:
# Skip nameless / broken entries
continue
if detection.chain < 0:
# Skip negative chain IDs (placeholder/sentinel values)
continue
try:
chain_id = ChainId(detection.chain)
except ValueError:
if filter_bad_entries:
# Skip vaults with unknown chain IDs
continue
raise
protocol_slug = entry["Protocol"].lower().replace(" ", "-")
vault = Vault(
chain_id=chain_id,
name=entry.get("Name") or "<unknown>",
token_symbol=entry["Symbol"],
vault_address=entry["Address"],
denomination_token_address=_safe_get(entry,"_denomination_token", "address"),
denomination_token_symbol=_safe_get(entry,"_denomination_token", "symbol"),
denomination_token_decimals=_safe_get(entry,"_denomination_token", "decimals"),
share_token_address=_safe_get(entry,"_share_token", "address") or entry["Address"],
share_token_symbol=_safe_get(entry,"_share_token", "symbol") or entry["Symbol"],
share_token_decimals=_safe_get(entry,"_share_token", "decimals") or 18,
protocol_name=entry["Protocol"],
protocol_slug=protocol_slug,
performance_fee=entry["Perf fee"],
management_fee=entry["Mgmt fee"],
deployed_at=detection.first_seen_at,
features=filter_broken_enum_values(detection.features),
denormalised_data_updated_at=detection.updated_at,
tvl=entry["NAV"],
issued_shares=entry["Shares"],
)
except Exception as e:
raise RuntimeError(f"Could not decode entry: {entry}") from e
vaults.append(vault)
vault_universe = VaultUniverse(vaults)
_cached_vault_universe[path] = vault_universe
return vault_universe
def convert_vaults_to_trading_pairs(
vaults: Iterable[Vault]
) -> tuple[list[Exchange], pd.DataFrame]:
"""Create a dataframe that contains vaults as trading pairs to be included alongside real trading pairs.
- Generates :py:class:`tradingstrategy.pair.PandasPairUniverse` compatible dataframe for all vaults
- Adds
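
    Example (a minimal sketch, converting every vault in the default bundle):

    .. code-block:: python

        vault_universe = load_vault_database()
        exchanges, pairs_df = convert_vaults_to_trading_pairs(vault_universe.export_all_vaults())
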
:return:
Exchange data, pair dataframe tuple
"""
exchanges = list(Exchange(**v.export_as_exchange()) for v in vaults)
rows = [v.export_as_trading_pair() for v in vaults]
pairs_df = pd.DataFrame(rows).astype(Vault.get_pandas_schema())
return exchanges, pairs_df
def load_single_vault(
chain_id: ChainId,
vault_address: str,
path=DEFAULT_VAULT_BUNDLE,
) -> tuple[list[Exchange], pd.DataFrame]:
"""Load a single bundled vault entry and return as pairs data.
Example:
.. code-block:: python
vault_exchanges, vault_pairs_df = load_single_vault(ChainId.base, "0x45aa96f0b3188d47a1dafdbefce1db6b37f58216")
exchange_universe.add(vault_exchanges)
pairs_df = pd.concat([pairs_df, vault_pairs_df])
"""
vault_universe = load_vault_database(path)
vault_universe = vault_universe.limit_to_single(chain_id, vault_address)
return convert_vaults_to_trading_pairs(vault_universe.export_all_vaults())
def load_multiple_vaults(
vaults: list[tuple[ChainId, NonChecksummedAddress]] | VaultUniverse,
path=DEFAULT_VAULT_BUNDLE,
check_all_vaults_found: bool = True,
) -> tuple[list[Exchange], pd.DataFrame]:
"""Load a single bundled vault entry and return as pairs data.
Example:
.. code-block:: python
vault_exchanges, vault_pairs_df = load_multiple_vaults([ChainId.base, "0x45aa96f0b3188d47a1dafdbefce1db6b37f58216"])
exchange_universe.add(vault_exchanges)
pairs_df = pd.concat([pairs_df, vault_pairs_df])
"""
if isinstance(vaults, VaultUniverse):
vault_universe = vaults
else:
vault_universe = load_vault_database(path)
vault_universe = vault_universe.limit_to_vaults(vaults, check_all_vaults_found=check_all_vaults_found)
return convert_vaults_to_trading_pairs(vault_universe.export_all_vaults())
def create_vault_universe(
vaults: list[tuple[ChainId, NonChecksummedAddress]],
path=DEFAULT_VAULT_BUNDLE,
) -> VaultUniverse:
"""Load a single bundled vault entry and return as pairs data.
Example:
.. code-block:: python
vault_exchanges, vault_pairs_df = load_multiple_vaults([ChainId.base, "0x45aa96f0b3188d47a1dafdbefce1db6b37f58216"])
exchange_universe.add(vault_exchanges)
pairs_df = pd.concat([pairs_df, vault_pairs_df])
"""
    vault_universe = load_vault_database(path)
    vault_universe = vault_universe.limit_to_vaults(vaults)
    return vault_universe
def load_vault_price_data(
pairs_df: pd.DataFrame,
prices_path: Path=DEFAULT_VAULT_PRICE_BUNDLE,
) -> pd.DataFrame:
"""Sideload price data for vaults.
Uses Arrow-native filtering before pandas conversion to avoid
converting the entire Parquet file to pandas when only a small
subset of vaults is needed.
Schema sample:
    .. code-block:: text
schema = pa.schema([
("chain", pa.uint32()),
("address", pa.string()), # Lowercase
("block_number", pa.uint32()),
("timestamp", pa.timestamp("ms")), # s accuracy does not seem to work on rewrite
("share_price", pa.float64()),
("total_assets", pa.float64()),
("total_supply", pa.float64()),
("performance_fee", pa.float32()),
("management_fee", pa.float32()),
("errors", pa.string()),
])
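
    Example (a sketch, loading prices for one bundled vault; see also :py:func:`convert_vault_prices_to_candles`):

    .. code-block:: python

        exchanges, pairs_df = load_multiple_vaults([(ChainId.base, "0x45aa96f0b3188d47a1dafdbefce1db6b37f58216")])
        vault_prices_df = load_vault_price_data(pairs_df)
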
    :param pairs_df:
        Vaults in DataFrame format, as exported by the functions in this module.

    :param prices_path:
        Path to the vault prices Parquet file.

        If not given, use the default bundled sample data.
:return:
DataFrame with the columns as defined in the schema above.
"""
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
assert isinstance(pairs_df, pd.DataFrame)
assert prices_path.exists(), f"Vault price file does not exist: {prices_path}"
vaults_to_match = {(row.chain_id, row.address) for idx, row in pairs_df.iterrows()}
    assert len(vaults_to_match) < 3000, f"The number of vaults to load looks too high: {len(vaults_to_match)}"
# Read as Arrow table without pandas conversion
table = pq.read_table(str(prices_path))
# Filter in Arrow by unique chains and addresses (much faster than
# converting everything to pandas first)
unique_chains = pa.array([int(c) for c, _ in vaults_to_match], type=pa.uint32())
unique_addresses = pa.array(list({a for _, a in vaults_to_match}))
table = table.filter(pc.is_in(table.column("chain"), unique_chains))
table = table.filter(pc.is_in(table.column("address"), unique_addresses))
# Convert only the filtered rows to pandas
df = table.to_pandas()
# The Arrow filters are an over-approximation (chain IN set AND address IN set),
# so apply exact (chain, address) tuple matching on the small result
mask = pd.MultiIndex.from_arrays([df["chain"], df["address"]]).isin(vaults_to_match)
df = df[mask]
return df
def convert_vault_prices_to_candles(
raw_prices_df: pd.DataFrame,
frequency: str = "1d",
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Convert vault price data to candle format.
- Partial support for price candle format to be used in backtesting
- For the format see :py:func:`load_vault_price_data`
- Only USD stablecoin denominated vaults supported for now
    Example:

    .. code-block:: python

# Load data only for IPOR USDC vault on Base
exchanges, pairs_df = load_multiple_vaults([(ChainId.base, "0x45aa96f0b3188d47a1dafdbefce1db6b37f58216")])
vault_prices_df = load_vault_price_data(pairs_df)
assert len(vault_prices_df) == 176 # IPOR has 176 days worth of data
# Create pair universe based on the vault data
exchange_universe = ExchangeUniverse({e.exchange_id: e for e in exchanges})
pair_universe = PandasPairUniverse(pairs_df, exchange_universe=exchange_universe)
# Create price candles from vault share price scrape
candle_df, liquidity_df = convert_vault_prices_to_candles(vault_prices_df, "1h")
candle_universe = GroupedCandleUniverse(candle_df, time_bucket=TimeBucket.h1)
assert candle_universe.get_candle_count() == 4201
assert candle_universe.get_pair_count() == 1
liquidity_universe = GroupedLiquidityUniverse(liquidity_df, time_bucket=TimeBucket.h1)
assert liquidity_universe.get_sample_count() == 4201
assert liquidity_universe.get_pair_count() == 1
# Get share price as candles for a single vault
ipor_usdc = pair_universe.get_pair_by_smart_contract("0x45aa96f0b3188d47a1dafdbefce1db6b37f58216")
prices = candle_universe.get_candles_by_pair(ipor_usdc)
assert len(prices) == 4201
# Query single price sample
timestamp = pd.Timestamp("2025-04-01 04:00")
price, when = candle_universe.get_price_with_tolerance(
pair=ipor_usdc,
when=timestamp,
tolerance=pd.Timedelta("2h"),
)
assert price == pytest.approx(1.0348826417292332)
# Query TVL
liquidity, when = liquidity_universe.get_liquidity_with_tolerance(
pair_id=ipor_usdc.pair_id,
when=timestamp,
tolerance=pd.Timedelta("2h"),
)
assert liquidity == pytest.approx(1429198.98104)
:return:
Prices dataframe, TVL dataframe
"""
assert "chain" in raw_prices_df.columns, f"Got {raw_prices_df.columns}"
assert "address" in raw_prices_df.columns, f"Got {raw_prices_df.columns}"
assert frequency in ["1d", "1h"], f"Got {frequency}"
#
# Price candles
#
df = raw_prices_df
df["open"] = df["share_price"]
df["low"] = df["share_price"]
df["high"] = df["share_price"]
df["close"] = df["share_price"]
df["volume"] = 0
df["buy_volume"] = 0
df["sell_volume"] = 0
df["pair_id"] = df["address"].apply(_derive_pair_id_from_address)
# Even for daily data, we need to resample, because built-in vault price example
# data is not midnight aligned
df = _resample(df, frequency)
prices_df = df
#
# Liquidity candles
#
df = raw_prices_df
df["open"] = df["total_assets"]
df["low"] = df["total_assets"]
df["high"] = df["total_assets"]
df["close"] = df["total_assets"]
df["pair_id"] = df["address"].apply(_derive_pair_id_from_address)
# Even for daily data, we need to resample, because built-in vault price example
# data is not midnight aligned
tvl_df = _resample(df, frequency)
return prices_df, tvl_df
def _resample(df: pd.DataFrame, frequency: str) -> pd.DataFrame:
"""Multipair resample helper."""
df = resample_candles_multiple_pairs(df, frequency)
return df
def _parse_period_metrics(pm_dict: dict):
"""Parse a PeriodMetrics object from JSON dict.
:param pm_dict:
Dictionary from JSON with period metrics fields.
:return:
PeriodMetrics instance.
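
    Example (a minimal sketch with a hand-constructed dict; field values are illustrative only):

    .. code-block:: python

        pm = _parse_period_metrics({
            "period_start_at": "2025-01-01T00:00:00Z",
            "period_end_at": "2025-04-01T00:00:00Z",
            "returns_net": 0.031,
            "sharpe": 1.2,
            "max_drawdown": -0.02,
        })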
"""
from eth_defi.research.vault_metrics import PeriodMetrics
def _parse_timestamp(val):
if val is None:
return None
if isinstance(val, str):
# ISO format timestamp
try:
return pd.Timestamp(val)
except Exception:
return None
return val
return PeriodMetrics(
period=pm_dict.get("period"),
error_reason=pm_dict.get("error_reason"),
period_start_at=_parse_timestamp(pm_dict.get("period_start_at")),
period_end_at=_parse_timestamp(pm_dict.get("period_end_at")),
share_price_start=pm_dict.get("share_price_start"),
share_price_end=pm_dict.get("share_price_end"),
raw_samples=pm_dict.get("raw_samples", 0),
samples_start_at=_parse_timestamp(pm_dict.get("samples_start_at")),
samples_end_at=_parse_timestamp(pm_dict.get("samples_end_at")),
daily_samples=pm_dict.get("daily_samples", 0),
returns_gross=pm_dict.get("returns_gross"),
returns_net=pm_dict.get("returns_net"),
cagr_gross=pm_dict.get("cagr_gross"),
cagr_net=pm_dict.get("cagr_net"),
volatility=pm_dict.get("volatility"),
sharpe=pm_dict.get("sharpe"),
max_drawdown=pm_dict.get("max_drawdown"),
tvl_start=pm_dict.get("tvl_start"),
tvl_end=pm_dict.get("tvl_end"),
tvl_low=pm_dict.get("tvl_low"),
tvl_high=pm_dict.get("tvl_high"),
ranking_overall=pm_dict.get("ranking_overall"),
ranking_chain=pm_dict.get("ranking_chain"),
ranking_protocol=pm_dict.get("ranking_protocol"),
)
def _parse_vault_metadata(entry: dict) -> VaultMetadata:
"""Parse VaultMetadata from JSON entry.
:param entry:
JSON dict from the vault universe JSON blob.
:return:
VaultMetadata instance with all available fields populated.
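
    Example (a minimal sketch with a hand-constructed entry; values are illustrative only):

    .. code-block:: python

        metadata = _parse_vault_metadata({
            "name": "Example USDC vault",
            "protocol": "Example protocol",
            "chain_id": 8453,
            "address": "0x45aa96f0b3188d47a1dafdbefce1db6b37f58216",
            "performance_fee": 0.10,
            "management_fee": 0.0,
            "lifetime_start": "2024-06-01T00:00:00Z",
            "lifetime_end": "2025-06-01T00:00:00Z",
        })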
"""
import datetime
def _parse_datetime(val):
if val is None:
return None
if isinstance(val, str):
try:
return datetime.datetime.fromisoformat(val.replace("Z", "+00:00"))
except Exception:
return None
return val
# Parse period_results if present
period_results = None
if entry.get("period_results"):
period_results = [_parse_period_metrics(pm) for pm in entry["period_results"]]
# Parse features - may be list of strings or list of enums
features = entry.get("features", [])
return VaultMetadata(
vault_name=entry.get("name"),
protocol_name=entry.get("protocol"),
protocol_slug=entry.get("protocol_slug"),
features=features,
performance_fee=entry.get("performance_fee"),
management_fee=entry.get("management_fee"),
lifetime_return=entry.get("lifetime_return"),
lifetime_return_net=entry.get("lifetime_return_net"),
cagr=entry.get("cagr"),
cagr_net=entry.get("cagr_net"),
three_months_return=entry.get("three_months_return"),
three_months_return_net=entry.get("three_months_return_net"),
three_months_cagr=entry.get("three_months_cagr"),
three_months_cagr_net=entry.get("three_months_cagr_net"),
volatility=entry.get("volatility") or entry.get("three_months_volatility"),
sharpe=entry.get("sharpe") or entry.get("three_months_sharpe"),
max_drawdown=entry.get("max_drawdown"),
tvl=entry.get("current_nav"),
tvl_peak=entry.get("peak_nav"),
age_years=entry.get("age"),
first_updated_at=_parse_datetime(entry.get("lifetime_start")),
last_updated_at=_parse_datetime(entry.get("lifetime_end")),
deposit_fee=entry.get("deposit_fee"),
withdrawal_fee=entry.get("withdraw_fee"),
lockup_days=entry.get("lockup"),
risk_level=entry.get("risk"),
notes=entry.get("notes"),
deposit_closed_reason=entry.get("deposit_closed_reason"),
redemption_closed_reason=entry.get("redemption_closed_reason"),
deposit_next_open=_parse_datetime(entry.get("deposit_next_open")),
redemption_next_open=_parse_datetime(entry.get("redemption_next_open")),
one_month_return=entry.get("one_month_return"),
one_month_return_net=entry.get("one_month_return_net"),
one_month_cagr=entry.get("one_month_cagr"),
one_month_cagr_net=entry.get("one_month_cagr_net"),
vault_slug=entry.get("vault_slug"),
address=entry.get("address"),
chain=entry.get("chain"),
chain_id=entry.get("chain_id"),
share_token_address=entry.get("share_token_address"),
denomination_token_address=entry.get("denomination_token_address"),
denomination=entry.get("denomination"),
share_token=entry.get("share_token"),
event_count=entry.get("event_count"),
last_share_price=entry.get("last_share_price"),
stablecoinish=entry.get("stablecoinish"),
fee_mode=entry.get("fee_mode"),
fee_internalised=entry.get("fee_internalised"),
fee_label=entry.get("fee_label"),
link=entry.get("link"),
trading_strategy_link=entry.get("trading_strategy_link"),
flags=entry.get("flags"),
period_results=period_results,
)