# Source code for tradingstrategy.alternative_data.vault

"""Vault data sideloading.

To repackage the vault bundle:

.. code-block:: shell

    # Copy scanned vault bundles to Python package data
    ./scripts/repackage-vault-data.sh


"""

from pathlib import Path
from typing import Iterable

import pandas as pd
import zstandard

from tradingstrategy.chain import ChainId
from tradingstrategy.utils.flexible_pickle import flexible_load, filter_broken_enum_values
from tradingstrategy.exchange import Exchange
from tradingstrategy.types import NonChecksummedAddress
from tradingstrategy.utils.groupeduniverse import resample_candles_multiple_pairs
from tradingstrategy.vault import VaultUniverse, Vault, VaultMetadata, _derive_pair_id_from_address

#: Default URL for the vault metadata JSON blob
#:
#: Generated by eth_defi's ``calculate_lifetime_metrics()`` and uploaded periodically.
VAULT_JSON_BLOB_URL = "https://top-defi-vaults.tradingstrategy.ai/top_vaults_by_chain.json"

#: Path to the bundled vault database
#:
#: To regenerate the bundle: `zstd -f -o tradingstrategy/alternative_data/vault-db.pickle.zstd ~/.tradingstrategy/vaults/vault-db.pickle`
DEFAULT_VAULT_BUNDLE = Path(__file__).parent / ".." / "data_bundles" / "vault-metadata-db.pickle.zstd"

#: Path to the example vault price data
DEFAULT_VAULT_PRICE_BUNDLE = Path(__file__).parent / ".." / "data_bundles" / "vault-prices.parquet"


#: Cached loaded vault universe from our default bundle
#:
#: Keyed by bundle path so multiple different bundles can be cached side by side.
#: Populated and read by :py:func:`load_vault_database`.
_cached_vault_universe: dict[Path, VaultUniverse] = {}


def load_vault_database(
    path: Path | None = None,
    filter_bad_entries: bool = True,
) -> VaultUniverse:
    """Load pickled vault metadata database generated with an offline script.

    - For sideloading vault data

    - Normalises vault data in a good documented format

    - For the generation `see this tutorial <https://web3-ethereum-defi.readthedocs.io/tutorials/erc-4626-scan-prices.html>`__

    :param path:
        Path to the pickle file. If not given use the default location.

        Can be zstd compressed with .zstd suffix.

    :param filter_bad_entries:
        Skip entries with missing names/denominations, "unknown" names,
        negative chain IDs or chain IDs not known to :py:class:`ChainId`.

    :return:
        Vault universe built from the decoded entries.
        The result is cached per path in :py:data:`_cached_vault_universe`.
    """
    # Imported lazily so the package works without eth_defi installed
    # unless this function is actually used
    from eth_defi.vault.vaultdb import VaultDatabase

    if path is None:
        path = DEFAULT_VAULT_BUNDLE

    assert path.exists(), f"No vault file: {path}"

    # Return the memoised universe if this bundle was already loaded
    existing = _cached_vault_universe.get(path)
    if existing is not None:
        return existing

    vault_db: VaultDatabase
    if path.suffix == ".zstd":
        with zstandard.open(path, "rb") as inp:
            vault_db = flexible_load(inp)
    else:
        # Normal pickle
        with path.open("rb") as f:
            vault_db = flexible_load(f)

    vaults = []

    # Schema of each entry dict in the vault database, for reference:
    # data = {
    #     "Symbol": vault.symbol,
    #     "Name": vault.name,
    #     "Address": detection.address,
    #     "Denomination": vault.denomination_token.symbol if vault.denomination_token else None,
    #     "NAV": total_assets,
    #     "Protocol": get_vault_protocol_name(detection.features),
    #     "Mgmt fee": management_fee,
    #     "Perf fee": performance_fee,
    #     "Shares": total_supply,
    #     "First seen": detection.first_seen_at,
    #     "_detection_data": detection,
    #     "_denomination_token": denomination_token,
    #     "_share_token": vault.share_token.export() if vault.share_token else None,
    # }

    def _safe_get(d: dict, key: str, key2: str, default=None):
        # Two-level dict lookup tolerating missing keys and None values
        try:
            return (d.get(key, {}) or {}).get(key2, default)
        except Exception:
            return default

    for address, entry in vault_db.items():
        try:
            detection: "eth_defi.erc_4626.core.ERC4262VaultDetection" = entry["_detection_data"]

            if filter_bad_entries:
                if (not entry["Name"]) or (not entry["Denomination"]):
                    # Skip invalid entries as all other required data is missing
                    continue
                if "unknown" in entry["Name"]:
                    # Skip nameless / broken entries
                    continue
                if detection.chain < 0:
                    # Skip negative chain IDs (placeholder/sentinel values)
                    continue

            try:
                chain_id = ChainId(detection.chain)
            except ValueError:
                if filter_bad_entries:
                    # Skip vaults with unknown chain IDs
                    continue
                raise

            protocol_slug = entry["Protocol"].lower().replace(" ", "-")

            vault = Vault(
                chain_id=chain_id,
                name=entry.get("Name") or "<unknown>",
                token_symbol=entry["Symbol"],
                vault_address=entry["Address"],
                denomination_token_address=_safe_get(entry, "_denomination_token", "address"),
                denomination_token_symbol=_safe_get(entry, "_denomination_token", "symbol"),
                denomination_token_decimals=_safe_get(entry, "_denomination_token", "decimals"),
                # Fall back to the vault's own address/symbol when share token data is missing
                share_token_address=_safe_get(entry, "_share_token", "address") or entry["Address"],
                share_token_symbol=_safe_get(entry, "_share_token", "symbol") or entry["Symbol"],
                share_token_decimals=_safe_get(entry, "_share_token", "decimals") or 18,
                protocol_name=entry["Protocol"],
                protocol_slug=protocol_slug,
                performance_fee=entry["Perf fee"],
                management_fee=entry["Mgmt fee"],
                deployed_at=detection.first_seen_at,
                features=filter_broken_enum_values(detection.features),
                denormalised_data_updated_at=detection.updated_at,
                tvl=entry["NAV"],
                issued_shares=entry["Shares"],
            )
        except Exception as e:
            raise RuntimeError(f"Could not decode entry: {entry}") from e

        vaults.append(vault)

    vault_universe = VaultUniverse(vaults)
    # Memoise for subsequent calls with the same path
    _cached_vault_universe[path] = vault_universe
    return vault_universe
def convert_vaults_to_trading_pairs(
    vaults: Iterable[Vault]
) -> tuple[list[Exchange], pd.DataFrame]:
    """Create a dataframe that contains vaults as trading pairs to be included alongside real trading pairs.

    - Generates :py:class:`tradingstrategy.pair.PandasPairUniverse` compatible dataframe for all vaults

    - Adds one synthetic exchange entry per vault via ``Vault.export_as_exchange()``

    :param vaults:
        Vaults to convert. May be any iterable, including a one-shot generator.

    :return:
        Exchange data, pair dataframe tuple
    """
    # Materialise first: the input is iterated twice below, and a generator
    # would be exhausted after the first pass, silently producing an empty
    # pairs dataframe.
    vaults = list(vaults)
    exchanges = [Exchange(**v.export_as_exchange()) for v in vaults]
    rows = [v.export_as_trading_pair() for v in vaults]
    pairs_df = pd.DataFrame(rows).astype(Vault.get_pandas_schema())
    return exchanges, pairs_df
def load_single_vault(
    chain_id: ChainId,
    vault_address: str,
    path=DEFAULT_VAULT_BUNDLE,
) -> tuple[list[Exchange], pd.DataFrame]:
    """Load one bundled vault entry and expose it as pairs data.

    Example:

    .. code-block:: python

        vault_exchanges, vault_pairs_df = load_single_vault(ChainId.base, "0x45aa96f0b3188d47a1dafdbefce1db6b37f58216")
        exchange_universe.add(vault_exchanges)
        pairs_df = pd.concat([pairs_df, vault_pairs_df])

    :param chain_id:
        Chain on which the vault is deployed.

    :param vault_address:
        Vault smart contract address.

    :param path:
        Bundle file to read; defaults to the packaged sample bundle.

    :return:
        Exchange data, pair dataframe tuple for the single vault.
    """
    universe = load_vault_database(path).limit_to_single(chain_id, vault_address)
    return convert_vaults_to_trading_pairs(universe.export_all_vaults())
def load_multiple_vaults(
    vaults: list[tuple[ChainId, NonChecksummedAddress]] | VaultUniverse,
    path=DEFAULT_VAULT_BUNDLE,
    check_all_vaults_found: bool = True,
) -> tuple[list[Exchange], pd.DataFrame]:
    """Load multiple bundled vault entries and return them as pairs data.

    Example:

    .. code-block:: python

        vault_exchanges, vault_pairs_df = load_multiple_vaults([(ChainId.base, "0x45aa96f0b3188d47a1dafdbefce1db6b37f58216")])
        exchange_universe.add(vault_exchanges)
        pairs_df = pd.concat([pairs_df, vault_pairs_df])

    :param vaults:
        Either a list of (chain, address) tuples to pick from the bundled
        database, or an already-constructed :py:class:`VaultUniverse`
        which is used as-is.

    :param path:
        Bundle file to read when ``vaults`` is a tuple list.

    :param check_all_vaults_found:
        Passed to ``VaultUniverse.limit_to_vaults()``.

    :return:
        Exchange data, pair dataframe tuple.
    """
    if isinstance(vaults, VaultUniverse):
        # Caller supplied a ready universe - skip the bundle load
        vault_universe = vaults
    else:
        vault_universe = load_vault_database(path)
        vault_universe = vault_universe.limit_to_vaults(vaults, check_all_vaults_found=check_all_vaults_found)
    return convert_vaults_to_trading_pairs(vault_universe.export_all_vaults())
def create_vault_universe(
    vaults: list[tuple[ChainId, NonChecksummedAddress]],
    path=DEFAULT_VAULT_BUNDLE,
) -> tuple[list[Exchange], pd.DataFrame]:
    """Load selected bundled vault entries and return them as pairs data.

    Example:

    .. code-block:: python

        vault_exchanges, vault_pairs_df = create_vault_universe([(ChainId.base, "0x45aa96f0b3188d47a1dafdbefce1db6b37f58216")])
        exchange_universe.add(vault_exchanges)
        pairs_df = pd.concat([pairs_df, vault_pairs_df])

    :param vaults:
        (chain, address) tuples to pick from the bundled database.

    :param path:
        Bundle file to read; defaults to the packaged sample bundle.

    :return:
        Exchange data, pair dataframe tuple.
    """
    vault_universe = load_vault_database(path)
    # limit_to_vaults() returns the narrowed universe (see load_multiple_vaults);
    # the original discarded the return value, so filtering never took effect.
    vault_universe = vault_universe.limit_to_vaults(vaults)
    return convert_vaults_to_trading_pairs(vault_universe.export_all_vaults())
def load_vault_price_data(
    pairs_df: pd.DataFrame,
    prices_path: Path = DEFAULT_VAULT_PRICE_BUNDLE,
) -> pd.DataFrame:
    """Sideload price data for vaults.

    Uses Arrow-native filtering before pandas conversion to avoid converting
    the entire Parquet file to pandas when only a small subset of vaults is needed.

    Schema sample:

    .. code-block:: plain

        schema = pa.schema([
            ("chain", pa.uint32()),
            ("address", pa.string()),  # Lowercase
            ("block_number", pa.uint32()),
            ("timestamp", pa.timestamp("ms")),  # s accuracy does not seem to work on rewrite
            ("share_price", pa.float64()),
            ("total_assets", pa.float64()),
            ("total_supply", pa.float64()),
            ("performance_fee", pa.float32()),
            ("management_fee", pa.float32()),
            ("errors", pa.string()),
        ])

    :param pairs_df:
        Vaults in DataFrame format as exported functions in this module.
        Must contain ``chain_id`` and ``address`` columns.

    :param prices_path:
        Load vault prices file.
        If not given use the default hardcoded sample bundle.

    :return:
        DataFrame with the columns as defined in the schema above,
        containing only rows for the requested (chain, address) pairs.
    """
    import pyarrow as pa
    import pyarrow.compute as pc
    import pyarrow.parquet as pq

    assert isinstance(pairs_df, pd.DataFrame)
    assert prices_path.exists(), f"Vault price file does not exist: {prices_path}"

    vaults_to_match = {(row.chain_id, row.address) for idx, row in pairs_df.iterrows()}
    # Sanity check against accidentally loading the whole universe
    assert len(vaults_to_match) < 3000, f"The vaults to load number looks too high: {len(vaults_to_match)}"

    # Read as Arrow table without pandas conversion
    table = pq.read_table(str(prices_path))

    # Filter in Arrow by unique chains and addresses (much faster than
    # converting everything to pandas first)
    unique_chains = pa.array([int(c) for c, _ in vaults_to_match], type=pa.uint32())
    unique_addresses = pa.array(list({a for _, a in vaults_to_match}))
    table = table.filter(pc.is_in(table.column("chain"), unique_chains))
    table = table.filter(pc.is_in(table.column("address"), unique_addresses))

    # Convert only the filtered rows to pandas
    df = table.to_pandas()

    # The Arrow filters are an over-approximation (chain IN set AND address IN set),
    # so apply exact (chain, address) tuple matching on the small result
    mask = pd.MultiIndex.from_arrays([df["chain"], df["address"]]).isin(vaults_to_match)
    df = df[mask]
    return df
def convert_vault_prices_to_candles(
    raw_prices_df: pd.DataFrame,
    frequency: str = "1d",
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Convert vault price data to candle format.

    - Partial support for price candle format to be used in backtesting

    - For the format see :py:func:`load_vault_price_data`

    - Only USD stablecoin denominated vaults supported for now

    Example:

    .. code-block: python

        # Load data only for IPOR USDC vault on Base
        exchanges, pairs_df = load_multiple_vaults([(ChainId.base, "0x45aa96f0b3188d47a1dafdbefce1db6b37f58216")])
        vault_prices_df = load_vault_price_data(pairs_df)
        assert len(vault_prices_df) == 176  # IPOR has 176 days worth of data

        # Create pair universe based on the vault data
        exchange_universe = ExchangeUniverse({e.exchange_id: e for e in exchanges})
        pair_universe = PandasPairUniverse(pairs_df, exchange_universe=exchange_universe)

        # Create price candles from vault share price scrape
        candle_df, liquidity_df = convert_vault_prices_to_candles(vault_prices_df, "1h")
        candle_universe = GroupedCandleUniverse(candle_df, time_bucket=TimeBucket.h1)
        assert candle_universe.get_candle_count() == 4201
        assert candle_universe.get_pair_count() == 1

        liquidity_universe = GroupedLiquidityUniverse(liquidity_df, time_bucket=TimeBucket.h1)
        assert liquidity_universe.get_sample_count() == 4201
        assert liquidity_universe.get_pair_count() == 1

        # Get share price as candles for a single vault
        ipor_usdc = pair_universe.get_pair_by_smart_contract("0x45aa96f0b3188d47a1dafdbefce1db6b37f58216")
        prices = candle_universe.get_candles_by_pair(ipor_usdc)
        assert len(prices) == 4201

        # Query single price sample
        timestamp = pd.Timestamp("2025-04-01 04:00")
        price, when = candle_universe.get_price_with_tolerance(
            pair=ipor_usdc,
            when=timestamp,
            tolerance=pd.Timedelta("2h"),
        )
        assert price == pytest.approx(1.0348826417292332)

        # Query TVL
        liquidity, when = liquidity_universe.get_liquidity_with_tolerance(
            pair_id=ipor_usdc.pair_id,
            when=timestamp,
            tolerance=pd.Timedelta("2h"),
        )
        assert liquidity == pytest.approx(1429198.98104)

    :param raw_prices_df:
        Raw share price / TVL scrape as produced by :py:func:`load_vault_price_data`.
        Must contain ``chain``, ``address``, ``share_price`` and ``total_assets`` columns.
        Not modified by this function.

    :param frequency:
        Resampling frequency, either ``"1d"`` or ``"1h"``.

    :return:
        Prices dataframe, TVL dataframe
    """
    assert "chain" in raw_prices_df.columns, f"Got {raw_prices_df.columns}"
    assert "address" in raw_prices_df.columns, f"Got {raw_prices_df.columns}"
    assert frequency in ["1d", "1h"], f"Got {frequency}"

    #
    # Price candles
    #

    # Work on a copy: the original implementation added OHLC/volume columns
    # directly onto the caller's dataframe, mutating the input in place.
    df = raw_prices_df.copy()
    df["open"] = df["share_price"]
    df["low"] = df["share_price"]
    df["high"] = df["share_price"]
    df["close"] = df["share_price"]
    df["volume"] = 0
    df["buy_volume"] = 0
    df["sell_volume"] = 0
    df["pair_id"] = df["address"].apply(_derive_pair_id_from_address)

    # Even for daily data, we need to resample, because built-in vault price example
    # data is not midnight aligned
    prices_df = _resample(df, frequency)

    #
    # Liquidity candles
    #

    # Reuse the same working frame (as the original single-frame flow did):
    # OHLC is overwritten with TVL, the zero volume columns and pair_id
    # computed above carry over unchanged.
    df["open"] = df["total_assets"]
    df["low"] = df["total_assets"]
    df["high"] = df["total_assets"]
    df["close"] = df["total_assets"]

    # Even for daily data, we need to resample, because built-in vault price example
    # data is not midnight aligned
    tvl_df = _resample(df, frequency)

    return prices_df, tvl_df
def _resample(df: pd.DataFrame, frequency: str) -> pd.DataFrame:
    """Multipair resample helper."""
    df = resample_candles_multiple_pairs(df, frequency)
    return df


def _parse_period_metrics(pm_dict: dict):
    """Parse a PeriodMetrics object from JSON dict.

    :param pm_dict:
        Dictionary from JSON with period metrics fields.

    :return:
        PeriodMetrics instance.
    """
    from eth_defi.research.vault_metrics import PeriodMetrics

    def _parse_timestamp(val):
        # Tolerant timestamp parsing: None and unparseable strings become None,
        # non-string values are passed through untouched
        if val is None:
            return None
        if isinstance(val, str):
            # ISO format timestamp
            try:
                return pd.Timestamp(val)
            except Exception:
                return None
        return val

    return PeriodMetrics(
        period=pm_dict.get("period"),
        error_reason=pm_dict.get("error_reason"),
        period_start_at=_parse_timestamp(pm_dict.get("period_start_at")),
        period_end_at=_parse_timestamp(pm_dict.get("period_end_at")),
        share_price_start=pm_dict.get("share_price_start"),
        share_price_end=pm_dict.get("share_price_end"),
        raw_samples=pm_dict.get("raw_samples", 0),
        samples_start_at=_parse_timestamp(pm_dict.get("samples_start_at")),
        samples_end_at=_parse_timestamp(pm_dict.get("samples_end_at")),
        daily_samples=pm_dict.get("daily_samples", 0),
        returns_gross=pm_dict.get("returns_gross"),
        returns_net=pm_dict.get("returns_net"),
        cagr_gross=pm_dict.get("cagr_gross"),
        cagr_net=pm_dict.get("cagr_net"),
        volatility=pm_dict.get("volatility"),
        sharpe=pm_dict.get("sharpe"),
        max_drawdown=pm_dict.get("max_drawdown"),
        tvl_start=pm_dict.get("tvl_start"),
        tvl_end=pm_dict.get("tvl_end"),
        tvl_low=pm_dict.get("tvl_low"),
        tvl_high=pm_dict.get("tvl_high"),
        ranking_overall=pm_dict.get("ranking_overall"),
        ranking_chain=pm_dict.get("ranking_chain"),
        ranking_protocol=pm_dict.get("ranking_protocol"),
    )


def _parse_vault_metadata(entry: dict) -> VaultMetadata:
    """Parse VaultMetadata from JSON entry.

    :param entry:
        JSON dict from the vault universe JSON blob.

    :return:
        VaultMetadata instance with all available fields populated.
    """
    import datetime

    def _parse_datetime(val):
        # Tolerant datetime parsing: "Z" suffix normalised to +00:00 offset,
        # unparseable strings become None, non-strings pass through
        if val is None:
            return None
        if isinstance(val, str):
            try:
                return datetime.datetime.fromisoformat(val.replace("Z", "+00:00"))
            except Exception:
                return None
        return val

    # Parse period_results if present
    period_results = None
    if entry.get("period_results"):
        period_results = [_parse_period_metrics(pm) for pm in entry["period_results"]]

    # Parse features - may be list of strings or list of enums
    features = entry.get("features", [])

    return VaultMetadata(
        vault_name=entry.get("name"),
        protocol_name=entry.get("protocol"),
        protocol_slug=entry.get("protocol_slug"),
        features=features,
        performance_fee=entry.get("performance_fee"),
        management_fee=entry.get("management_fee"),
        lifetime_return=entry.get("lifetime_return"),
        lifetime_return_net=entry.get("lifetime_return_net"),
        cagr=entry.get("cagr"),
        cagr_net=entry.get("cagr_net"),
        three_months_return=entry.get("three_months_return"),
        three_months_return_net=entry.get("three_months_return_net"),
        three_months_cagr=entry.get("three_months_cagr"),
        three_months_cagr_net=entry.get("three_months_cagr_net"),
        # Lifetime figure preferred, three-month figure used as fallback
        volatility=entry.get("volatility") or entry.get("three_months_volatility"),
        sharpe=entry.get("sharpe") or entry.get("three_months_sharpe"),
        max_drawdown=entry.get("max_drawdown"),
        tvl=entry.get("current_nav"),
        tvl_peak=entry.get("peak_nav"),
        age_years=entry.get("age"),
        first_updated_at=_parse_datetime(entry.get("lifetime_start")),
        last_updated_at=_parse_datetime(entry.get("lifetime_end")),
        deposit_fee=entry.get("deposit_fee"),
        withdrawal_fee=entry.get("withdraw_fee"),
        lockup_days=entry.get("lockup"),
        risk_level=entry.get("risk"),
        notes=entry.get("notes"),
        deposit_closed_reason=entry.get("deposit_closed_reason"),
        redemption_closed_reason=entry.get("redemption_closed_reason"),
        deposit_next_open=_parse_datetime(entry.get("deposit_next_open")),
        redemption_next_open=_parse_datetime(entry.get("redemption_next_open")),
        one_month_return=entry.get("one_month_return"),
        one_month_return_net=entry.get("one_month_return_net"),
        one_month_cagr=entry.get("one_month_cagr"),
        one_month_cagr_net=entry.get("one_month_cagr_net"),
        vault_slug=entry.get("vault_slug"),
        address=entry.get("address"),
        chain=entry.get("chain"),
        chain_id=entry.get("chain_id"),
        share_token_address=entry.get("share_token_address"),
        denomination_token_address=entry.get("denomination_token_address"),
        denomination=entry.get("denomination"),
        share_token=entry.get("share_token"),
        event_count=entry.get("event_count"),
        last_share_price=entry.get("last_share_price"),
        stablecoinish=entry.get("stablecoinish"),
        fee_mode=entry.get("fee_mode"),
        fee_internalised=entry.get("fee_internalised"),
        fee_label=entry.get("fee_label"),
        link=entry.get("link"),
        trading_strategy_link=entry.get("trading_strategy_link"),
        flags=entry.get("flags"),
        period_results=period_results,
    )
def load_vault_database_with_metadata(
    json_data: dict,
) -> VaultUniverse:
    """Load vault universe with rich metadata from JSON blob.

    Creates Vault instances with embedded VaultMetadata populated from
    the pre-computed JSON data generated by eth_defi's `calculate_lifetime_metrics()`.

    Example:

    .. code-block:: python

        import json
        from tradingstrategy.alternative_data.vault import load_vault_database_with_metadata

        with open("top_vaults_by_chain.json") as f:
            json_data = json.load(f)

        vault_universe = load_vault_database_with_metadata(json_data)
        for vault in vault_universe.iterate_vaults():
            print(vault.name, vault.metadata.cagr)

    :param json_data:
        JSON data from top_vaults_by_chain.json containing:

        - ``generated_at``: timestamp when the data was generated
        - ``vaults``: list of vault metadata dicts

    :return:
        VaultUniverse with Vault instances containing full metadata.
        Entries missing chain id, address or name, and entries that raise
        during parsing, are skipped (the latter with a logged warning).
    """
    # Set up logging once, not per failed entry inside the loop
    import logging
    logger = logging.getLogger(__name__)

    vaults = []

    for vault_entry in json_data.get("vaults", []):
        try:
            # Parse VaultMetadata from JSON entry
            metadata = _parse_vault_metadata(vault_entry)

            # Get chain_id
            chain_id_val = vault_entry.get("chain_id")
            if chain_id_val is None:
                continue
            chain_id = ChainId(chain_id_val)

            # Get required fields
            vault_address = vault_entry.get("address")
            if not vault_address:
                continue

            name = vault_entry.get("name")
            if not name:
                continue

            # Create Vault with metadata reference
            vault = Vault(
                chain_id=chain_id,
                vault_address=vault_address.lower(),
                name=name,
                token_symbol=vault_entry.get("share_token") or name,
                denomination_token_address=(vault_entry.get("denomination_token_address") or "").lower(),
                denomination_token_symbol=vault_entry.get("denomination") or "",
                denomination_token_decimals=vault_entry.get("denomination_decimals", 18),
                share_token_address=(vault_entry.get("share_token_address") or vault_address).lower(),
                share_token_symbol=vault_entry.get("share_token") or name,
                share_token_decimals=vault_entry.get("share_token_decimals", 18),
                protocol_name=vault_entry.get("protocol") or "",
                protocol_slug=vault_entry.get("protocol_slug") or "",
                performance_fee=vault_entry.get("performance_fee"),
                management_fee=vault_entry.get("management_fee"),
                tvl=vault_entry.get("current_nav"),
                features=set(),  # Features parsed from metadata
                metadata=metadata,
            )
            vaults.append(vault)

        except Exception as e:
            # Skip entries that fail to parse; lazy %-args avoid formatting
            # the message unless the warning is actually emitted
            logger.warning("Failed to parse vault entry: %s", e)
            continue

    return VaultUniverse(vaults)