Source code for tradeexecutor.utils.blockchain

"""Blockchain related utilities."""
import datetime

from web3 import Web3
from web3.types import BlockIdentifier

from eth_defi.event_reader.conversion import convert_jsonrpc_value_to_int


[docs]def get_latest_block_timestamp(web3: Web3) -> datetime.datetime: """Get the latest block timestamp. .. warning:: Do not use See :py:func:`eth_defi.provider.broken_provider.get_almost_latest_block_number` :return: Timezone naive UTC datetime """ last_block = web3.eth.get_block("latest") ts_str = last_block["timestamp"] # Depending on middleware, response might be converted or not if type(ts_str) == str: ts = int(ts_str, 16) else: ts = ts_str return datetime.datetime.utcfromtimestamp(ts)
[docs]def get_block_timestamp(web3: Web3, block_identifier: BlockIdentifier) -> datetime.datetime: """Get a block timestamp. Slow method. Use only for individual queries. :return: Timezone naive UTC datetime """ last_block = web3.eth.get_block(block_identifier) ts_str = last_block["timestamp"] # Depending on middleware, response might be converted or not if type(ts_str) == str: ts = convert_jsonrpc_value_to_int(ts_str) else: ts = ts_str return datetime.datetime.utcfromtimestamp(ts)