Source code for tradingstrategy.direct_feed.ohlcv_aggregate

"""OHLCV aggregation function for Pandas

from dataclasses import dataclass

import pandas as pd

from tradingstrategy.direct_feed.timeframe import Timeframe
from tradingstrategy.direct_feed.direct_feed_pair import PairId

[docs]@dataclass(slots=True, frozen=True) class OHLCVCandle: """One OHLCV candle in the direct data feed.""" pair: PairId timestamp: pd.Timestamp start_block: int end_block: int open: float high: float low: float close: float volume: float exchange_rate: float @staticmethod def get_dataframe_columns() -> dict: fields = dict([ ("pair", "string"), ("start_block", "uint64"), ("end_block", "uint64"), ("open", "float32"), ("high", "float32"), ("low", "float32"), ("close", "float32"), ("volume", "float32"), ("exchange_rate", "float32"), ]) return fields
[docs]def resample_trades_into_ohlcv( df: pd.DataFrame, timeframe: Timeframe, ) -> pd.DataFrame: """Resample incoming "tick" data. - The incoming DataFrame is not groupe by pairs yet, but presents stream of data from the blockchain. It must have columns `pair`, `amount`, `price`, `exchange_rate`. It will be indexed by `timestamp` for the operations. - There must be a price given externally for the first open - Build OHLCV dataframe from individual trades - Handle any exchange rate conversion - Any missing values will be filled with the last valid close - Exchange rate resample is the avg exchange rate of the candle. Thus, resampled data might not be accurately converted between native quote asset and US dollar value. Any currency conversion must happen before. ... and be timestamp indexed. :param df: DataFrame of incoming trades. Must have columns `price`, `amount` :param freq: Pandas frequency string for the new timeframe duration. E.g. `D` for daily. See :param offset: Allows you to "shift" candles :param conversion: Convert the incomign values if needed :return: OHLCV dataframes that are grouped by pair. Open price is not the price of the previous close, but the first trade done within the timeframe. """ assert len(df) > 0, "Empty dataframe" df = df.set_index("timestamp") df["abs_amount"] = df["amount"].abs() df = df.groupby("pair") # Calculate high, low and close values using naive # resamping. This assumes there were trades within the freq window. df2 = df["price"].resample(timeframe.freq).agg({ 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last'}) # Retain timestamp as a column as well, # because resample() drops it df2["timestamp"] = df2.index.get_level_values(1) # TODO: Integers here get converted to floats when processed with agg(). # Figure out how to prevent this. blocks_df = df["block_number"].resample(timeframe.freq).agg({ 'end_block': 'max', 'start_block': 'min'}) volume_df = df["abs_amount"].resample(timeframe.freq).agg({ "total_volume": "sum", "avg_amount": "mean", }) trade_side_df = df["amount"].resample(timeframe.freq).agg({ 'buys': lambda x: (x > 0).sum(), 'sells': lambda x: (x < 0).sum()}) df2["exchange_rate"] = df["exchange_rate"].resample(timeframe.freq).mean() df2["start_block"] = blocks_df["start_block"] df2["end_block"] = blocks_df["end_block"] df2["volume"] = volume_df["total_volume"] df2["avg_trade"] = volume_df["avg_amount"] df2["buys"] = trade_side_df["buys"] df2["sells"] = trade_side_df["sells"] # Resample generates NaN values instead of sparse data. # In this point, we just drop the rows that have any NaNs in them # high low close exchange_rate start_block end_block # pair timestamp # AAVE-ETH 2020-01-01 80.0 80.0 80.0 1600.0 1.0 1.0 # 2020-01-02 96.0 96.0 96.0 1600.0 2.0 2.0 # ETH-USDC 2020-01-02 1600.0 1600.0 1600.0 1.0 2.0 2.0 # 2020-01-03 NaN NaN NaN NaN NaN NaN # 2020-01-04 NaN NaN NaN NaN NaN NaN # 2020-01-05 1620.0 1400.0 1400.0 1.0 7.0 8.0 df2 = df2.dropna() return df2
[docs]def get_feed_for_pair(df: pd.DataFrame, pair: PairId) -> pd.DataFrame: """Get candles for a single pair. :param df: An OHLCV dataframe. Generated with :py:func:`ohlcv_resample_trades` :param pair: Which pair we are interested in :return: Dataframe for this pair """ # groupby().resample() produces multi-index # where the first index is the pair # ipdb> df.index # MultiIndex([('AAVE-ETH', '2020-01-01'), # ('AAVE-ETH', '2020-01-02'), # ('ETH-USDC', '2020-01-02'), # ('ETH-USDC', '2020-01-05')], # names=['pair', 'timestamp']) # No data, return empty dataframe if len(df) == 0: return pd.DataFrame() # try: return df.xs(pair) except KeyError as e: raise KeyError(f"Could not find pair for address {pair}") from e
[docs]def truncate_ohlcv(df: pd.DataFrame, ts: pd.Timestamp) -> pd.DataFrame: """Clear the chain tip data to be written again. :param ts: Drop everything after this (inclusive). """ if len(df) == 0: return df df = df[df.index.get_level_values('timestamp') < ts] return df