Source code for tradingstrategy.pair

"""Trading pair information and pair datasets.

The core classes to understand the data are

- :py:class:`DEXPair`: describe one trading pair on different DEXes on different blockchains

- :py:class:`PandasPairUniverse`: all available trading pairs across all blockchains
  and functions to look them up.

- :py:data:`HumanReadableTradingPairDescription`: define the format for symbolic look iup
  for trading pairs

Trading pairs are **not fungible** across DEXes and blockchains.

- The same token might have different address on a different blockchains

- The same trading pair may be on multiple DEXes, or even
  on the same DEX with different fee tiers e.g. Uniswap v3
  gives WETH-USDC at 0.05%, 0.30% and 1% fee tiers,
  most real trading happening on 0.05% tier

Here is an example how to look up a particular trading pair::

.. code-block:: python

    from pyarrow import Table
    from tradingstrategy.chain import ChainId
    from tradingstrategy.exchange import ExchangeUniverse
    from tradingstrategy.pair import PandasPairUniverse
    from tradingstrategy.pair import HumanReadableTradingPairDescription

    # Exchange map data is so small it does not need any decompression
    exchange_universe: ExchangeUniverse = client.fetch_exchange_universe()

    # Decompress the pair dataset to Python map
    # This is raw PyArrow data
    columnar_pair_table: Table = client.fetch_pair_universe()
    print(f"Total pairs {len(columnar_pair_table)}, total exchanges {len(exchange_universe.exchanges)}")

    # Wrap the data in a helper class with indexes for easier access
    pair_universe = PandasPairUniverse(columnar_pair_table.to_pandas(), exchange_universe=exchange_universe)

    # Get BNB-BUSD pair on PancakeSwap v2
    #
    # There are no fee tiers, so we
    #
    desc: HumanReadableTradingPairDescription = (ChainId.bsc, "pancakeswap-v2", "WBNB", "BUSD")
    bnb_busd = pair_universe.get_pair_by_human_description(desc)
    print(f"We have pair {bnb_busd} with 30d volume of USD {bnb_busd.volume_30d}")

See :ref:`tutorial` section for Pairs tutorial for more information.

For exploring the trading pairs through web you can use

- `Trading Strategy trading pair search <https://tradingstrategy.ai/search>`__

- `Trading Strategy trading pair listings (by blockchain, by DEX, etc. by trading fee) <https://tradingstrategy.ai/trading-view>`__

"""

import logging
import enum
import pprint
import warnings
from collections import Counter
from dataclasses import dataclass, field
from types import NoneType
from typing import Optional, Iterable, Dict, TypeAlias

import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
from dataclasses_json import dataclass_json
from numpy import isnan

from tradingstrategy.token import Token
from tradingstrategy.exchange import ExchangeUniverse, ExchangeType, ExchangeNotFoundError
from tradingstrategy.types import NonChecksummedAddress, BlockNumber, UNIXTimestamp, BasisPoint, PrimaryKey, Percent, \
    USDollarAmount, URL
from tradingstrategy.utils.columnar import iterate_columnar_dicts
from tradingstrategy.utils.schema import create_pyarrow_schema_for_dataclass, create_columnar_work_buffer, \
    append_to_columnar_work_buffer
from tradingstrategy.exceptions import DataNotFoundError

# Legacy compatibility
from tradingstrategy.utils.token_filter import *


logger = logging.getLogger(__name__)


#: Data needed to identify a trading pair with human description.
#:
#: This is `(chain, exchange slug, base token, quote token)`.
#:
#: See also
#:
#: - :py:data:`HumanReadableTradingPairDescription`

FeelessPair: TypeAlias = Tuple[ChainId, str | None, str, str]

#: Data needed to identify a trading pair with human description.
#:
#: A version that can sepearate different fee variants.
#: This is all Uniswap v3 pairs, as a single exchange
#: supports the same pair with different pools having different fees.
#:
#: This is `(chain, exchange slug, base token, quote token, pool fee)`.c
#:
#: Pool fee is expressed as floating point. E.g. 0.0005 for 5 BPS fee.
#:
#: See also
#:
#: - :py:data:`HumanReadableTradingPairDescription`
#:
#:
FeePair: TypeAlias = Tuple[ChainId, str | None, str, str, Percent]


#: Shorthand method to identify trading pairs when written down by a human.
#:
#: This is `(chain, exchange slug, base token, quote token)`.
#:
#: Each major trading pair is identifiable as (chain, exchange, base token, quote token tuple).
#: Note that because there can be multipe tokens and fake tokens with the same name,
#: we usually refer to the "best" token which is the highest liquidty/volume trading
#: pair on the particular exchange.
#:
#:
#: Example descriptions
#:
#: .. code-block:: python
#:
#:         (ChainId.arbitrum, "uniswap-v3", "ARB", "USDC", 0.0005),  # Arbitrum, 5 BPS fee
#:         (ChainId.ethereum, "uniswap-v2", "WETH", "USDC"),  # ETH
#:         (ChainId.ethereum, "uniswap-v3", "EUL", "WETH", 0.0100),  # Euler 100 bps fee
#:         (ChainId.bsc, "pancakeswap-v2", "WBNB", "BUSD"),  # BNB
#:         (ChainId.arbitrum, "camelot", "ARB", "WETH"),  # ARB
#:
#: For "best fee match" lookups you can also omit the exchange by setting it to null.
#:
#: .. code-block:: python
#:
#:     # Find any CRV-USDC pair across all DEXes on Polygon, pick one with the best fee tier
#:     (ChainId.polygon, None, "CRV", "USDC"),
#:
#: See also
#:
#: -:py:meth:`PandasPairUniverse.get_pair_by_human_description`
#:
#: - :py:data:`FeelessPair`
#:
#: - :py:data:`FeePair`
#:
HumanReadableTradingPairDescription: TypeAlias = FeePair | FeelessPair



class PairNotFoundError(DataNotFoundError):
    """No trading pair found matching the given criteria."""

    advanced_search_url = "https://tradingstrategy.ai/search?q=&sortBy=liquidity%3Adesc&filters=%7B%22pool_swap_fee%22%3A%5B%5D%2C%22price_change_24h%22%3A%5B%5D%2C%22liquidity%22%3A%5B%5D%2C%22volume_24h%22%3A%5B%5D%2C%22type%22%3A%5B%5D%2C%22blockchain%22%3A%5B%5D%2C%22exchange%22%3A%5B%5D%7D"

    template = f"""This might be a problem in your data loading and filtering. 
                
    Use tradingstrategy.ai website to explore pairs. Once on a pair page, click on the `Copy Python identifier` button to get the correct pair information to use in your strategy.
    
    Here is a list of DEXes: https://tradingstrategy.ai/trading-view/exchanges
    
    Here is advanced search: {advanced_search_url}
    
    For any further questions join our Discord: https://tradingstrategy.ai/community"""

    def __init__(
        self, 
        *, 
        base_token: Optional[str]=None, 
        quote_token: Optional[str]=None, 
        fee_tier: Optional[Percent] = None, 
        pair_id: Optional[int]=None,
        exchange_slug: Optional[str] = None,
        address: Optional[str] = None,
        exchange_id: Optional[int] = None,
        description: Optional[HumanReadableTradingPairDescription] = None,
        custom_message: Optional[str] = None,
    ):

        if base_token:
            assert quote_token, "If base token is specified, quote token must be specified too."
        if quote_token:
            assert base_token, "If quote token is specified, base token must be specified too."

        if base_token and quote_token:
            message = f"No pair with base_token {base_token}, quote_token {quote_token}, fee tier {fee_tier}"
        else:
            assert exchange_slug or pair_id or address, "Either exchange_slug or pair_id or address must be specified if base_token and quote_token are not specified"
            message = custom_message or "No pair with "

        if exchange_slug:
            message = message + f" exchange_slug {exchange_slug}"

        if exchange_id:
            message = message + f" exchange_id {exchange_id}"

        if pair_id:
            message = message + f" pair_id {pair_id}"

        if address:
            message = message + f" address: {address}"

        if description:
            message = message + f" description: {description}"

        super().__init__(message + " found. " + self.template)


class DuplicatePair(Exception):
    """Found multiple trading pairs for the same naive lookup."""


class MultipleTokensWithSymbol(Exception):
    """Found multiple tokens with the same symbol."""


class TokenNotFound(Exception):
    """No token found with a symbol or address."""


class DataDecodeFailed(Exception):
    """The parquet file has damaged data for this trading pair."""




[docs]@dataclass_json @dataclass(slots=True) class DEXPair: """ Trading pair information for a single pair. Presents a single trading pair on decentralised exchanges. DEX trading pairs can be uniquely identified by - Internal id. - (Chain id, address) tuple - the same address can exist on multiple chains. - (Chain slug, exchange slug, pair slug) tuple. - Token names and symbols are *not* unique - anyone can create any number of trading pair tickers and token symbols. Do not rely on token symbols for anything. About data: - There is a different between `token0` and `token1` and `base_token` and `quote_token` conventions - the former are raw DEX (Uniswap) data while the latter are preprocessed by the server to make the data more understandable. Base token is the token you are trading and the quote token is the token you consider "money" for the trading. E.g. in WETH-USDC, USDC is the quote token. In SUSHI-WETH, WETH is the quote token. - Optional fields may be available if the candle server 1) detected the pair popular enough 2) managed to fetch the third party service information related to the token When you download a trading pair dataset from the server, not all trading pairs are available. For more information about trading pair availability see :ref:`trading pair tracking <tracking>`. The class provides some JSON helpers to make it more usable with JSON based APIs. This data class is serializable via `dataclasses-json` methods. Example: .. code-block:: info_as_string = pair.to_json() You can also do `__json__()` convention data export: .. code-block:: info_as_dict = pair.__json__() .. note :: Currently all flags are disabled and will be removed in the future. The historical dataset does not contain any filtering flags, because the data has to be filtered prior to download, to keep the download dump in a reasonasble size. The current data set of 800k trading pairs produce 100 MB dataset of which most of the pairs are useless. The server prefilters trading pairs and thus you cannot access historical data of pairs that have been prefiltered. """ #: Internal primary key for any trading pair pair_id: PrimaryKey #: The chain id on which chain this pair is trading. 1 for Ethereum. chain_id: ChainId #: The exchange where this token trades exchange_id: PrimaryKey #: Smart contract address for the pair. #: In the case of Uniswap this is the pair (pool) address. address: NonChecksummedAddress #: Token pair contract address on-chain. #: Lowercase, non-checksummed. token0_address: str #: Token pair contract address on-chain #: Lowercase, non-checksummed. token1_address: str #: Token0 as in raw Uniswap data. #: ERC-20 contracst are not guaranteed to have this data. token0_symbol: Optional[str] #: Token1 as in raw Uniswap data #: ERC-20 contracst are not guaranteed to have this data. token1_symbol: Optional[str] #: What kind of exchange this pair is on dex_type: Optional[ExchangeType] = None #: Naturalised base and quote token. #: Uniswap may present the pair in USDC-WETH or WETH-USDC order based on the token address order. #: However we humans always want the quote token to be USD, or ETH or BTC. #: For the reverse token orders, the candle serve swaps the token order #: so that the quote token is the more natural token of the pair (in the above case USD) base_token_symbol: Optional[TokenSymbol] = None #: Naturalised base and quote token. #: Uniswap may present the pair in USDC-WETH or WETH-USDC order based on the token address order. #: However we humans always want the quote token to be USD, or ETH or BTC. #: For the reverse token orders, the candle serve swaps the token order #: so that the quote token is the more natural token of the pair (in the above case USD) quote_token_symbol: Optional[TokenSymbol] = None #: Number of decimals to convert between human amount and Ethereum fixed int raw amount. #: Note - this information might be missing from ERC-20 smart contracts. #: If the information is missing the token is not tradeable in practice. token0_decimals: Optional[int] = None #: Number of decimals to convert between human amount and Ethereum fixed int raw amount #: Note - this information might be missing from ERC-20 smart contracts. #: If the information is missing the token is not tradeable in practice. token1_decimals: Optional[int] = None #: Denormalised web page and API look up information exchange_slug: Optional[str] = None #: Exchange factory address. #: Denormalised here, so we do not need an additional lookup. exchange_address: Optional[str] = None #: Denormalised web page and API look up information pair_slug: Optional[str] = None #: Block number of the first Uniswap Swap event first_swap_at_block_number: Optional[BlockNumber] = None #: Block number of the last Uniswap Swap event last_swap_at_block_number: Optional[BlockNumber] = None #: Timestamp of the first Uniswap Swap event first_swap_at: Optional[UNIXTimestamp] = None #: Timestamp of the first Uniswap Swap event last_swap_at: Optional[UNIXTimestamp] = None #: Pair has been flagged inactive, because it has not traded at least once during the last 30 days. #: TODO - inactive, remove. flag_inactive: Optional[bool] = None #: Pair is blacklisted by operators. #: Current there is no blacklist process so this is always false. #: TODO - inactive, remove. flag_blacklisted_manually: Optional[bool] = None #: Quote token is one of USD, ETH, BTC, MATIC or similar popular token variants. #: Because all candle data is outputted in the USD, if we have a quote token #: for which we do not have an USD conversation rate reference price source, #: we cannot create candles for the pair. #: TODO - inactive, remove. flag_unsupported_quote_token: Optional[bool] = None #: Pair is listed on an exchange we do not if it is good or not #: TODO - inactive, remove. flag_unknown_exchange: Optional[bool] = None #: Swap fee in basis points if known fee: Optional[BasisPoint] = None #: Risk assessment summary data buy_count_all_time: Optional[int] = None #: Risk assessment summary data sell_count_all_time: Optional[int] = None #: Risk assessment summary data buy_volume_all_time: Optional[float] = None #: Risk assessment summary data sell_volume_all_time: Optional[float] = None #: Risk assessment summary data buy_count_30d: Optional[int] = None #: Risk assessment summary data sell_count_30d: Optional[int] = None #: Risk assessment summary data buy_volume_30d: Optional[float] = None #: Risk assessment summary data sell_volume_30d: Optional[float] = None #: Buy token tax for this trading pair. #: #: Values of 900+ are error codes of internal token tax measurer and should be ignored. #: #: See #: - :ref:`token-tax` what it means #: - :py:func:`tradingstrategy.utils.token_extra_data.load_extra_metadata` how to load buy_tax: Optional[Percent] = None #: Transfer token tax for this trading pair. #: See :ref:`token-tax` for details. #: #: **Legacy**. Not used. #: transfer_tax: Optional[Percent] = None #: Sell tax for this trading pair. #: #: Values of 900+ are error codes of internal token tax measurer and should be ignored. #: #: See #: - :ref:`token-tax` what it means #: - :py:func:`tradingstrategy.utils.token_extra_data.load_extra_metadata` how to load sell_tax: Optional[Percent] = None #: Exchange name. #: #: Not part of the datasets. Added during the instance construction. #: exchange_name: Optional[str] = None #: Any extra/user supplied data. #: #: Can contain: #: - `top_pair_data`; TopPairData instance #: #: See #: - :py:meth:`token_sniffer_data` #: - :py:func:`tradingstrategy.utils.token_extra_data.load_extra_metadata` how to load #: other_data: Optional[dict] = field(default_factory=dict) def __repr__(self): exchange_name = self.exchange_name if self.exchange_name else f"{self.exchange_id}" if self.chain_id not in (ChainId.unknown, ChainId.centralised_exchange): chain_name = self.chain_id.get_slug() return f"<Pair #{self.pair_id} {self.base_token_symbol} - {self.quote_token_symbol} ({self.address}) at exchange {exchange_name} on {chain_name}>" else: # Centralised exchange side loaded data return f"<Pair #{self.pair_id} {self.base_token_symbol} - {self.quote_token_symbol} at exchange {exchange_name}>" def __eq__(self, other: "DEXPair"): """Trade positions are unique by opening timestamp and pair id.] We assume there cannot be a position opened for the same asset at the same time twice. """ assert self.pair_id is not None return self.pair_id == other.pair_id def __hash__(self): """set() and dict() compatibility""" return hash(self.pair_id) @property def fee_tier(self) -> Optional[Percent]: """Return the trading pair fee as 0...1. This is a synthetic properly based on :py:attr:`fee` data column. :return: None if the fee information is not availble. (Should not happen on real data, but may happen in unit tests.) """ if self.fee is None: return None return self.fee / 10_000 @property def volume_30d(self) -> USDollarAmount: """Denormalise trading volume last 30 days. - Not an accurate figure, as this is based on rough 30 days batch job - Good enough for undertanding a trading pair is tradeable """ vol = 0 buy_vol = self.buy_volume_30d or 0 sell_vol = self.sell_volume_30d or 0 if not isnan(buy_vol): vol += buy_vol if not isnan(sell_vol): vol += sell_vol return vol @property def base_token_address(self) -> str: """Get smart contract address for the base token. :return: Lowercase, non-checksummed. """ if self.token0_symbol == self.base_token_symbol: return self.token0_address else: return self.token1_address @property def quote_token_address(self) -> str: """Get smart contract address for the quote token :return: Token address in checksummed case """ if self.token0_symbol == self.quote_token_symbol: return self.token0_address else: return self.token1_address @property def quote_token_decimals(self) -> Optional[str]: """Get token decimal count for the quote token""" if self.token0_symbol == self.quote_token_symbol: return self.token0_decimals else: return self.token1_decimals @property def base_token_decimals(self) -> Optional[int]: """Get token decimal count for the base token. """ if self.token0_symbol == self.base_token_symbol: return self.token0_decimals else: return self.token1_decimals @property def quote_token_decimals(self) -> Optional[int]: """Get token decimal count for the quote token""" if self.token0_symbol == self.quote_token_symbol: return self.token0_decimals else: return self.token1_decimals @property def token_sniffer_data(self) -> dict | None: """Get TokenSniffer metadata. Must be separarely loaded. See :py:func:`tradingstrategy.utils.tax.load_tokensniffer_metadata` """ top_pair_data = self.other_data.get("top_pair_data") if top_pair_data: return top_pair_data.token_sniffer_data return None
[docs] def is_tradeable( self, liquidity_threshold=None, volume_threshold_30d=100_000. ) -> bool: """Can this pair be traded. .. note :: Liquidity threshold is TBD. :param liquidity_threshold: How much the trading pair pool needs to have liquidity to be tradeable. :param volume_threshold_30d: How much montly volume the pair needs to have to be tradeable. Only used if liquidity data is missing. """ # Volume can be Nan as well return (self.volume_30d or 0) >= volume_threshold_30d
[docs] def get_ticker(self) -> str: """Return trading 'ticker'""" return f"{self.base_token_symbol}-{self.quote_token_symbol}"
[docs] def get_friendly_name(self, exchange_universe: ExchangeUniverse) -> str: """Get a very human readable name for this trading pair. We need to translate the exchange id to someething human readable, and for this we need to have the access to the exchange universe. """ exchange = exchange_universe.get_by_id(self.exchange_id) if exchange: exchange_name = exchange.name else: exchange_name = f"Exchange #{self.exchange_id}" return f"{self.base_token_symbol} - {self.quote_token_symbol}, pair #{self.pair_id} on {exchange_name}"
[docs] def get_trading_pair_page_url(self) -> Optional[str]: """Get information page for this trading pair. :return: URL of the trading pair page or None if page/data not available. """ chain_slug = self.chain_id.get_slug() if not self.exchange_slug: return None if not self.pair_slug: return None return f"https://tradingstrategy.ai/trading-view/{chain_slug}/{self.exchange_slug}/{self.pair_slug}"
def __json__(self, request) -> dict: """Return dictionary presentation when this DEXPair is serialised as JSON. Provided for pyramid JSON renderer compatibility. This method is provided for API endpoints returned `More information <https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/renderers.html#using-a-custom-json-method>`_. """ return self.__dict__
[docs] @classmethod def to_pyarrow_schema(cls) -> pa.Schema: """Construct schema for reading writing :term:`Parquet` filss for pair information.""" # Enums must be explicitly expressed hints = { "chain_id": pa.uint64(), "dex_type": pa.string(), "other_data": NoneType, # TODO: Broken. PyArrow cannot handle object-like data. } return create_pyarrow_schema_for_dataclass(cls, hints=hints)
[docs] @classmethod def from_series(cls, pair_id: int, series: pd.Series) -> "DEXPair": """Create a DEXPair instance from Pandas Series. - Series in this case is one row in pairs.parquet data :param pair_id: Pair id. Not part of the row because it iss index. :param series: Pair data as a Pandas row. """ assert isinstance(series, pd.Series) data = series.replace(np.nan, None).to_dict() data["pair_id"] = pair_id return DEXPair.from_dict(data)
[docs] @classmethod def convert_to_pyarrow_table(cls, pairs: List["DEXPair"], check_schema=False) -> pa.Table: """Convert a list of DEXPair instances to a Pyarrow table. Used to prepare a data export on a server. :param pairs: The list wil be consumed in the process :param check_schema: Run additional checks on the data. Slow. Use only in tests. May be give happier error messages instead of "OverflowError" what pyarrow spits out. """ buffer = create_columnar_work_buffer(cls) for p in pairs: assert isinstance(p, DEXPair), f"Got {p}" append_to_columnar_work_buffer(buffer, p) schema = cls.to_pyarrow_schema() if check_schema: field: pa.Field for field in schema: try: pa.array(buffer[field.name], field.type) except Exception as e: # Usually cannot fit data into a column, like negative or none values raise RuntimeError(f"Cannot process field {field}") from e return pa.Table.from_pydict(buffer, schema)
[docs] @classmethod def convert_to_dataframe(cls, pairs: List["DEXPair"]) -> pd.DataFrame: """Convert Python DEXPair objects back to the Pandas dataframe presentation. As this is super-inefficient, do not use for large amount of data. """ # https://stackoverflow.com/questions/20638006/convert-list-of-dictionaries-to-a-pandas-dataframe dicts = [p.to_dict() for p in pairs] return pd.DataFrame.from_dict(dicts)
[docs] @classmethod def create_from_row(cls, row: pd.Series) -> "DEXPair": """Convert a DataFrame for to a DEXPair instance. Allow using of helper methods on the pair data. It is recommend you avoid this if you do not need row-like data. """ items = {k: v for k,v in row.items()} return DEXPair.from_dict(items)
[docs] def to_human_description(self) -> HumanReadableTradingPairDescription: """Get human description for this pair.""" return (self.chain_id, self.exchange_slug, self.base_token_symbol, self.quote_token_symbol, self.fee)
[docs] def get_base_token(self) -> Token: """Return token class presentation of base token in this trading pair.""" return Token( chain_id=self.chain_id, symbol=self.base_token_symbol, address=self.base_token_address, decimals=self.base_token_decimals, )
[docs] def get_quote_token(self) -> Token: """Return token class presentation of quote token in this trading pair.""" return Token( chain_id=self.chain_id, symbol=self.quote_token_symbol, address=self.quote_token_address, decimals=self.quote_token_decimals, )
[docs]class PandasPairUniverse: """A pair universe implementation that is created from Pandas dataset. This is a helper class, as :py:class:`pandas.DataFrame` is somewhat more difficult to interact with. This class will read the raw data frame and convert it to `DEXPair` objects with a lookup index. Because the DEXPair conversion is expensive for 10,000s of Python objects, it is recommended that you filter the raw :py:class:`pandas.DataFrame` by using filtering functions in :py:mod:`tradingstrategy.pair` first, before initializing :py:class:`PandasPairUniverse`. About the usage: - Single trading pairs can be looked up using :py:meth:`PandasPairUniverse.get_pair_by_smart_contract` and :py:meth:`PandasPairUniverse.get_pair_by_id` - Multiple pairs can be looked up by directly reading `PandasPairUniverse.df` Pandas dataframe Example how to use: .. code-block:: # Get dataset from the server as Apache Pyarrow table columnar_pair_table = client.fetch_pair_universe() # Convert Pyarrow -> Pandas -> in-memory DEXPair index pair_universe = PandasPairUniverse(columnar_pair_table.to_pandas()) # Lookup SUSHI-WETH trading pair from DEXPair index # https://tradingstrategy.ai/trading-view/ethereum/sushi/sushi-eth pair: DEXPair = pair_universe.get_pair_by_smart_contract("0x795065dcc9f64b5614c407a6efdc400da6221fb0") If the pair index is too slow to build, or you want to keep it lean, you can disable the indexing with `build_index`. In this case, some of the methods won't work: .. code-block:: # Get dataset from the server as Apache Pyarrow table columnar_pair_table = client.fetch_pair_universe() # Convert Pyarrow -> Pandas -> in-memory DEXPair index pair_universe = PandasPairUniverse(columnar_pair_table.to_pandas(), build_index=False) """
[docs] def __init__( self, df: pd.DataFrame, build_index=True, exchange_universe: Optional[ExchangeUniverse]=None ): """ :param df: The source DataFrame that contains all DEXPair entries :param build_index: Build quick lookup index for pairs :param exchange_universe: Optional exchange universe needed for human-readable pair lookup. We cannot properly resolve pairs unless we can map exchange names to their ids. Currently optional, only needed by `get_pair()`. """ assert isinstance(df, pd.DataFrame), f"Expected DataFrame, gor {df.__class__}" self.df = df.set_index(df["pair_id"]) #: pair_id -> raw dict data mappings #: #: Constructed in one pass from Pandas DataFrame. #: #: Don't access directly, use :py:meth:`iterate_pairs`. self.pair_map: Dict[int, dict] = {} #: pair_id -> constructed DEXPair cache #: #: Don't access directly, use :py:meth:`iterate_pairs`. self.dex_pair_obj_cache: Dict[int, DEXPair] = {} # pair smart contract address -> DEXPair self.smart_contract_map = {} # Internal cache for get_token() lookup # address -> info tuple mapping self.token_cache: Dict[str, Token] = {} if build_index: self.build_index() self.exchange_universe = exchange_universe #: A hack used in a single trading pair univerwse self.single_pair_cache: DEXPair = None
[docs] def iterate_pairs(self) -> Iterable[DEXPair]: """Iterate over all pairs in this universe.""" for pair_id in self.pair_map.keys(): yield self.get_pair_by_id(pair_id)
[docs] def iterate_tokens(self) -> Iterable[Token]: """Iterate over all tokens in this universe.""" already_iterated = set() for pair in self.iterate_pairs(): token_a = pair.get_base_token() if token_a not in already_iterated: yield token_a already_iterated.add(token_a) token_b = pair.get_quote_token() if token_b not in already_iterated: yield token_b already_iterated.add(token_b)
[docs] def limit_to_pairs(self, pair_ids: Collection[PrimaryKey]) -> "PandasPairUniverse": """Create a reduced pair universe with :param pair_ids: Only leave these pairs. :return: New pair universe only with selected pairs """ limited_df = self.df.loc[list(pair_ids)] return PandasPairUniverse(limited_df, exchange_universe=self.exchange_universe)
[docs] def build_index(self): """Create pair_id -> data mapping. Allows fast lookup of individual pairs. .. warning:: This function assumes the universe contains data for only one blockchain. The same address can exist across multiple EVM chains. The created smart contract address index does not index chain id and thus is invalid. """ # TODO: Slow - fix # https://stackoverflow.com/a/73638890/315168 self.pair_map = self.df.T.to_dict() self.smart_contract_map = {d["address"].lower(): d for d in self.pair_map.values()}
[docs] def get_all_pair_ids(self) -> Collection[PrimaryKey]: """Get all pair ids in the data frame.""" return self.df["pair_id"].unique()
[docs] def get_pair_ids_by_exchange(self, exchange_id: PrimaryKey) -> pd.DataFrame: """Get all pair ids on a specific exchange. :return: Raw slide of DataFrame """ return self.df.loc[self.df["exchange_id"] == exchange_id]["pair_id"]
[docs] def get_count(self) -> int: """How many trading pairs there are.""" return len(self.df)
[docs] def get_pair_by_id(self, pair_id: PrimaryKey) -> DEXPair: """Look up pair information and return its data. Uses a cached path. Constructing :py:class:`DEXPair` objects is a bit slow, so this is a preferred method if you need to access multiple pairs in a hot loop. :raise PairNotFoundError: If pair for the pair id is not loaded in our datasets. :return: Nicely presented :py:class:`DEXPair`. """ # First try the cached paths if self.pair_map: # First try object cache obj = self.dex_pair_obj_cache.get(pair_id) if not obj: # Convert any pairs in-fly to DEXPair objects and store them. # We do not initially construct these objects, # as we do not know what pairs a strategy might access. data = self.pair_map.get(pair_id) assert data is not None, f"pair_map lacks entry for: {pair_id}, we have {len(self.pair_map)} entries" obj = _convert_to_dex_pair(data, self.exchange_universe) self.dex_pair_obj_cache[pair_id] = obj return obj # We did not build this universe with pair index # Not sure why anyone would really want to do this # maybe eliminate this code path altogether in the future df = self.df pairs: pd.DataFrame = df.loc[df["pair_id"] == pair_id] if len(pairs) > 1: raise DuplicatePair(f"Multiple pairs found for id {pair_id}") if len(pairs) == 1: data = next(iter(pairs.to_dict("index").values())) obj = _convert_to_dex_pair(data, self.exchange_universe) return obj raise PairNotFoundError(pair_id=pair_id)
[docs] def get_pair_by_smart_contract(self, address: str) -> Optional[DEXPair]: """Resolve a trading pair by its pool smart contract address. .. warning:: This function assumes the universe contains data for only one blockchain. The same address can exist across multiple EVM chains. :param address: Ethereum smart contract address of the Uniswap pair contract """ address = address.lower() assert self.smart_contract_map, "You need to build the index to use this function" data = self.smart_contract_map.get(address) if data is None: raise PairNotFoundError( address=address, custom_message=f"smart_contract_map does not have entry for {address}, it has {len(self.smart_contract_map)} entries", ) return self.get_pair_by_id(data["pair_id"])
[docs] def get_token(self, address: str, chain_id=None) -> Optional[Token]: """Get a token that is part of any trade pair. Get a token details for a token that is base or quotetoken of any trading pair. ..note :: TODO: Not a final implementation subject to chage. :param address: ERC-20 address of base or quote token in a trading pair. :param chain_id: Get token only on a specific chain. Same token address can be across multiple chains. :return: Tuple (name, symbol, address, decimals) or None if not found. """ address = address.lower() token: Optional[Token] = None assert len(self.pair_map) > 0, "This method can be only used with in-memory pair index" if address not in self.token_cache: for pair_id in self.pair_map.keys(): p = self.get_pair_by_id(pair_id) if chain_id: if p.chain_id != chain_id: continue if p.token0_address == address: token = Token(p.chain_id, p.token0_symbol, p.token0_address, p.token0_decimals) elif p.token1_address == address: token = Token(p.chain_id, p.token1_symbol, p.token1_address, p.token1_decimals) self.token_cache[address] = token return self.token_cache[address]
[docs] def get_token_by_symbol( self, symbol: TokenSymbol, chain_id=None, pick_by_highest_volume=True, ) -> Optional[Token]: """Get one and only one token from the trading universe by its symbol. There might be multiple scam tokens for any legit token symbol. By default we use ``pick_by_highest_volume`` to choose one. If this method does not use, use :py:meth:`get_token` with address lookup instead. This method is not designed for heavy access, as it is using uncached data. Use sparsingly. :param symbol: E.g. ``USDC``. :param chain_id: Match token on a specific chain only. :param pick_by_highest_volume: If multiple mactches are found, pick one witht the highest 30d vol. Volume is 30d total across all loaded pairs. :return: Tuple (name, symbol, address, decimals) or None if not found. :raise MultipleTokensWithSymbol: In the case we get multiple matches. :raise TokenNotFound: If none of the loaded trading pairs contains a matching token with a symbol. """ assert len(self.pair_map) > 0, "This method can be only used with in-memory pair index" symbol = symbol.lower() # Token -> total vol mappings matches: Counter[Token, USDollarAmount] = Counter() for pair_id in self.pair_map.keys(): p = self.get_pair_by_id(pair_id) if chain_id: if p.chain_id != chain_id: continue matched_token = None if p.token0_symbol.lower() == symbol: matched_token = self.get_token(p.token0_address, chain_id) elif p.token1_symbol.lower() == symbol: matched_token = self.get_token(p.token1_address, chain_id) if matched_token: matches[matched_token] += p.volume_30d if len(matches) > 1: if pick_by_highest_volume: token = matches.most_common(1)[0][0] else: raise MultipleTokensWithSymbol(f"Matched multiple tokens for symbol {symbol}: {matches}") elif len(matches) == 1: token = next(iter(matches.keys())) else: raise TokenNotFound(f"Trading pair data does not contain any token with symbol {symbol}") return token
[docs] def get_all_tokens(self) -> Set[Token]: """Get all base and quote tokens in trading pairs. .. warning :: This method is useful for only test/limited pair count universes. It is very slow and mainly purported for debugging and diagnostics. """ tokens = set() for pair_id in self.pair_map.keys(): p = self.get_pair_by_id(pair_id) tokens.add(self.get_token(p.base_token_address)) tokens.add(self.get_token(p.quote_token_address)) return tokens
[docs] def get_single(self) -> DEXPair: """For strategies that trade only a single trading pair, get the only pair in the universe. :raise AssertionError: If our pair universe does not have an exact single pair. If the target pair could not be decoded. """ if self.single_pair_cache: return self.single_pair_cache pair_count = len(self.pair_map) assert pair_count == 1, f"Not a single trading pair universe, we have {pair_count} pairs" data = next(iter(self.pair_map.values())) # See https://github.com/tradingstrategy-ai/trading-strategy/issues/104 obj =_convert_to_dex_pair(data, self.exchange_universe) self.single_pair_cache = obj return self.single_pair_cache
[docs] def get_single_quote_token(self) -> Token: """Gets the only trading pair quote token for this trading universe. :return: Quote token for all trading pairs. :raise AssertionError: If we have trading pairs with different quotes. E.g. both ``-ETH`` and ``-USDC`` pairs. """ # Create (chain id, quote token address) set quotes = set() for p in self.iterate_pairs(): quotes.add((p.chain_id, p.quote_token_address)) tokens = [self.get_token(q[1]) for q in quotes] assert len(tokens) == 1, f"We have multiple quote tokens: {tokens}" return tokens[0]
[docs] def get_by_symbols(self, base_token_symbol: str, quote_token_symbol: str) -> Optional[DEXPair]: """For strategies that trade only a few trading pairs, get the only pair in the universe. .. warning :: Currently, this method is only safe for prefiltered universe. There are no safety checks if the returned trading pair is legit. In the case of multiple matching pairs, a random pair is returned.g :raise PairNotFoundError: If we do not have a pair with the given symbols """ for pair_id in self.pair_map.keys(): pair = self.get_pair_by_id(pair_id) if pair.base_token_symbol == base_token_symbol and pair.quote_token_symbol == quote_token_symbol: return pair raise PairNotFoundError(base_token=base_token_symbol, quote_token=quote_token_symbol)
[docs] def get_by_symbols_safe(self, base_token_symbol: str, quote_token_symbol: str) -> Optional[DEXPair]: """Get a trading pair by its ticker symbols. In the case of multiple matching pairs, an exception is raised. :raise DuplicatePair: If multiple pairs are found for the given symbols :raise PairNotFoundError: If we do not have a pair with the given symbols :return DEXPair: The trading pair """ pair_placeholder = [] for pair_id in self.pair_map.keys(): pair = self.get_pair_by_id(pair_id) if pair.base_token_symbol == base_token_symbol and pair.quote_token_symbol == quote_token_symbol: pair_placeholder.append(pair) if len(pair_placeholder) > 1: raise DuplicatePair(f"Multiple pairs found for id {pair_id}") if len(pair_placeholder) == 1: return pair_placeholder[0] raise PairNotFoundError(base_token=base_token_symbol, quote_token=quote_token_symbol)
[docs] def get_one_pair_from_pandas_universe( self, exchange_id: PrimaryKey | None, base_token: str, quote_token: str, fee_tier: Optional[Percent] = None, pick_by_highest_vol=False) -> Optional[DEXPair]: """Get a trading pair by its ticker symbols. Note that this method works only very simple universes, as any given pair is poised to have multiple tokens and multiple trading pairs on different exchanges. Example: .. code-block:: python # Get PancakeSwap exchange, # for the full exchange list see https://tradingstrategy.ai/trading-view/exchanges pancake = exchange_universe.get_by_chain_and_slug(ChainId.bsc, "pancakeswap-v2") # Because there can be multiple trading pairs with same tickers, # we pick the genuine among the scams based on its trading volume wbnb_busd_pair = pair_universe.get_one_pair_from_pandas_universe( pancake.exchange_id, "WBNB", "BUSD", pick_by_highest_vol=True, ) print("WBNB address is", wbnb_busd_pair.base_token_address) print("BUSD address is", wbnb_busd_pair.quote_token_address) print("WBNB-BUSD pair contract address is", wbnb_busd_pair.address) :param exchange_id: The exchange internal id which we are looking up. Set ``None`` to look all exchanges. :param fee_tier: Uniswap v3 and likes provide the same ticker in multiple fee tiers. You need to use `fee_tier` parameter to separate the Uniswap pools. Fee tier is not needed for Uniswap v2 like exchanges as all of their trading pairs have the same fee structure. The fee tier is 0...1 e.g. 0.0030 for 3 BPS or 0.3% fee tier. If fee tier is not provided, then the lowest fee tier pair is returned. However the lowest fee tier might not have the best liquidity or volume. :param pick_by_highest_vol: If multiple trading pairs with the same symbols are found, pick one with the highest volume. This is because often malicious trading pairs are create to attract novice users. :raise DuplicatePair: If the universe contains more than single entry for the pair. :raise PairNotFoundError: If the pair is not found in the universe. :return: DEXPairs with the given symbols """ if fee_tier is not None: assert (fee_tier >= 0) and (fee_tier <= 1), f"Received bad fee tier: {base_token}-{quote_token}: {fee_tier}" df = self.df conditions = (df["base_token_symbol"] == base_token) & (df["quote_token_symbol"] == quote_token) if exchange_id is not None: conditions = conditions & (df["exchange_id"] == exchange_id) if fee_tier is not None: fee_bps = int(fee_tier * 10000) # Convert to BPS conditions = conditions & (df["fee"] == fee_bps) pairs: pd.DataFrame = df.loc[conditions] if len(pairs) > 1: if not pick_by_highest_vol: for p in pairs.to_dict(orient="records"): logger.error("Conflicting pair: %s", p) raise DuplicatePair(f"Found {len(pairs)} trading pairs for {base_token}-{quote_token} when 1 was expected") # Sort by trade volume and pick the highest one pairs = pairs.sort_values(by=["fee", "buy_volume_all_time"], ascending=[True, False]) try: data = next(iter(pairs.to_dict("index").values())) except ValueError as e: raise ValueError(f"Could not pull out the first value: {pairs}\nDict: {pairs.to_dict()}") from e return _convert_to_dex_pair(data, self.exchange_universe) if len(pairs) == 1: data = next(iter(pairs.to_dict("index").values())) return _convert_to_dex_pair(data, self.exchange_universe) raise PairNotFoundError(base_token=base_token, quote_token=quote_token, fee_tier=fee_tier, exchange_id=exchange_id)
[docs] def get_pair(self, chain_id: ChainId, exchange_slug: str, base_token: str, quote_token: str, fee_tier: Optional[float] = None, exchange_universe: Optional[ExchangeUniverse]=None ) -> DEXPair: """Get a pair by its description. The simplest way to access pairs in the pair universe. To use this method, we must include `exchange_universe` in the :py:meth:`__init__` as otherwise we do not have required look up tables. :return: The trading pair on the exchange. Highest volume trading pair if multiple matches. :raise PairNotFoundError: In the case input data cannot be resolved. """ assert isinstance(chain_id, ChainId) assert type(exchange_slug) == str assert type(base_token) == str assert type(quote_token) == str assert self.exchange_universe is not None or exchange_universe is not None, "You need to provide exchange_universe argument to use this method" eu = exchange_universe or self.exchange_universe if fee_tier: desc = (chain_id, exchange_slug, base_token, quote_token, fee_tier) else: desc = (chain_id, exchange_slug, base_token, quote_token,) return self.get_pair_by_human_description(eu, desc)
[docs] def get_pair_by_human_description(self, desc: HumanReadableTradingPairDescription | ExchangeUniverse, exchange_universe: ExchangeUniverse | HumanReadableTradingPairDescription = None, ) -> DEXPair: """Get pair by its human readable description. Look up a trading pair by chain, exchange, base, quote token tuple. See :py:data:`HumanReadableTradingPairDescription` for more information. .. note :: API signature change and the order of parameters reversed in TS version 0.19 Example: .. code-block:: python # Get BNB-BUSD pair on PancakeSwap v2 desc = (ChainId.bsc, "pancakeswap-v2", "WBNB", "BUSD") bnb_busd = pair_universe.get_pair_by_human_description(desc) assert bnb_busd.base_token_symbol == "WBNB" assert bnb_busd.quote_token_symbol == "BUSD" assert bnb_busd.buy_volume_30d > 1_000_000 Another example: .. code-block:: python pair_human_descriptions = ( (ChainId.ethereum, "uniswap-v2", "WETH", "USDC"), # ETH (ChainId.ethereum, "uniswap-v2", "EUL", "WETH", 0.0030), # Euler 30 bps fee (ChainId.ethereum, "uniswap-v3", "EUL", "WETH", 0.0100), # Euler 100 bps fee (ChainId.ethereum, "uniswap-v2", "MKR", "WETH"), # MakerDAO (ChainId.ethereum, "uniswap-v2", "HEX", "WETH"), # MakerDAO (ChainId.ethereum, "uniswap-v2", "FNK", "USDT"), # Finiko (ChainId.ethereum, "sushi", "AAVE", "WETH"), # AAVE (ChainId.ethereum, "sushi", "COMP", "WETH"), # Compound (ChainId.ethereum, "sushi", "WETH", "WBTC"), # BTC (ChainId.ethereum, "sushi", "ILV", "WETH"), # Illivium (ChainId.ethereum, "sushi", "DELTA", "WETH"), # Delta (ChainId.ethereum, "sushi", "UWU", "WETH"), # UwU lend (ChainId.ethereum, "uniswap-v2", "UNI", "WETH"), # UNI (ChainId.ethereum, "uniswap-v2", "CRV", "WETH"), # Curve (ChainId.ethereum, "sushi", "SUSHI", "WETH"), # Sushi (ChainId.bsc, "pancakeswap-v2", "WBNB", "BUSD"), # BNB (ChainId.bsc, "pancakeswap-v2", "Cake", "BUSD"), # Cake (ChainId.bsc, "pancakeswap-v2", "MBOX", "BUSD"), # Mobox (ChainId.bsc, "pancakeswap-v2", "RDNT", "WBNB"), # Radiant (ChainId.polygon, "quickswap", "WMATIC", "USDC"), # Matic (ChainId.polygon, "quickswap", "QI", "WMATIC"), # QiDao (ChainId.polygon, "sushi", "STG", "USDC"), # Stargate (ChainId.avalanche, "trader-joe", "WAVAX", "USDC"), # Avax (ChainId.avalanche, "trader-joe", "JOE", "WAVAX"), # TraderJoe (ChainId.avalanche, "trader-joe", "GMX", "WAVAX"), # GMX (ChainId.arbitrum, "camelot", "ARB", "WETH"), # ARB # (ChainId.arbitrum, "sushi", "MAGIC", "WETH"), # Magic ) client = persistent_test_client exchange_universe = client.fetch_exchange_universe() pairs_df = client.fetch_pair_universe().to_pandas() pair_universe = PandasPairUniverse(pairs_df, exchange_universe=exchange_universe) pairs: List[DEXPair] pairs = [pair_universe.get_pair_by_human_description(exchange_universe, d) for d in pair_human_descriptions] assert len(pairs) == 26 assert pairs[0].exchange_slug == "uniswap-v2" assert pairs[0].get_ticker() == "WETH-USDC" assert pairs[1].exchange_slug == "uniswap-v2" assert pairs[1].get_ticker() == "EUL-WETH" :param desc: Trading pair description as tuple (blockchain, dex, base, quote fee) :param exchange_universe: The current database used to decode exchanges. If not given use the `exchange_universe` given in the constructor. Either argument here or argument in the constructor must be given. :return: The trading pair on the exchange. Highest volume trading pair if multiple matches. :raise PairNotFoundError: In the case input data cannot be resolved. """ # Check legacy parameter order if isinstance(desc, ExchangeUniverse): desc, exchange_universe = exchange_universe, desc if exchange_universe is None: exchange_universe = self.exchange_universe assert exchange_universe is not None, "get_pair_by_human_description() needs exchange_universe passed as constructor or function argument in order to do pair lookups" if len(desc) >= 5: chain_id, exchange_slug, base_token, quote_token, fee_tier = desc else: chain_id, exchange_slug, base_token, quote_token = desc fee_tier = None common_explanation = "Use pair description format (chain, exchange, base, quote) or (chain, exchange, base, quote, fee)" assert isinstance(chain_id, ChainId), f"Not ChainId: {chain_id}\n{common_explanation}" assert type(exchange_slug) in (str, NoneType), f"Not exchange slug: {exchange_slug}\n{common_explanation}" assert type(base_token) == str, f"Base token symbol not a string: {base_token}\n{common_explanation}" assert type(quote_token) == str, f"Quote token symbol not a string: {quote_token}\n{common_explanation}" if fee_tier is not None: assert (fee_tier >= 0) and (fee_tier <= 1), f"Received bad fee tier: {chain_id} {exchange_slug} {base_token} {quote_token}: {fee_tier}" if exchange_slug: exchange = exchange_universe.get_by_chain_and_slug(chain_id, exchange_slug) if exchange is None: # Try to produce very helpful error message if exchange_universe.get_exchange_count() == 1: our_exchange_slug = exchange_universe.get_single().exchange_slug exchange_message = f"The slug of the only exchange we have is {our_exchange_slug}." else: exchange_message = "" raise ExchangeNotFoundError(chain_id_name=chain_id.name, exchange_slug=exchange_slug, optional_extra_message=exchange_message) else: exchange = None try: pair = self.get_one_pair_from_pandas_universe( exchange.exchange_id if exchange_slug else None, base_token, quote_token, fee_tier=fee_tier, pick_by_highest_vol=True, ) # this check techinically unnecessary # since get_one_pair_from_pandas_universe will raise # but just to be sure except PairNotFoundError as e: raise PairNotFoundError(base_token=base_token, quote_token=quote_token, fee_tier=fee_tier, exchange_slug=exchange_slug, description=desc) from e return pair
[docs] def get_exchange_for_pair(self, pair: DEXPair) -> Exchange: """Get the exchange data on which a pair is trading. :param pair: Trading pair :return: Exchange instance. Should always return a value as traind pairs cannot exist without an exchange. """ assert self.exchange_universe, "PandasPairUniverse.exchange_universe must be set in order to use this function" return self.exchange_universe.get_by_id(pair.exchange_id)
[docs] def create_parquet_load_filter(self, count_limit=10_000) -> List[Tuple]: """Returns a Parquet loading filter that contains pairs in this universe. When candle or liquidity file is read to the memory, only read pairs that are within this pair universe. This severely reduces the memory usage and speed ups loading. See :py:func:`tradingstrategy.reader.read_parquet`. :param count_limit: Sanity check assert limit how many pairs we can cram into the filter. :return: Filter to be passed to read_table """ count = self.get_count() assert count < count_limit, f"Too many pairs to create a filter. Pair count is {count}" # https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html return [("pair_id", "in", self.get_all_pair_ids())]
[docs] @staticmethod def create_single_pair_universe( df: pd.DataFrame, exchange: Exchange, base_token_symbol: str, quote_token_symbol: str, pick_by_highest_vol=True, fee_tier: Optional[float]=None, ) -> "PandasPairUniverse": """Create a trading pair universe that contains only a single trading pair. .. warning:: Deprecated This is useful for trading strategies that to technical analysis trading on a single trading pair like BTC-USD. :param df: Unfiltered DataFrame for all pairs :param exchange: Exchange instance on the pair is trading :param base_token_symbol: Base token symbol of the trading pair :param quote_token_symbol: Quote token symbol of the trading pair :param pick_by_highest_vol: In the case of multiple match per token symbol, or scam tokens, pick one with the highest trade volume :param fee_tier: Pick a pair for a specific fee tier. Uniswap v3 has :raise DuplicatePair: Multiple pairs matching the criteria :raise PairNotFoundError: No pairs matching the criteria """ warnings.warn('This method is deprecated. Use PandasPairUniverse.create_pair_universe() instead', DeprecationWarning, stacklevel=2) return PandasPairUniverse.create_limited_pair_universe( df, exchange, [(base_token_symbol, quote_token_symbol)], pick_by_highest_vol, )
[docs] @staticmethod def create_limited_pair_universe( df: pd.DataFrame, exchange: Exchange, pairs: List[Tuple[str, str]], pick_by_highest_vol=True) -> "PandasPairUniverse": """Create a trading pair universe that contains only few trading pairs. .. warning:: Deprecated This is useful for trading strategies that to technical analysis trading on a few trading pairs, or single pair three-way trades like Cake-WBNB-BUSD. :param df: Unfiltered DataFrame for all pairs :param exchange: Exchange instance on the pair is trading :param pairs: List of trading pairs as ticket tuples. E.g. `[ ("WBNB, "BUSD"), ("Cake", "WBNB") ]` :param pick_by_highest_vol: In the case of multiple match per token symbol, or scam tokens, pick one with the highest trade volume :raise DuplicatePair: Multiple pairs matching the criteria :raise PairNotFoundError: No pairs matching the criteria """ warnings.warn('This method is deprecated. Use PandasPairUniverse.create_pair_universe() instead', DeprecationWarning, stacklevel=2) assert exchange is not None, "Got None as Exchange - exchange not found?" # https://pandas.pydata.org/docs/user_guide/merging.html frames = [] for base_token_symbol, quote_token_symbol in pairs: filtered_df: pd.DataFrame= df.loc[ (df["exchange_id"] == exchange.exchange_id) & (df["base_token_symbol"] == base_token_symbol) & (df["quote_token_symbol"] == quote_token_symbol)] if len(filtered_df) > 1: if not pick_by_highest_vol: duplicates = 0 for p in filtered_df.to_dict(orient="records"): logger.error("Conflicting pair: %s", p) duplicates += 1 raise DuplicatePair(f"Found {duplicates} trading pairs for {base_token_symbol}-{quote_token_symbol} when 1 was expected") # Sort by trade volume and pick the highest one sorted = filtered_df.sort_values(by="buy_volume_all_time", ascending=False) duplicates_removed_df = sorted.drop_duplicates(subset="base_token_symbol") frames.append(duplicates_removed_df) elif len(filtered_df) == 1: frames.append(filtered_df) else: raise PairNotFoundError(base_token=base_token_symbol, quote_token=quote_token_symbol, exchange_slug=exchange.exchange_slug) if exchange: exchange_universe = ExchangeUniverse.from_collection([exchange]) else: exchange_universe = None return PandasPairUniverse(pd.concat(frames), exchange_universe=exchange_universe)
[docs] @staticmethod def create_pair_universe( df: pd.DataFrame, pairs: Collection[HumanReadableTradingPairDescription], ) -> "PandasPairUniverse": """Create a PandasPairUniverse instance based on loaded raw pairs data. A shortcut method to create a pair universe for a single or few trading pairs, from DataFrame of all possible trading pairs. Example for a single pair: .. code-block:: python pairs_df = client.fetch_pair_universe().to_pandas() pair_universe = PandasPairUniverse.create_pair_universe( pairs_df, [(ChainId.polygon, "uniswap-v3", "WMATIC", "USDC", 0.0005)], ) assert pair_universe.get_count() == 1 pair = pair_universe.get_single() assert pair.base_token_symbol == "WMATIC" assert pair.quote_token_symbol == "USDC" assert pair.fee_tier == 0.0005 # BPS Example for multiple trading pairs.: .. code-block:: python pairs_df = client.fetch_pair_universe().to_pandas() # Create a trading pair universe for a single trading pair # # WMATIC-USD on Uniswap v3 on Polygon, 5 BPS fee tier and 30 BPS fee tier # pair_universe = PandasPairUniverse.create_pair_universe( pairs_df, [ (ChainId.polygon, "uniswap-v3", "WMATIC", "USDC", 0.0005), (ChainId.polygon, "uniswap-v3", "WMATIC", "USDC", 0.0030) ], ) assert pair_universe.get_count() == 2 :param df: Pandas DataFrame of all pair data. See :py:meth:`tradingstrategy.client.Client.fetch_pair_universe` for more information. :param pairs: List of pair human descriptions. **Warning**: If any of descriptions have fee set, all must have fee set, or there might be resolution issues. :return: A trading pair universe that contains only the listed trading pairs. """ resolved_pairs_df = resolve_pairs_based_on_ticker(df, pairs=pairs) assert len(resolved_pairs_df) == len(pairs), f"Not all pairs were resolved.\nAsked pairs: {pairs}\nResolved pairs:\n{resolved_pairs_df}" return PandasPairUniverse(resolved_pairs_df)
[docs]class LegacyPairUniverse: """The queries universe, as returned by the server. .. note :: TODO: Legacy prototype implementation and will be deprecated. Converts raw pair dataset to easier to use `DEXPair` in-memory index. You likely want to use :py:class:`PandasPairUniverse`, as its offers much more functionality than this implemetation. """ #: Internal id -> DEXPair mapping pairs: Dict[int, DEXPair]
[docs] def __init__(self, pairs: Dict[int, DEXPair]): self.pairs = pairs
[docs] @classmethod def create_from_pyarrow_table(cls, table: pa.Table) -> "LegacyPairUniverse": """Convert columnar presentation to a Python in-memory objects. Some data manipulation is easier with objects instead of columns. .. note :: This seems to quite slow operation. It is recommend you avoid this if you do not need row-like data. """ pairs: Dict[int, DEXPair] = {} for batch in table.to_batches(max_chunksize=5000): d = batch.to_pydict() for row in iterate_columnar_dicts(d): pairs[row["pair_id"]] = DEXPair.from_dict(row) return LegacyPairUniverse(pairs=pairs)
[docs] @classmethod def create_from_pyarrow_table_with_filters(cls, table: pa.Table, chain_id_filter: Optional[ChainId] = None) -> "LegacyPairUniverse": """Convert columnar presentation to a Python in-memory objects. Filter the pairs based on given filter arguments. """ if chain_id_filter: # https://stackoverflow.com/a/64579502/315168 chain_id_index = table.column('chain_id') row_mask = pc.equal(chain_id_index, pa.scalar(chain_id_filter.value, chain_id_index.cause)) selected_table = table.filter(row_mask) return LegacyPairUniverse.create_from_pyarrow_table(selected_table)
[docs] def get_pair_by_id(self, pair_id: int) -> Optional[DEXPair]: """Resolve pair by its id. Only useful for debugging. Does a slow look """ return self.pairs[pair_id]
[docs] def get_pair_by_ticker(self, base_token: str, quote_token: str) -> Optional[DEXPair]: """Get a trading pair by its ticker symbols. Note that this method works only very simple universes, as any given pair is poised to have multiple tokens and multiple trading pairs on different exchanges. :raise DuplicatePair: If the universe contains more than single entry for the pair. :raise PairNotFoundError: If the pair is not found. :return: None if there is no match """ pairs = [p for p in self.pairs.values() if p.base_token_symbol == base_token and p.quote_token_symbol == quote_token] if len(pairs) > 1: raise DuplicatePair(f"Multiple trading pairs found {base_token}-{quote_token}") if pairs: return pairs[0] raise PairNotFoundError(base_token=base_token, quote_token=quote_token)
[docs] def get_pair_by_ticker_by_exchange(self, exchange_id: int, base_token: str, quote_token: str) -> Optional[DEXPair]: """Get a trading pair by its ticker symbols. Note that this method works only very simple universes, as any given pair is poised to have multiple tokens and multiple trading pairs on different exchanges. :param exchange_id: E.g. `1` for uniswap_v2 :raise DuplicatePair: If the universe contains more than single entry for the pair. Because we are looking by a token symbol there might be fake tokens with the same symbol. :return: None if there is no match """ # Don't let ints slip through as they are unsupported assert type(exchange_id) == int pairs = [p for p in self.pairs.values() if p.base_token_symbol == base_token and p.quote_token_symbol == quote_token and p.exchange_id == exchange_id] if len(pairs) > 1: raise DuplicatePair(f"Multiple trading pairs found {base_token}-{quote_token} on exchange {exchange_id}") if pairs: return pairs[0] raise PairNotFoundError(base_token=base_token, quote_token=quote_token)
[docs] def get_all_pairs_on_exchange(self, exchange_id: int) -> Iterable[DEXPair]: """Get all trading pair on a decentralsied exchange. Use `ExchangeUniverse.get_by_chain_and_slug` to resolve the `exchange_id` first. :param chain_id: E.g. `ChainId.ethereum` :param exchange_id: E.g. `1` for uniswap_v2 :raise DuplicatePair: If the universe contains more than single entry for the pair. Because we are looking by a token symbol there might be fake tokens with the same symbol. :return: None if there is no match """ # Don't let ints slip through as they are unsupported assert type(exchange_id) == int for p in self.pairs.values(): if p.exchange_id == exchange_id: yield p
[docs] def get_active_pairs(self) -> Iterable["DEXPair"]: """Filter for pairs that have see a trade for the last 30 days""" return filter(lambda p: not p.flag_inactive, self.pairs.values())
[docs] def get_inactive_pairs(self) -> Iterable["DEXPair"]: """Filter for pairs that have not see a trade for the last 30 days""" return filter(lambda p: p.flag_inactive, self.pairs.values())
[docs]def resolve_pairs_based_on_ticker( df: pd.DataFrame, chain_id: Optional[ChainId] = None, exchange_slug: Optional[str] = None, pairs: set[tuple[ChainId, str, str, str] | \ tuple[ChainId, str, str, str, BasisPoint]] | \ Collection[HumanReadableTradingPairDescription] = None, sorting_criteria_by: Tuple = ("fee", "buy_volume_all_time"), sorting_criteria_ascending: Tuple = (True, False), ) -> pd.DataFrame: """Resolve symbolic trading pairs to their internal integer primary key ids. Uses pair database described :py:class:`DEXPair` Pandas dataframe to resolve pairs to their integer ids on a single exchange. .. warning :: For popular trading pairs, there will be multiple scam pairs with the same ticker name. In this case, one with the highest all-time buy volume is chosen. .. note :: Pair ids are not stable and may change long term. Always resolve pair ids before a run. Example: .. code-block: python client = persistent_test_client pairs_df = client.fetch_pair_universe().to_pandas() pairs = { (ChainId.ethereum, "uniswap-v3", "WETH", "USDC", 0.0005), (ChainId.ethereum, "uniswap-v3", "DAI", "USDC"), } filtered_pairs_df = resolve_pairs_based_on_ticker( pairs_df, pairs=pairs, ) assert len(filtered_pairs_df) == 2 Alternative Example: .. code-block: python pairs_df = client.fetch_pair_universe().to_pandas() tickers = { ("WBNB", "BUSD"), ("Cake", "WBNB"), } # ticker -> pd.Series row map for pairs filtered_pairs_df = resolve_pairs_based_on_ticker( pairs_df, ChainId.bsc, "pancakeswap-v2", tickers ) assert len(filtered_pairs_df) == 2 wbnb_busd = filtered_pairs_df.loc[ (filtered_pairs_df["base_token_symbol"] == "WBNB") & (filtered_pairs_df["quote_token_symbol"] == "BUSD") ].iloc[0] assert wbnb_busd["buy_volume_30d"] > 0 :param df: DataFrame containing DEXPairs :param chain_id: Blockchain the exchange is on. Set `None` if given part of `HumanReadableTradingPairDescription`. :param exchange_slug: Symbolic exchange name Set `None` if given part of `HumanReadableTradingPairDescription`. :param pairs: List of trading pairs as (base token, quote token) tuples. Note that giving trading pair tokens in wrong order causes pairs not to be found. If any ticker does not match it is not included in the result set. See :py:data:`tradingstrategy.pair.HumanReadableTradingPairDescription`. :param sorting_criteria_by: Resulting DataFrame sorting :param sorting_criteria_ascending: Resulting DataFrame sorting :return: DataFrame with filtered pairs. **Warning**: If you give a pair that is not found there is no error. Check that you get right amount of pairs out. """ assert pairs, "No pair_tickers given" match_fee = False # Create list of conditions to filter out dataframe, # one condition per pair conditions = [] for pair_description in pairs: if len(pair_description) in (4, 5): # New API pair_chain, pair_exchange, base, quote, *fee = pair_description assert isinstance(pair_chain, ChainId), f"Expected ChainId, got {pair_chain}. Description is {pair_description}." assert type(pair_exchange) == str assert type(base) == str assert type(quote) == str # Convert to BPS if len(fee) > 0: assert len(fee) == 1 fee_value = fee[0] assert type(fee_value) == float, f"Expected fee 0...1: {type(fee_value)}: {fee_value}" assert fee_value >= 0 and fee_value <= 1 fee = [int(fee_value * 10000)] else: pair_chain = chain_id pair_exchange = exchange_slug assert chain_id, "chain_id missing" assert exchange_slug, "exchange_slug missing" base, quote, *fee = pair_description condition = ( (df["base_token_symbol"].str.lower() == base.lower()) & (df["quote_token_symbol"].str.lower() == quote.lower()) & (df["exchange_slug"].str.lower() == pair_exchange.lower()) & (df["chain_id"] == pair_chain.value) ) # also filter by pair fee if pair ticker specifies it if len(fee) > 0: condition &= (df["fee"] == fee[0]) conditions.append(condition) # OR call conditions together # https://stackoverflow.com/a/57468610/315168 df_matches = df.loc[np.logical_or.reduce(conditions)] # Sort by the buy volume as a preparation # for the scam filter df_matches = df_matches.sort_values(by=list(sorting_criteria_by), ascending=list(sorting_criteria_ascending)) result_pair_ids = set() # Scam filter. # Pick the tokens by the highest buy volume to the result map. for pair_description in pairs: match_fee = None if len(pair_description) > 3: pair_chain, pair_exchange, base, quote, *fee = pair_description if len(fee) >= 1: match_fee = fee[0] else: # Legacy base, quote, *_ = pair_description if match_fee: for _, row in df_matches.iterrows(): if ( row["base_token_symbol"].lower() == base.lower() and row["quote_token_symbol"].lower() == quote.lower() and row["fee"] == match_fee * 10000 ): result_pair_ids.add(row["pair_id"]) break else: for _, row in df_matches.iterrows(): if ( row["base_token_symbol"].lower() == base.lower() and row["quote_token_symbol"].lower() == quote.lower() ): result_pair_ids.add(row["pair_id"]) break result_df = df.loc[df["pair_id"].isin(result_pair_ids)] return result_df
[docs]def generate_address_columns(df: pd.DataFrame) -> pd.DataFrame: """Add base_token_address, quote_token_address columns. These are not part of the dataset, as they can be derived from other colums. :param df: Dataframe from :py:meth:`tradingstrategy.client.Client.fetch_pair_universe`. :return: New DataFrame with `base_token_address` and `quote_token_address` columns. """ def expander(row: pd.Series) -> dict: quote_token_symbol = row["quote_token_symbol"] if row["token0_symbol"] == quote_token_symbol: return { "quote_token_address": row["token0_address"], "base_token_address": row["token1_address"], } else: return { "quote_token_address": row["token1_address"], "base_token_address": row["token0_address"], } applied_df = df.apply(expander, axis='columns', result_type='expand') df = pd.concat([df, applied_df], axis='columns') return df
def _preprocess_loaded_pair_data(data: dict) -> dict: """Fix any data loading and transfomration issues we might have with the data. Hot fix for https://github.com/tradingstrategy-ai/trading-strategy/issues/104 """ assert isinstance(data, dict) def _fix_val(v): try: if isnan(v): return None except: pass return v result = {} for k, v in data.items(): result[k] = _fix_val(v) return result def _convert_to_dex_pair(data: dict, exchange_universe: ExchangeUniverse | None=None) -> DEXPair: """Convert trading pair data from dict to a wrapped object. - Correctly handle serialisation quirks - Give user friendly error reports :param data: Raw DEX data as a dict :param exchange_universe: If given assign exchange labels on the trading pairs. :return: Constructed DEXPair instance """ assert data is not None, "Passed None as DEX data" exchange_name = None if exchange_universe is not None: exchange_id = data["exchange_id"] exchange = exchange_universe.get_by_id(exchange_id) if exchange is not None: exchange_name = exchange.exchange_slug data = _preprocess_loaded_pair_data(data) try: obj = DEXPair.from_dict(data) obj.exchange_name = exchange_name except Exception as e: pretty = pprint.pformat(data) raise DataDecodeFailed(f"Could not decode trading pair data:\n{pretty}") from e return obj