Source code for tradingstrategy.direct_feed.store
"""Cached data store for trade feeds.
We store
- Block headers
- Fetched trades
We do not store
- Candles (always regenerated)
"""
import logging
import shutil
from pathlib import Path
from typing import Tuple

from eth_defi.event_reader.block_header import BlockHeader
from eth_defi.event_reader.parquet_block_data_store import ParquetDatasetBlockDataStore

from tradingstrategy.direct_feed.trade_feed import TradeFeed

logger = logging.getLogger(__name__)
class DirectFeedStore:
    """Manage on-disk block header and trade cache for direct feeds.

    Internally uses partitioned Parquet dataset storage:
    each partition covers a range of blocks and is written
    to its own folder/file.
    """
    def __init__(self, base_path: Path, partition_size: int):
        """Initialise a new store.

        :param base_path:
            Base folder where data is dumped.
            Both headers and trades get their own Parquet datasets
            as subfolders.

        :param partition_size:
            Partition size for the store,
            expressed as the number of blocks per Parquet file.
        """
        assert isinstance(base_path, Path)
        assert isinstance(partition_size, int)
        self.base_path = base_path
        self.partition_size = partition_size
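    # Example (a minimal sketch; the cache directory and partition size
    # below are illustrative choices, not library defaults):
    #
    #   store = DirectFeedStore(Path("/tmp/direct-feed-cache"), partition_size=10_000)
    #   if store.is_empty():
    #       ...  # nothing cached yet, start from a clean state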
    def is_empty(self) -> bool:
        """Have we written anything to this store yet?"""
        return not self.base_path.exists()
    def clear(self):
        """Clear the cache."""
        assert not self.is_empty(), "Cannot clear an empty store"
        shutil.rmtree(self.base_path)
    def save_trade_feed(self, trade_feed: TradeFeed) -> Tuple[int, int]:
        """Save the trade and block header data.

        :param trade_feed:
            Save trades and block headers from this feed.

        :return:
            Tuple (last saved header block number, last saved trade block number).
        """
        base_path = self.base_path
        partition_size = self.partition_size

        header_store = ParquetDatasetBlockDataStore(base_path.joinpath("blocks"), partition_size)
        trade_store = ParquetDatasetBlockDataStore(base_path.joinpath("trades"), partition_size)

        # Save headers
        headers_df = trade_feed.reorg_mon.to_pandas(partition_size)
        if len(headers_df) > 0:
            header_store.save(headers_df)
            assert not header_store.is_virgin(), "Headers not correctly written"
            last_header_block = headers_df.iloc[-1]["block_number"]
        else:
            last_header_block = 0

        # Save trades
        trades_df = trade_feed.to_pandas(partition_size)
        if len(trades_df) > 0:
            trade_store.save(trades_df, check_contains_all_blocks=False)
            assert not trade_store.is_virgin(), "Trades not correctly written"
            last_trade_block = trades_df.iloc[-1]["block_number"]
        else:
            last_trade_block = 0

        return last_header_block, last_trade_block
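    # Example (a sketch; assumes ``store`` and an already-populated
    # TradeFeed ``feed`` exist):
    #
    #   last_header_block, last_trade_block = store.save_trade_feed(feed)
    #   logger.info("Cached headers up to block %d, trades up to block %d",
    #               last_header_block, last_trade_block)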
    def load_trade_feed(self, trade_feed: TradeFeed) -> bool:
        """Load trade and block header data.

        :param trade_feed:
            Restore trades and block headers to this feed.

        :return:
            True if both block header and trade data were loaded.
        """
        base_path = self.base_path
        partition_size = self.partition_size

        header_store = ParquetDatasetBlockDataStore(base_path.joinpath("blocks"), partition_size)
        trade_store = ParquetDatasetBlockDataStore(base_path.joinpath("trades"), partition_size)

        if not header_store.is_virgin():
            logger.info("Loading block header data from %s", header_store.path)
            headers_df_2 = header_store.load()
            block_map = BlockHeader.from_pandas(headers_df_2)
            trade_feed.reorg_mon.restore(block_map)
            logger.info("Loaded %d blocks", len(block_map))

        if not trade_store.is_virgin():
            trades_df_2 = trade_store.load()
            trade_feed.restore(trades_df_2)
            logger.info(f"Loaded {len(trades_df_2)} trades, last block is {trade_feed.get_block_number_of_last_trade():,}")

        return not (header_store.is_virgin() or trade_store.is_virgin())
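

# Minimal usage sketch. Only the store lifecycle is exercised here, because
# constructing a real TradeFeed (reorg monitor, event readers) is outside the
# scope of this module; the cache path and partition size are illustrative
# choices, not library defaults.
if __name__ == "__main__":
    store = DirectFeedStore(Path("/tmp/direct-feed-cache"), partition_size=10_000)
    print("Store empty:", store.is_empty())
    if not store.is_empty():
        # Wipes both the blocks/ and trades/ Parquet datasets
        store.clear()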