Client#

API documentation for the tradingstrategy.client.Client Python class in the Trading Strategy framework.

class Client[source]#

Bases: BaseClient

An API client for querying the Trading Strategy datasets from a server.

  • The client will download datasets.

  • In-built disk cache is offered, so that large datasets are not redownloaded unnecessarily.

  • There is protection against network errors: dataset downloads are retried in the case of data corruption errors.

  • A download progress bar is displayed when possible.
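The disk cache idea in the list above can be pictured with a minimal sketch. It is not the client's actual implementation: `cached_fetch` and its `download` callback are hypothetical names used only for illustration.

```python
from pathlib import Path
from typing import Callable


def cached_fetch(cache_dir: Path, name: str, download: Callable[[], bytes]) -> Path:
    """Return the cached file for `name`, downloading it only if it is missing.

    Hypothetical helper: shows the general disk cache pattern,
    not how tradingstrategy implements it.
    """
    cache_dir.mkdir(parents=True, exist_ok=True)
    path = cache_dir / name
    if not path.exists():
        # Write to a temporary name first, so that an interrupted
        # download never leaves a half-written file in the cache
        partial = path.with_name(path.name + ".part")
        partial.write_bytes(download())
        partial.replace(path)
    return path
```

Calling `cached_fetch()` twice with the same name invokes `download` only once; the second call returns the file already on disk.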

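You can use Client either in Jupyter notebooks (see create_jupyter_client()) or in Python applications (see create_live_client()).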

Python application usage:

import os

from tradingstrategy.client import Client

# Read the API key from the environment instead of hardcoding it
api_key = os.environ["TRADING_STRATEGY_API_KEY"]
client = Client.create_live_client(api_key)
exchange_universe = client.fetch_exchange_universe()
print(f"Dataset contains {len(exchange_universe.exchanges)} exchanges")
__init__(env, transport)[source]#

Do not call the constructor directly; use one of the create_* class methods instead.

Methods

__init__(env, transport)

Do not call the constructor directly; use one of the create_* class methods instead.

clear_caches([filename])

Remove any cached data.

close()

Close the streams of underlying transport.

create_jupyter_client([cache_path, api_key, ...])

Create a new API client.

create_live_client([api_key, cache_path, ...])

Create a live trading instance of the client.

create_pyodide_client_async([cache_path, ...])

Create a new API client inside a Pyodide environment.

create_test_client([cache_path])

Create a new Trading Strategy client to be used with automated test suites.

fetch_all_candles(bucket)

Get cached blob of candle data of a certain candle width.

fetch_all_liquidity_samples(bucket)

Get cached blob of liquidity events of a certain time window.

fetch_candle_dataset(bucket)

Fetch candle data from the server.

fetch_candles_by_pair_ids(pair_ids, bucket)

Fetch candles for particular trading pairs.

fetch_chain_status(chain_id)

Get live information about the status of indexing and candle creation for a given blockchain.

fetch_clmm_liquidity_provision_candles_by_pair_ids(...)

Fetch CLMM liquidity provision candles.

fetch_exchange_universe()

Fetch the list of all exchanges from the dataset server.

fetch_lending_candles_by_reserve_id(...[, ...])

Fetch lending candles for a particular reserve.

fetch_lending_candles_for_universe(...[, ...])

Load lending reserves for several assets at once.

fetch_lending_reserve_universe()

Load and cache the lending reserve universe.

fetch_lending_reserves_all_time()

Get a cached blob of lending protocol reserve events and precomputed stats.

fetch_pair_universe()

Fetch pair universe from local cache or the candle server.

fetch_top_pairs(chain_ids[, exchange_slugs, ...])

Get new trading pairs to be included in the trading universe.

fetch_trading_data_availability(pair_ids, bucket)

Check the trading data availability at oracle's real time market feed endpoint.

fetch_tvl_by_pair_ids(pair_ids, bucket[, ...])

Fetch TVL/liquidity candles for particular trading pairs.

preflight_check()

Check that everything is OK to run the notebook.

setup_notebook()

Legacy.

close()[source]#

Close the streams of underlying transport.

clear_caches(filename=None)[source]#

Remove any cached data.

Cache is specific to the current transport.

Parameters:

filename (Optional[Union[str, Path]]) – If given, remove only that specific file, otherwise clear all cached data.
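The filename semantics can be illustrated with a small stand-alone sketch. `clear_cache_dir` is a hypothetical function operating on a plain directory; it mirrors the documented behaviour (one file if a name is given, everything otherwise) and is not the library's implementation.

```python
from pathlib import Path
from typing import Optional, Union


def clear_cache_dir(cache_dir: Path, filename: Optional[Union[str, Path]] = None) -> int:
    """Delete one cached file, or every file in the directory.

    Returns the number of files removed.
    """
    if filename is not None:
        # Only the named file is a candidate for removal
        targets = [cache_dir / Path(filename).name]
    else:
        # No name given: every regular file in the cache directory is removed
        targets = [p for p in cache_dir.iterdir() if p.is_file()]
    removed = 0
    for path in targets:
        if path.exists():
            path.unlink()
            removed += 1
    return removed
```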

fetch_pair_universe()[source]#

Fetch pair universe from local cache or the candle server.

The compressed file size is around 5 megabytes.

If the download seems to be corrupted, it will be attempted 3 times.

Return type:

Table
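The "attempted 3 times" behaviour mentioned for corrupted downloads can be sketched as a retry loop. How the client actually detects corruption is not stated on this page; the checksum comparison below, and the names `fetch_with_retries` and `CorruptedDownload`, are assumptions made for illustration.

```python
import hashlib
from typing import Callable


class CorruptedDownload(Exception):
    """Hypothetical error raised when every attempt returned a bad payload."""


def fetch_with_retries(
    download: Callable[[], bytes],
    expected_sha256: str,
    attempts: int = 3,
) -> bytes:
    """Call `download` until its payload matches the expected checksum."""
    for _ in range(attempts):
        payload = download()
        if hashlib.sha256(payload).hexdigest() == expected_sha256:
            return payload
        # Checksum mismatch: treat the payload as corrupted and try again
    raise CorruptedDownload(f"Payload still corrupted after {attempts} attempts")
```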

fetch_exchange_universe()[source]#

Fetch the list of all exchanges from the dataset server.

Return type:

ExchangeUniverse

fetch_all_candles(bucket)[source]#

Get cached blob of candle data of a certain candle width.

The returned data can range from several hundred megabytes to several gigabytes and is cached locally.

The returned data is saved in PyArrow Parquet format.

For more information see tradingstrategy.candle.Candle.

If the download seems to be corrupted, it will be attempted 3 times.

Parameters:

bucket (TimeBucket) –

Return type:

Table

fetch_candles_by_pair_ids(pair_ids, bucket, start_time=None, end_time=None, max_bytes=None, progress_bar_description=None)[source]#

Fetch candles for particular trading pairs.

This is the right API to use if you want data for only one or a few trading pairs. If the number of trading pairs is small, this download is much more lightweight than the Parquet dataset download.

The fetch is performed using JSONL API endpoint. This endpoint always returns real-time information.

Parameters:
  • pair_ids (Collection[int]) – Internal ids of the trading pairs to query. Get the internal ids from the pair dataset.

  • bucket (TimeBucket) – Candle time frame.

  • start_time (Optional[Union[datetime, Timestamp]]) – Include all candles after this time. If not given, start from genesis.

  • end_time (Optional[Union[datetime, Timestamp]]) – Include all candles before this time.

  • max_bytes (Optional[int]) – Limit the streaming response size.

  • progress_bar_description (Optional[str]) – Description to display on the download progress bar.

Returns:

Candles dataframe

Raises:

tradingstrategy.transport.jsonl.JSONLMaxResponseSizeExceeded – If the max_bytes limit is breached

Return type:

DataFrame
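A rolling time window is a common way to call this method. The helper below is a hedged sketch: `fetch_recent_candles` is a hypothetical name, and it assumes that naive UTC datetimes are acceptable for start_time and end_time, as in the other examples on this page. The client and bucket are typed loosely so that the sketch does not need the library installed.

```python
import datetime
from typing import Any, Collection


def fetch_recent_candles(client: Any, pair_ids: Collection[int], bucket: Any, days: int = 7):
    """Fetch candles for the last `days` days through fetch_candles_by_pair_ids().

    `client` is expected to be a Client instance and `bucket` a TimeBucket.
    """
    # Assumption: naive UTC timestamps, matching the naive datetimes used elsewhere on this page
    end = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    start = end - datetime.timedelta(days=days)
    return client.fetch_candles_by_pair_ids(
        pair_ids,
        bucket,
        start_time=start,
        end_time=end,
    )
```

With a real client this could be called as `fetch_recent_candles(client, [pair.pair_id], TimeBucket.d1, days=30)`.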

fetch_tvl_by_pair_ids(pair_ids, bucket, start_time=None, end_time=None, progress_bar_description=None, query_type=OHLCVCandleType.tvl_v1)[source]#

Fetch TVL/liquidity candles for particular trading pairs.

This is the right API to use if you want data for only one or a few trading pairs. If the number of trading pairs is small, this download is much more lightweight than the Parquet dataset download.

TODO: Upgrade default query_type from tvl_v1 to tvl_v2. Lots of backwards breaking changes.

The returned TVL/liquidity data is converted to US dollars by the server.

Note

TVL data is an estimation. Malicious tokens are known to manipulate their TVL/liquidity/market depth, and it is not possible to detect and eliminate all manipulations.

Example:

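import datetime

from tradingstrategy.chain import ChainId
from tradingstrategy.pair import PandasPairUniverse
from tradingstrategy.timebucket import TimeBucket

# Assumes `client` is an already created Client instance
exchange_universe = client.fetch_exchange_universe()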
pairs_df = client.fetch_pair_universe().to_pandas()

pair_universe = PandasPairUniverse(
    pairs_df,
    exchange_universe=exchange_universe,
)

pair = pair_universe.get_pair_by_human_description(
    (ChainId.ethereum, "uniswap-v3", "WETH", "USDC", 0.0005)
)

pair_2 = pair_universe.get_pair_by_human_description(
    (ChainId.ethereum, "uniswap-v2", "WETH", "USDC")
)

start = datetime.datetime(2024, 1, 1)
end = datetime.datetime(2024, 2, 1)

liquidity_df = client.fetch_tvl_by_pair_ids(
    [pair.pair_id, pair_2.pair_id],
    TimeBucket.d1,
    start_time=start,
    end_time=end,
)
Parameters:
  • pair_ids (Collection[int]) – Trading pairs internal ids we query data for. Get internal ids from pair dataset.

  • bucket (TimeBucket) –

    Candle time frame.

    Ask for TimeBucket.d1 or higher. TVL data may not be indexed for lower time frames.

  • start_time (Optional[Union[datetime, Timestamp]]) – All candles after this. If not given start from genesis.

  • end_time (Optional[Union[datetime, Timestamp]]) – All candles before this

  • progress_bar_description (Optional[str]) – Display a download progress bar using tqdm_loggable if given.

  • query_type (OHLCVCandleType) –

Returns:

TVL dataframe.

Has columns “open”, “high”, “low”, “close” and “pair_id”, representing TVL at different points in time. The index is a DatetimeIndex.

This data is not forward filled.

Return type:

DataFrame

fetch_clmm_liquidity_provision_candles_by_pair_ids(pair_ids, bucket, start_time=None, end_time=None, progress_bar_description='Downloading CLMM data')[source]#

Fetch CLMM liquidity provision candles.

Get Uniswap v3 liquidity provision data for liquidity provider position backtesting.

  • Designed to be used with Demeter backtesting framework but works with others.

  • For the candles format see tradingstrategy.clmm.

  • Responses are cached on the local file system

Example:

import datetime
from tradingstrategy.pair import PandasPairUniverse
from tradingstrategy.timebucket import TimeBucket
from tradingstrategy.chain import ChainId


class DemeterParameters:
    pair_descriptions = [
        (ChainId.arbitrum, "uniswap-v3", "WETH", "USDC", 0.0005)
    ]
    start = datetime.datetime(2024, 1, 1)
    end = datetime.datetime(2024, 2, 1)
    time_bucket = TimeBucket.m1
    initial_cash = 10_000  # USDC
    initial_base_token = 1  # WETH

# Load data needed to resolve pair human descriptions to internal ids
exchange_universe = client.fetch_exchange_universe()
pairs_df = client.fetch_pair_universe().to_pandas()
pair_universe = PandasPairUniverse(
    pairs_df,
    exchange_universe=exchange_universe,
)

# Load metadata for the chosen trading pairs (pools)
pair_metadata = [pair_universe.get_pair_by_human_description(desc) for desc in DemeterParameters.pair_descriptions]

# Map to internal pair primary keys
pair_ids = [pm.pair_id for pm in pair_metadata]

print("Pool addresses are", [(pm.get_ticker(), pm.pair_id, pm.address) for pm in pair_metadata])

# Load CLMM data for selected pairs
clmm_df = client.fetch_clmm_liquidity_provision_candles_by_pair_ids(
    pair_ids,
    DemeterParameters.time_bucket,
    start_time=DemeterParameters.start,
    end_time=DemeterParameters.end,
)

print("CLMM data sample is")
display(clmm_df.head(10))
Parameters:
  • pair_ids (Collection[int]) –

    Trading pairs internal ids we query data for. Get internal ids from pair dataset.

    Only works with Uniswap v3 pairs.

  • bucket (TimeBucket) –

    Candle time frame.

    Ask TimeBucket.d1 or lower. TimeBucket.m1 is most useful for LP backtesting.

  • start_time (Optional[Union[datetime, Timestamp]]) –

    All candles after this.

    Inclusive.

  • end_time (Optional[Union[datetime, Timestamp]]) –

    All candles before this.

    Inclusive.

  • progress_bar_description (Optional[str]) –

    Display a download progress bar using tqdm_loggable if given.

    Set to None to disable.

Returns:

CLMM dataframe.

See tradingstrategy.clmm for details.

Return type:

DataFrame

fetch_trading_data_availability(pair_ids, bucket)[source]#

Check the trading data availability at oracle’s real time market feed endpoint.

  • Trading Strategy oracle uses sparse data format where candles with zero trades are not generated. This is better suited for illiquid DEX markets with few trades.

  • Because of the sparse data format, we cannot tell whether the latest candle is missing because it has not been generated yet or because there were no trades to generate it from.

This endpoint lets you check the trading data availability for multiple trading pairs.

Example:

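from tradingstrategy.chain import ChainId
from tradingstrategy.pair import PandasPairUniverse
from tradingstrategy.timebucket import TimeBucket

# Assumes `client` is an already created Client instance
exchange_universe = client.fetch_exchange_universe()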
pairs_df = client.fetch_pair_universe().to_pandas()

# Create filtered exchange and pair data
exchange = exchange_universe.get_by_chain_and_slug(ChainId.bsc, "pancakeswap-v2")
pair_universe = PandasPairUniverse.create_pair_universe(
        pairs_df,
        [(exchange.chain_id, exchange.exchange_slug, "WBNB", "BUSD")]
    )

pair = pair_universe.get_single()

# Get the latest candle availability for BNB-BUSD pair
pairs_availability = client.fetch_trading_data_availability({pair.pair_id}, TimeBucket.m15)
Parameters:
  • pair_ids (Collection[int]) – Internal ids of the trading pairs to query. Get the internal ids from the pair dataset.

  • bucket (TimeBucket) – Candle time frame.

Returns:

Map of pairs -> their trading data availability

Return type:

Dict[int, TradingPairDataAvailability]
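The sparse data format described for fetch_trading_data_availability() means that time slots without trades have no candle. A consumer that needs a dense series has to fill the gaps itself. The sketch below shows one way to do that by carrying the last close forward; `fill_sparse_closes` is a hypothetical helper working on plain Python data, not part of the library.

```python
import datetime
from typing import Dict, List, Tuple


def fill_sparse_closes(
    closes: Dict[datetime.datetime, float],
    start: datetime.datetime,
    end: datetime.datetime,
    step: datetime.timedelta,
) -> List[Tuple[datetime.datetime, float]]:
    """Expand sparse close prices to a dense series by carrying the last close forward.

    Slots before the first known candle are skipped because
    there is no earlier value to carry forward.
    """
    dense = []
    last_close = None
    current = start
    while current <= end:
        if current in closes:
            # A real candle exists for this slot
            last_close = closes[current]
        if last_close is not None:
            dense.append((current, last_close))
        current += step
    return dense
```

Forward filling hides the difference between "no trades" and "candle not yet generated", which is why the availability endpoint exists as a separate check.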

fetch_candle_dataset(bucket)[source]#

Fetch candle data from the server.

Does not decode the Parquet file into memory; instead, returns the path to the raw downloaded file.

Parameters:

bucket (TimeBucket) –

Return type:

Path

fetch_lending_candles_by_reserve_id(reserve_id, bucket, candle_type=LendingCandleType.variable_borrow_apr, start_time=None, end_time=None)[source]#

Fetch lending candles for a particular reserve.

Parameters:
  • reserve_id (int) – Lending reserve’s internal id we query data for. Get internal id from lending reserve universe dataset.

  • bucket (TimeBucket) – Candle time frame.

  • candle_type (LendingCandleType) – Lending candle type.

  • start_time (Optional[datetime]) – All candles after this. If not given start from genesis.

  • end_time (Optional[datetime]) – All candles before this

Returns:

Lending candles dataframe

Return type:

DataFrame

fetch_lending_candles_for_universe(lending_reserve_universe, bucket, candle_types=(<LendingCandleType.variable_borrow_apr: 'variable_borrow_apr'>, <LendingCandleType.supply_apr: 'supply_apr'>), start_time=None, end_time=None, construct_timestamp_column=True, progress_bar_description=None)[source]#

Load lending reserves for several assets at once.

  • Display a progress bar during download

  • For usage examples see tradingstrategy.lending.LendingCandleUniverse.

Note

This download method is still unoptimised because the number of reserves is small.

Parameters:
  • candle_types (Collection[LendingCandleType]) – Data for candle types to load

  • construct_timestamp_column

    After loading data, create “timestamp” series based on the index.

    We need to convert index to column if we are going to have several reserves in tradingstrategy.lending.LendingCandleUniverse.

  • progress_bar_description (str | None) – Override the default progress bar description.

  • lending_reserve_universe (LendingReserveUniverse) –

  • bucket (TimeBucket) –

  • start_time (datetime.datetime | pandas._libs.tslibs.timestamps.Timestamp) –

  • end_time (datetime.datetime | pandas._libs.tslibs.timestamps.Timestamp) –

Returns:

Dictionary of dataframes.

One DataFrame per candle type we asked for.

Return type:

Dict[LendingCandleType, DataFrame]

fetch_all_liquidity_samples(bucket)[source]#

Get cached blob of liquidity events of a certain time window.

The returned data can range from several hundred megabytes to several gigabytes and is cached locally.

The returned data is saved in PyArrow Parquet format.

For more information see tradingstrategy.liquidity.XYLiquidity.

If the download seems to be corrupted, it will be attempted 3 times.

Parameters:

bucket (TimeBucket) –

Return type:

Table

fetch_lending_reserve_universe()[source]#

Load and cache the lending reserve universe.

Return type:

LendingReserveUniverse

fetch_lending_reserves_all_time()[source]#

Get a cached blob of lending protocol reserve events and precomputed stats.

The returned data can range from several hundred megabytes to several gigabytes in size, and is cached locally.

Note that at present the only available data is for the AAVE v3 lending protocol.

The returned data is saved in a PyArrow Parquet format.

If the download seems to be corrupted, it will be attempted 3 times.

Return type:

Table

fetch_chain_status(chain_id)[source]#

Get live information about the status of indexing and candle creation for a given blockchain.

Parameters:

chain_id (ChainId) –

Return type:

dict

fetch_top_pairs(chain_ids, exchange_slugs=None, addresses=None, limit=None, method=TopPairMethod.sorted_by_liquidity_with_filtering, min_volume_24h_usd=1000, risk_score_threshold=65)[source]#

Get new trading pairs to be included in the trading universe.

This API is still under heavy development.

This endpoint is designed to scan for new trading pairs to be included in a trading universe. It ranks and filters the interesting trading pairs (daily, weekly and so on) by a set of criteria.

  • Top pairs on exchanges

  • Top pairs for given tokens, by a token address

The result will include:

  • Included and excluded trading pairs

  • Pair metadata

  • Latest volume and liquidity

  • Token tax information

  • TokenSniffer risk score

The result data is filled asynchronously and may lag behind the latest state because of data processing delays. For example, when you call this method at 24:00, the pairs for the previous day are not ready yet. The results may vary, but should reflect a look-back window of the last 24 hours.

Various heuristics are applied to the result filtering, such as excluding stable pairs and derivative tokens, and choosing the trading pair with the best fee.

When you store the result, you need to use tuple (chain id, pool address) as the persistent key. Any integer primary keys may change over long term.
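The persistent key recommendation can be sketched as follows. The record type and its field names are hypothetical, chosen for illustration only, and lower-casing the address is a design choice of this sketch rather than something this page prescribes.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Tuple


@dataclass(frozen=True)
class StoredPair:
    """Hypothetical stored record; field names are illustrative, not the library's."""
    chain_id: int
    pool_address: str
    ticker: str


def index_by_persistent_key(pairs: Iterable[StoredPair]) -> Dict[Tuple[int, str], StoredPair]:
    """Key pairs by (chain id, lower-cased pool address)."""
    # Lower-casing makes checksummed and all-lowercase spellings of an address map to the same key
    return {(p.chain_id, p.pool_address.lower()): p for p in pairs}
```

Storing records under this tuple keeps lookups stable even if integer primary keys change later.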

Warning

Depending on the TokenSniffer data available, this endpoint may take up to 15 seconds per token.

The endpoint has two modes of operation:

  • TopPairMethod.sorted_by_liquidity_with_filtering: Give the endpoint a list of exchange slugs and get the best trading pairs on these exchanges. You need to give the chain_ids, limit and exchange_slugs arguments.

  • TopPairMethod.by_token_addresses: Give the endpoint a list of token smart contract addresses and get the best trading pairs for these. You need to give the chain_ids and addresses arguments.

Example how to get token tax data and the best trading pair for given Ethereum tokens:

top_reply = client.fetch_top_pairs(
    chain_ids={ChainId.ethereum},
    addresses={
        "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9",  # AAVE
        "0xc00e94Cb662C3520282E6f5717214004A7f26888",  # COMP
    },
    method=TopPairMethod.by_token_addresses,
    limit=None,
)

assert isinstance(top_reply, TopPairsReply)
# The top picks will be COMP/WETH and AAVE/WETH based on volume/liquidity
assert len(top_reply.included) == 2
# Many pairs (e.g. AAVE/USDC and AAVE/USDT) are excluded because of low liquidity or volume
assert len(top_reply.excluded) > 0

comp_weth = top_reply.included[0]
assert comp_weth.base_token == "COMP"
assert comp_weth.quote_token == "WETH"
assert comp_weth.get_buy_tax() == 0
assert comp_weth.get_sell_tax() == 0
assert comp_weth.volume_24h_usd > 100.0
assert comp_weth.tvl_latest_usd > 100.0

Example of chain/exchange based query:

# Get top tokens of Uniswap on Ethereum
top_reply = client.fetch_top_pairs(
    chain_ids={ChainId.ethereum},
    exchange_slugs={"uniswap-v2", "uniswap-v3"},
    limit=10,
)

assert isinstance(top_reply, TopPairsReply)
assert len(top_reply.included) == 10
assert len(top_reply.excluded) > 0  # There is always something to be excluded
Parameters:
  • method (TopPairMethod) – Currently, hardcoded. No other methods supported.

  • chain_ids (Collection[ChainId]) –

    List of blockchains to consider.

    Currently only 1 chain_id supported per query.

  • exchange_slugs (Optional[Collection[str]]) – List of DEXes to consider.

  • addresses (Optional[Collection[str]]) –

    List of token addresses to query.

    Token addresses, not trading pair addresses.

    The list is designed for base tokens in a trading pair. The list should not include any quote tokens like WETH or USDC because the resulting trading pair list is too long to handle, and the server will limit the list at some point.

  • limit (None) –

    Max number of results.

    If you ask for a very high number of tokens or pairs, the server will hard limit the response at some point. In this case, you may not get a resulting trading pair for a token even if one exists. Try to ask for at most 100 tokens at once.

  • min_volume_24h_usd (float | None) –

    Exclude trading pairs that do not reach this volume target.

    The filtered pairs either do not appear in the result at all (they are not worth loading from the database) or appear in the excluded category.

    Defaults to $1000. The minimum value is $1.

  • risk_score_threshold

    TokenSniffer risk score threshold.

    If the TokenSniffer risk score is below this value, the token will be in the excluded category of the reply; otherwise it is included.
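The threshold rule for risk_score_threshold can be restated as a small partition function. This only illustrates the comparison described above, including the boundary case where the score equals the threshold; `split_by_risk_score` is a hypothetical name and the real filtering happens on the server.

```python
from typing import Dict, List, Tuple


def split_by_risk_score(scores: Dict[str, int], threshold: int = 65) -> Tuple[List[str], List[str]]:
    """Return (included, excluded) token symbols for the given threshold."""
    # A score below the threshold is excluded; a score at or above it is included
    included = [token for token, score in scores.items() if score >= threshold]
    excluded = [token for token, score in scores.items() if score < threshold]
    return included, excluded
```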

Returns:

Top trading pairs included and excluded in the ranking.

If the by_token_addresses method is used and there is no active trading data for the token, the token may appear in neither the included nor the excluded results.

Return type:

TopPairsReply
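Because the limit parameter description suggests asking for at most 100 tokens at once, a longer address list has to be split into batches before querying. A minimal sketch, where `batched_addresses` is a hypothetical helper name:

```python
from typing import Iterator, List, Sequence


def batched_addresses(addresses: Sequence[str], batch_size: int = 100) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `batch_size` addresses."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for i in range(0, len(addresses), batch_size):
        yield list(addresses[i:i + batch_size])
```

Each batch can then be passed as the addresses argument of fetch_top_pairs(), and the replies merged by the caller.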

classmethod preflight_check()[source]#

Check that everything is OK to run the notebook.

classmethod setup_notebook()[source]#

Legacy.

async classmethod create_pyodide_client_async(cache_path=None, api_key='secret-token:tradingstrategy-d15c94d954abf9d98847f88d54403720ce52e41f267f5aaf16e63fcd30256af0', remember_key=False)[source]#

Create a new API client inside a Pyodide environment.

See the Pyodide project for more information about running Python in a browser.

Parameters:
  • cache_path (Optional[str]) – Virtual file system path

  • api_key (Optional[str]) – The API key used with the server downloads. A special hardcoded API key is used to identify the Pyodide client and its XMLHttpRequests. A referral check for these requests is performed.

  • remember_key – Store the API key in IndexedDB for future use.

Returns:

A new Client instance.

Return type:

Client

classmethod create_jupyter_client(cache_path=None, api_key=None, pyodide=None, settings_path=PosixPath('/home/runner/.tradingstrategy'))[source]#

Create a new API client.

This function is intended to be used from Jupyter notebooks:

  • Any local or server-side IPython session

  • JupyterLite notebooks

Note

Only use within Jupyter Notebook environments. Otherwise use create_live_client().

Parameters:
  • api_key (Optional[str]) – If not given, do an interactive API key set up in the Jupyter notebook while it is being run.

  • cache_path (Optional[str]) – Where downloaded datasets are stored. Defaults to ~/.cache.

  • pyodide – Detect the use of this library inside Pyodide / JupyterLite. If None then autodetect Pyodide presence, otherwise can be forced with True.

  • settings_path

    Where the settings file is written.

    Set None to disable settings file in Docker/web browser environments.

Return type:

Client

classmethod create_test_client(cache_path=None)[source]#

Create a new Trading Strategy client to be used with automated test suites.

Reads the API key from the environment variable TRADING_STRATEGY_API_KEY. A temporary folder is used as a cache path.

By default, the test client caches data under the /tmp folder. Tests do not clear this folder between test runs, which makes the tests faster.

Return type:

Client

classmethod create_live_client(api_key=None, cache_path=None, settings_path=PosixPath('/home/runner/.tradingstrategy'))[source]#

Create a live trading instance of the client.

  • The live client is non-interactive and logs using Python logger

  • If you want to run inside a notebook, use create_jupyter_client() instead

Example:

import os

import pandas as pd

from tradingstrategy.chain import ChainId
from tradingstrategy.client import Client
from tradingstrategy.pair import PandasPairUniverse
from tradingstrategy.timebucket import TimeBucket

# Disable the settings file.
# API key must be given in an environment variable.
client = Client.create_live_client(
    settings_path=None,
    api_key=os.environ["TRADING_STRATEGY_API_KEY"],
)
# Load pairs in all exchanges
exchange_universe = client.fetch_exchange_universe()
pairs_df = client.fetch_pair_universe().to_pandas()

pair_universe = PandasPairUniverse(pairs_df, exchange_universe=exchange_universe)

pair_ids = [
    pair_universe.get_pair_by_human_description([ChainId.ethereum, "uniswap-v3", "WETH", "USDC", 0.0005]).pair_id,
]

start = pd.Timestamp.utcnow() - pd.Timedelta("3d")
end = pd.Timestamp.utcnow()

# Download some data
clmm_df = client.fetch_clmm_liquidity_provision_candles_by_pair_ids(
    pair_ids,
    TimeBucket.d1,
    start_time=start,
    end_time=end,
)
Parameters:
  • api_key (Optional[str]) – Trading Strategy oracle API key, starts with secret-token:tradingstrategy-…

  • cache_path (Optional[Path]) – Where downloaded datasets are stored. Defaults to ~/.cache.

  • settings_path (pathlib.Path | None) –

    Where the settings file is written.

    Set None to disable settings file in Docker environments.

Return type:

Client
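Since the live client is non-interactive, it can help to validate the API key before creating it. The sketch below reads the TRADING_STRATEGY_API_KEY environment variable and checks the prefix given in the api_key parameter description. `read_api_key` is a hypothetical helper, and the prefix check is an assumption based on that description rather than a documented validation rule.

```python
import os
from typing import Mapping, Optional

# Prefix taken from the api_key parameter description on this page
API_KEY_PREFIX = "secret-token:tradingstrategy-"


def read_api_key(environ: Optional[Mapping[str, str]] = None) -> str:
    """Read TRADING_STRATEGY_API_KEY and check that it has the expected prefix."""
    environ = os.environ if environ is None else environ
    api_key = environ.get("TRADING_STRATEGY_API_KEY")
    if not api_key:
        raise RuntimeError("TRADING_STRATEGY_API_KEY environment variable is not set")
    if not api_key.startswith(API_KEY_PREFIX):
        raise RuntimeError(f"API key should start with {API_KEY_PREFIX}")
    return api_key
```

The returned value can be passed as the api_key argument of Client.create_live_client().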