# Source code for tradingstrategy.direct_feed.candle_feed

from dataclasses import dataclass
from typing import List, Iterable

import pandas as pd

from tradingstrategy.direct_feed.ohlcv_aggregate import resample_trades_into_ohlcv, get_feed_for_pair, truncate_ohlcv
from tradingstrategy.direct_feed.timeframe import Timeframe
from tradingstrategy.direct_feed.trade_feed import TradeDelta
from tradingstrategy.direct_feed.direct_feed_pair import PairId

class CandleFeed:
    """Create candles for certain time frame for multiple pairs.

    - Takes :py:class:`TradeFeed` as input

    - Generates candles based on this feed

    - Can only generate candles of one timeframe

    - May contain multiple pairs in one candle feed
    """

    def __init__(
        self,
        pairs: List[PairId],
        timeframe: Timeframe,
    ):
        """Create an empty candle feed.

        :param pairs:
            List of pair ids this feed tracks.
            Symbolic names or addresses. Every entry must be a ``str``.

        :param timeframe:
            The single timeframe passed to the resampler when trades
            are turned into candles.
        """
        for p in pairs:
            # isinstance() instead of an exact type comparison, so str
            # subclasses are accepted as pair ids as well.
            assert isinstance(p, str), f"Pairs must be a list of pair ids (str). Got: {p}"

        self.pairs = pairs
        self.timeframe = timeframe
        # In-memory candle buffer; starts as a DataFrame with no rows and
        # no columns, and is replaced whenever apply_delta() produces candles.
        self.candle_df = pd.DataFrame()
        # Cycle number of the most recently applied delta.
        self.last_cycle = 0

    def __repr__(self):
        if len(self.pairs) == 1:
            name = f"CandleFeed for {self.pairs[0]}"
        else:
            name = f"CandleFeed for {len(self.pairs)} pairs"

        candle_count = len(self.candle_df)
        if candle_count > 0:
            first_ts = self.candle_df.iloc[0]["timestamp"]
            last_ts = self.candle_df.iloc[-1]["timestamp"]
        else:
            # Nothing buffered yet, show a placeholder for both ends.
            first_ts = last_ts = "-"

        return f"<{name} using timeframe {self.timeframe.freq}, having data {first_ts} - {last_ts} total {candle_count:,} candles>"

    def apply_delta(self, delta: TradeDelta, initial_load=False, label_candles=True):
        """Add new candle data generated from the latest blockchain input.

        If the delta carries trades, the existing buffer is truncated at
        ``delta.start_ts`` and the trades are resampled to this feed's
        timeframe. The buffer is replaced only when the resampling yields
        at least one candle. ``last_cycle`` is set to ``delta.cycle``.

        :param delta:
            New trades coming in

        :param initial_load:
            Intended to mark an initial buffer fill rather than an
            incremental snapshot.
            NOTE: this argument is accepted but not read by this method.

        :param label_candles:
            Intended to control creation of a label column (tooltips for
            the visual candle viewer).
            NOTE: this argument is accepted but not read by this method.
        """
        if len(delta.trades) > 0:
            cropped_df = truncate_ohlcv(self.candle_df, delta.start_ts)
            candles = resample_trades_into_ohlcv(delta.trades, self.timeframe)

            # Only if we have any new candles from our timeframe add them to the
            # in-memory buffer
            if len(candles) > 0:
                self.candle_df = pd.concat([cropped_df, candles])

        # Remember the cycle of this delta, whether or not it had trades.
        self.last_cycle = delta.cycle

    def get_candles_by_pair(self, pair: PairId) -> pd.DataFrame:
        """Get the buffered candles of a single pair.

        :param pair:
            Pair id to look up in the candle buffer.

        :return:
            Whatever ``get_feed_for_pair`` returns for this buffer and pair.
        """
        return get_feed_for_pair(self.candle_df, pair)

    def get_last_block_number(self) -> int:
        """Get overall last block number for which we have valid data.

        Reads the maximum of the ``end_block`` column of the candle buffer.

        :return:
            block number (inclusive)

        :raise KeyError:
            If the buffer has no ``end_block`` column, which is the case
            for the empty buffer created in ``__init__`` before any
            candles have been applied.
        """
        return self.candle_df["end_block"].max()

    def iterate_pairs(self) -> Iterable[pd.DataFrame]:
        """Get candles for all pairs we are tracking.

        Yields one result of :py:meth:`get_candles_by_pair` per entry in
        ``self.pairs``, in list order.
        """
        for p in self.pairs:
            yield self.get_candles_by_pair(p)
def prepare_raw_candle_data(df: pd.DataFrame) -> pd.DataFrame:
    """Convert all Python Decimal objects to easier to deal floats in DataFrame.

    Casts the ``open``, ``close``, ``high``, ``low`` and ``volume`` columns
    to ``float32``. Other columns are passed through with their dtype
    untouched.

    :param df:
        Candle data. Must contain all five OHLCV columns named above;
        ``DataFrame.astype`` raises ``KeyError`` for a missing one.

    :return:
        The DataFrame returned by ``DataFrame.astype`` with the five
        columns converted; ``df`` itself is not modified.
    """
    ohlcv_columns = ("open", "close", "high", "low", "volume")
    # One dtype mapping for all price/volume columns, same target type each.
    dtype_map = {column: "float32" for column in ohlcv_columns}
    return df.astype(dtype_map)