Source code for tradeexecutor.utils.blockchain
"""Blockchain related utilities."""
import datetime
from web3 import Web3
from web3.types import BlockIdentifier
from eth_defi.event_reader.conversion import convert_jsonrpc_value_to_int
def get_latest_block_timestamp(web3: Web3) -> datetime.datetime:
    """Get the timestamp of the latest block.

    .. warning::

        Do not use.
        See :py:func:`eth_defi.provider.broken_provider.get_almost_latest_block_number`.

    :param web3:
        Web3 connection used to fetch the ``"latest"`` block

    :return:
        Timezone naive UTC datetime
    """
    latest_block = web3.eth.get_block("latest")
    raw_timestamp = latest_block["timestamp"]

    # Depending on middleware, the response might be converted or not:
    # a string is parsed as a hexadecimal number, anything else is used as is.
    if isinstance(raw_timestamp, str):
        timestamp = int(raw_timestamp, 16)
    else:
        timestamp = raw_timestamp

    # datetime.utcfromtimestamp() is deprecated since Python 3.12.
    # Convert in UTC and drop tzinfo so callers still get a naive datetime.
    aware = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return aware.replace(tzinfo=None)
def get_block_timestamp(web3: Web3, block_identifier: BlockIdentifier) -> datetime.datetime:
    """Get the timestamp of a single block.

    Slow method. Use only for individual queries.

    :param web3:
        Web3 connection used to fetch the block

    :param block_identifier:
        Passed as is to ``web3.eth.get_block()`` to select the block

    :return:
        Timezone naive UTC datetime
    """
    block = web3.eth.get_block(block_identifier)
    raw_timestamp = block["timestamp"]

    # Depending on middleware, the response might be converted or not:
    # a string goes through the JSON-RPC value decoder, anything else is used as is.
    if isinstance(raw_timestamp, str):
        timestamp = convert_jsonrpc_value_to_int(raw_timestamp)
    else:
        timestamp = raw_timestamp

    # datetime.utcfromtimestamp() is deprecated since Python 3.12.
    # Convert in UTC and drop tzinfo so callers still get a naive datetime.
    aware = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return aware.replace(tzinfo=None)