# Source code for tradingstrategy.utils.schema

"""Schema manipulation utilities.

Constructing PyArrow schemas.

.. note ::

    Legacy. This module is likely to be removed.

For more information about Pyarrow Schemas, see https://arrow.apache.org/docs/python/api/datatypes.html
"""

import typing
from dataclasses import Field, fields
from enum import Enum
from types import NoneType, UnionType
from typing import Callable, Dict, List, Optional

import pyarrow as pa

from tradingstrategy.chain import ChainId
from tradingstrategy.types import PrimaryKey, NonChecksummedAddress, BlockNumber, UNIXTimestamp, BasisPoint


class CannotMap(Exception):
    """Raised when a Python type cannot be mapped to PyArrow automatically."""


def unmappable(t):
    """Refuse to map ``t`` by always raising :class:`CannotMap`.

    Intended as the mapping callback for types that have no automatic
    PyArrow equivalent.

    :param t:
        The value that could not be mapped. It is interpolated into the
        error message.

    :raise CannotMap:
        Always.
    """
    raise CannotMap(f"Cannot automatically map {t}")
#: Default mappings for automatic schema generation,
#: including our own type definitions.
#:
#: Keys are Python types. Each value is a callable that takes one positional
#: argument and returns the PyArrow data type to use for that Python type.
#: ``dict`` and ``list`` have no automatic mapping: their callbacks raise
#: :class:`CannotMap` through :func:`unmappable`.
DEFAULT_MAPPINGS = {
    # Project type definitions
    PrimaryKey: lambda t: pa.uint32(),
    ChainId: lambda t: pa.uint16(),
    NonChecksummedAddress: lambda t: pa.string(),
    BlockNumber: lambda t: pa.uint32(),
    UNIXTimestamp: lambda t: pa.timestamp("s"),
    BasisPoint: lambda t: pa.uint32(),
    # Python builtins
    bool: lambda t: pa.bool_(),
    float: lambda t: pa.float32(),
    int: lambda t: pa.uint32(),
    str: lambda t: pa.string(),
    # Containers cannot be mapped without knowing their element types
    dict: lambda t: unmappable(t),
    list: lambda t: unmappable(t),
}
[docs]def map_field_to_arrow(field: Field, hints: Dict[str, pa.DataType], core_mappings: Dict[str, Callable]) -> pa.DataType: """Map a dataclass field to a pyarrow equivalent, respect hints""" hinted = hints.get(field.name) if hinted: return hinted # Resolve optional origin = typing.get_origin(field.type) if origin == typing.Union: # Optional type args = typing.get_args(field.type) assert len(args) == 2 assert args[1] == type(None) field_type = args[0] true_origin = typing.get_origin(args[0]) else: field_type = field.type true_origin = origin if true_origin == list: args = typing.get_args(field_type) value_type = args[0] mapped_value_type = core_mappings[value_type](field) return pa.list_(mapped_value_type) elif true_origin == dict: # Only string string dicts supported return pa.map_(pa.string(), pa.string()) else: if issubclass(field_type, Enum): # No support for category compaction yet field_type = str return core_mappings[field_type](field)
def create_pyarrow_schema_for_dataclass(
    cls,
    hints: Optional[typing.Dict[str, pa.DataType]] = None,
    core_mappings=DEFAULT_MAPPINGS,
) -> pa.Schema:
    """Map a Python dataclass to Pyarrow schema.

    Most fields map automatically, but you can also provide
    per field name hints what types they should use.

    :param cls:
        The dataclass whose fields are mapped.

    :param hints:
        Field name -> data type overrides. A hint whose value is
        ``NoneType`` leaves that field out of the schema.

    :param core_mappings:
        Python type -> callable mappings, passed on to :func:`map_field_to_arrow`.

    :return:
        Schema with one entry per mapped field, in dataclass field order.
    """
    if not hints:
        hints = {}

    pa_fields = [
        (field.name, map_field_to_arrow(field, hints, core_mappings))
        for field in fields(cls)
        # NoneType as a hint is the marker for "skip this field"
        if hints.get(field.name) is not NoneType
    ]
    return pa.schema(pa_fields)
def create_columnar_work_buffer(cls) -> Dict[str, list]:
    """Create a columnar work buffer to export data into Pyarrow Tables.

    :param cls:
        The dataclass describing one row.

    :return:
        Field name -> new empty list, one entry per dataclass field,
        in field definition order.
    """
    return {field.name: [] for field in fields(cls)}
def append_to_columnar_work_buffer(buffer: Dict[str, list], item):
    """Convert tabular data items to columnar.

    For every column name in ``buffer`` the attribute of the same name
    is read from ``item`` and appended to that column.

    Automatically handle the special case of enum: an :class:`enum.Enum`
    member is stored as its ``.value``.

    The append is all-or-nothing. If any attribute cannot be read,
    no column is modified, so the columns keep equal lengths.

    :param buffer:
        Column name -> list of values collected so far.

    :param item:
        Object providing one attribute per column name.

    :raise RuntimeError:
        If a value cannot be read for one of the columns.
    """
    # First collect every value, only then mutate the buffer,
    # so that a failure half way through does not leave ragged columns
    pending = []
    for key, column in buffer.items():
        try:
            value = getattr(item, key)
            if isinstance(value, Enum):
                value = value.value
            pending.append((column.append, value))
        except (AttributeError, ValueError) as e:
            raise RuntimeError(f"Could not serialised {key} for {item}") from e

    for append, value in pending:
        append(value)