"""Decision trigger.
- Wait for the latest candle data become available to act on it immediately
- Update :py:class:`TradingStrategyUniverse` with the latest data needed for the current strategy cycle
"""
import datetime
import logging
import time
from dataclasses import dataclass
from typing import Set, Optional, Dict
import pandas as pd
from tradingstrategy.candle import GroupedCandleUniverse, TradingPairDataAvailability
from tradingstrategy.client import Client
from tradingstrategy.lending import LendingReserve, LendingReserveUniverse, LendingCandleType, LendingCandleResult, LendingCandleUniverse
from tradingstrategy.pair import DEXPair
from tradingstrategy.timebucket import TimeBucket
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse
from tradingstrategy.types import PrimaryKey
logger = logging.getLogger(__name__)
class NoNewDataReceived(Exception):
"""We never got new data despite max wait."""
class BadNewDataReceived(Exception):
"""Candles are off."""
@dataclass(slots=True)
class UpdatedUniverseResult:
"""Describe the result of universe waiting operation."""
#: Trading Universe with updated candles
updated_universe: TradingStrategyUniverse
#: When we finished waiting
ready_at: datetime.datetime
#: How long we waited
time_waited: datetime.timedelta
#: How many cycles we did waiting
poll_cycles: int
    #: Maximum difference between the wanted timestamp and the last available candle.
    #:
    #: ``None`` if there were no poll cycles.
    max_diff: Optional[datetime.timedelta]
def fetch_price_data(
client: Client,
bucket: TimeBucket,
timestamp: datetime.datetime,
pairs: Set[DEXPair],
required_history_period: datetime.timedelta,
) -> pd.DataFrame:
"""Download the pair data.
TODO: Add an API to disable progress bars.
:param client:
:param bucket:
:param timestamp:
:param pairs:
:param required_history_period:
:return:
A candle containing a mix of pair data for all pairs.
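
    Example (a hypothetical sketch; ``client``, ``strategy_universe`` and ``wanted_timestamp``
    are assumed to be set up elsewhere by the live execution loop):

    .. code-block:: python

        import datetime

        from tradingstrategy.timebucket import TimeBucket

        # Hypothetical names: client and strategy_universe come from the strategy setup,
        # wanted_timestamp is the timestamp of the latest candle we expect to see
        pairs = {
            strategy_universe.data_universe.pairs.get_pair_by_id(pair_id)
            for pair_id in strategy_universe.data_universe.pairs.pair_map
        }

        price_df = fetch_price_data(
            client,
            bucket=TimeBucket.h1,
            timestamp=wanted_timestamp,
            pairs=pairs,
            required_history_period=datetime.timedelta(days=90),
        )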
"""
pair_ids = {p.pair_id for p in pairs}
start_time = timestamp - required_history_period - datetime.timedelta(seconds=1)
return client.fetch_candles_by_pair_ids(
pair_ids,
bucket=bucket,
start_time=start_time,
end_time=timestamp,
)
def fetch_lending_data(
client: Client,
bucket: TimeBucket,
timestamp: datetime.datetime,
lending_reserve_universe: LendingReserveUniverse,
required_history_period: datetime.timedelta,
) -> LendingCandleResult:
"""Download the new lending candles.
:param client:
:param bucket:
:param timestamp:
:param pairs:
:param required_history_period:
:return:
DataFrame containing updated lending reserves.
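
    Example (a hypothetical sketch; ``client``, ``strategy_universe`` and ``wanted_timestamp``
    are assumed to be set up elsewhere):

    .. code-block:: python

        import datetime

        from tradingstrategy.timebucket import TimeBucket

        # Hypothetical names: strategy_universe comes from the strategy setup
        lending_candles = fetch_lending_data(
            client,
            bucket=TimeBucket.h1,
            timestamp=wanted_timestamp,
            lending_reserve_universe=strategy_universe.data_universe.lending_reserves,
            required_history_period=datetime.timedelta(days=90),
        )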
"""
start_time = timestamp - required_history_period - datetime.timedelta(seconds=1)
return client.fetch_lending_candles_for_universe(
lending_reserve_universe,
bucket=bucket,
start_time=start_time,
end_time=timestamp,
)
def fetch_availability(
client: Client,
bucket: TimeBucket,
pairs: Set[DEXPair],
) -> Dict[PrimaryKey, TradingPairDataAvailability]:
"""Fetch the trading data availability from the oracle.
:return:
A candle containing a mix of pair data for all pairs.
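
    Example (a hypothetical sketch; ``client`` and a ``pairs`` set of
    :py:class:`DEXPair` objects are assumed to exist):

    .. code-block:: python

        from tradingstrategy.timebucket import TimeBucket

        # client and pairs are hypothetical names from the strategy setup
        avail_map = fetch_availability(client, TimeBucket.h1, pairs)

        for pair_id, availability in avail_map.items():
            print(pair_id, availability["last_candle_at"])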
"""
pair_ids = {p.pair_id for p in pairs}
return client.fetch_trading_data_availability(
pair_ids,
bucket=bucket,
)
def update_universe(
universe: TradingStrategyUniverse,
price_df: pd.DataFrame,
lending_candles: LendingCandleResult | None = None,
) -> TradingStrategyUniverse:
"""Update a Trading Universe with a new candle data.
:param price_df:
Unsorted DataFrame containing data for all trading pairs we are interested in.
:param lending_candles:
New lending candles fetches from the server
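
    Example (a hypothetical sketch; ``client``, ``strategy_universe``, ``wanted_timestamp``,
    ``price_df`` and ``required_history_period`` are assumed to be set up as in the
    examples above):

    .. code-block:: python

        from tradingstrategy.timebucket import TimeBucket

        # Lending data is optional - only fetch it if the universe tracks lending reserves
        if strategy_universe.has_lending_data():
            lending_candles = fetch_lending_data(
                client,
                TimeBucket.h1,
                wanted_timestamp,
                strategy_universe.data_universe.lending_reserves,
                required_history_period,
            )
        else:
            lending_candles = None

        updated_universe = update_universe(strategy_universe, price_df, lending_candles)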
"""
updated_universe = universe.clone()
updated_universe.data_universe.candles = GroupedCandleUniverse(price_df)
if universe.has_lending_data():
assert lending_candles is not None
updated_universe.data_universe.lending_candles = LendingCandleUniverse(
lending_candles,
lending_reserve_universe=universe.data_universe.lending_reserves
)
return updated_universe
def validate_latest_candles(
pairs: Set[DEXPair],
df: pd.DataFrame,
timestamp: datetime.datetime,
):
"""Ensure that the oracle served us correct up-to-date candles.
- The last timestamp of a pair must match
what we requested earlier.
- The timestamp cannot be sooner or later
.. note ::
This cam be only called for highly active pairs,
as many low and middle cap tokens may not see trades
in hours.
:param pairs:
Set of pairs our strategy is trading
:param df:
Dataframe of candles.
May contain candes for a single or multiple pairs.
:param timestamp:
What is the latest timestamp we need to have avilable for every pair.
This is the strategy decision timestamp - current candle time frame.
:raise:
AssertionError
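
    Example (a hypothetical sketch; ``pairs``, ``price_df``, ``timestamp`` and the candle
    time ``bucket`` are assumed to come from the surrounding live execution code):

    .. code-block:: python

        # The latest candle we can expect is one full candle
        # before the strategy decision timestamp
        wanted_timestamp = timestamp - bucket.to_timedelta()

        validate_latest_candles(pairs, price_df, wanted_timestamp)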
"""
timestamp = pd.Timestamp(timestamp)
assert len(df) > 0, f"Empty dataframe. Pairs: {pairs}"
for p in pairs:
last_timestamp = df.loc[df["pair_id"] == p.pair_id].max()["timestamp"]
assert last_timestamp == timestamp, f"Did not receive wanted latest candle timestamp: {timestamp}. Pair {p} has timestamp {last_timestamp}"
def wait_for_universe_data_availability_jsonl(
timestamp: datetime.datetime,
client: Client,
current_universe: TradingStrategyUniverse,
required_history_period=datetime.timedelta(days=90),
max_wait=datetime.timedelta(minutes=30),
max_poll_cycles: Optional[int] = None,
poll_delay = datetime.timedelta(seconds=15),
) -> UpdatedUniverseResult:
"""Wait for the data to be available for the latest strategy cycle.
- Used in live execution only
- Uses Trading Strategy oracle real-time JSONL API for the data.
- Uses simple polling appraoch
:param timestamp:
The current strategy decision timestamp.
The latest available data we can have is the previous full candle.
:param current_universe:
The current trading universe with old candles.
:param required_history_period:
How much historical data we need to load.
Depends on the strategy. Defaults to 90 days.
If there is `current_universe.required_history_period` ignore this argument
and use the value from the trading universe instead.
:param max_wait:
Unless data is seen, die with an exception after this period.
:param max_poll_cycles:
Can be set in integration testing.
Return after this many cycles despite new data being incomplete.
:return:
An updated trading universe
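
    Example (a hypothetical sketch of one live execution step; ``client`` and
    ``strategy_universe`` are assumed to come from the strategy setup):

    .. code-block:: python

        import datetime

        # Strategy decision timestamps are aligned to a full minute
        cycle_timestamp = datetime.datetime.utcnow().replace(second=0, microsecond=0)

        result = wait_for_universe_data_availability_jsonl(
            timestamp=cycle_timestamp,
            client=client,
            current_universe=strategy_universe,
            max_wait=datetime.timedelta(minutes=10),
        )

        logger.info(
            "Data ready at %s after %d poll cycles, waited %s",
            result.ready_at,
            result.poll_cycles,
            result.time_waited,
        )

        strategy_universe = result.updated_universe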
"""
assert client is not None
assert timestamp.second == 0
    # The pairs monitored by this strategy
pairs = current_universe.data_universe.pairs
assert len(pairs.pair_map) > 0, "No pairs in the pair_map"
bucket = current_universe.data_universe.time_bucket
completed_pairs: Set[DEXPair] = set()
incompleted_pairs: Set[DEXPair] = {pairs.get_pair_by_id(id) for id in pairs.pair_map.keys()}
started_at = datetime.datetime.utcnow()
deadline = started_at + max_wait
poll_cycle = 1
wanted_timestamp = timestamp - bucket.to_timedelta()
logger.info("Waiting for data availability for pairs %s, strategy cycle timestamp is %s, wanted timestamp is %s",
pairs,
timestamp,
wanted_timestamp
)
if max_poll_cycles is None:
# Make sure we can do int comparison
max_poll_cycles = 99999
# Use the required look back value from the trading
# universe if available.
if current_universe.required_history_period is not None:
required_history_period = current_universe.required_history_period
max_diff = datetime.timedelta(0)
latest_timestamp = diff = None
while datetime.datetime.utcnow() < deadline:
        # Query the oracle for candle availability of the pairs still missing data
avail_map = fetch_availability(
client,
bucket,
incompleted_pairs,
)
last_timestamps_log = {}
# Move any pairs with new complete data to the completed set
pairs_to_move = set()
diff = None
for p in incompleted_pairs:
latest_timestamp = avail_map[p.pair_id]["last_candle_at"]
last_supposed_candle_at = avail_map[p.pair_id]["last_supposed_candle_at"]
if last_supposed_candle_at > latest_timestamp:
latest_timestamp = last_supposed_candle_at
last_timestamps_log[p.get_ticker()] = latest_timestamp
# This pair received its data and is ready
if latest_timestamp >= wanted_timestamp:
pairs_to_move.add(p)
diff = wanted_timestamp - latest_timestamp
max_diff = max(diff, max_diff)
        # Some pairs became ready with their data
for p in pairs_to_move:
incompleted_pairs.remove(p)
completed_pairs.add(p)
        # Are we done with all the incomplete pairs?
if not incompleted_pairs or poll_cycle >= max_poll_cycles:
            # We have the latest data for all pairs and can now update the universe
logger.info("Fetching candle data for the history period of %s", required_history_period)
df = fetch_price_data(
client,
bucket,
wanted_timestamp,
completed_pairs,
required_history_period,
)
if current_universe.has_lending_data():
lending_data = fetch_lending_data(
client,
TimeBucket.h1,
wanted_timestamp,
current_universe.data_universe.lending_reserves,
required_history_period
)
else:
lending_data = None
updated_universe = update_universe(
current_universe,
df,
lending_data
)
time_waited = datetime.datetime.utcnow() - started_at
return UpdatedUniverseResult(
updated_universe=updated_universe,
ready_at=datetime.datetime.utcnow(),
time_waited=time_waited,
poll_cycles=poll_cycle,
max_diff=max_diff,
)
# Avoid excessive logging output if > 10 pairs
last_timestamps_log_str = str(last_timestamps_log)[0:400]
logger.info("Timestamp wanted %s, Completed pairs: %d, Incompleted pairs: %d, last candles %s, diff is %s, sleeping %s",
wanted_timestamp,
len(completed_pairs),
len(incompleted_pairs),
last_timestamps_log_str,
diff,
poll_delay)
time.sleep(poll_delay.total_seconds())
poll_cycle += 1
raise NoNewDataReceived(
f"Waited {max_wait} to get the data to make a trading strategy decision.\n"
f"Decision cycle: {timestamp}.\n"
f"Wanted candle timestamp: {wanted_timestamp}.\n"
f"Latest candle we received: {latest_timestamp}.\n"
f"Diff: {diff}.\n"
f"Wait cycles: {poll_cycle}.\n"
f"Pairs incomplete: {incompleted_pairs}.\n"
f"Pairs complete: {completed_pairs}\n"
)