State Management

Tiered state persistence for strategy data, with a HOT in-memory cache in front of a WARM database tier.

StateManager

almanak.framework.state.StateManager

StateManager(
    config: StateManagerConfig | None = None,
    warm_backend: WarmStore | None = None,
)

Tiered state manager with HOT and WARM storage tiers.

Provides:

- <1ms access from HOT (in-memory) cache
- <10ms access from WARM (PostgreSQL or SQLite) storage
- CAS semantics for safe concurrent updates
- Automatic tier fallback on load
- Metrics tracking for each tier
- Write-through from HOT to WARM tier

The WARM tier backend can be either PostgreSQL (production) or SQLite (development/lightweight). Backend selection is via configuration:

Usage

# PostgreSQL backend (default, production)
config = StateManagerConfig(
    warm_backend=WarmBackendType.POSTGRESQL,
    postgres_config=PostgresConfig(host="localhost"),
)
manager = StateManager(config)
await manager.initialize()

# SQLite backend (development)
config = StateManagerConfig(
    warm_backend=WarmBackendType.SQLITE,
    sqlite_config=SQLiteConfigLight(db_path="./state.db"),
)
manager = StateManager(config)
await manager.initialize()

# Save state (writes to HOT then WARM)
state = StateData(strategy_id="strat-1", version=1, state={"key": "value"})
await manager.save_state(state)

# Load state (reads from fastest available tier)
loaded = await manager.load_state("strat-1")

# CAS update
loaded.state["key"] = "new_value"
await manager.save_state(loaded, expected_version=loaded.version)

# Dependency injection: provide custom backend
custom_sqlite = SQLiteStore(SQLiteConfig(db_path="./custom.db"))
manager = StateManager(config, warm_backend=custom_sqlite)

Initialize StateManager.

Parameters:

Name Type Description Default
config StateManagerConfig | None

Configuration for the state manager. Uses defaults if not provided.

None
warm_backend WarmStore | None

Optional pre-configured WARM tier backend. If provided, this backend is used instead of creating one from config. Useful for dependency injection and testing.

None

is_initialized property

is_initialized: bool

Check if StateManager is initialized.

enabled_tiers property

enabled_tiers: list[StateTier]

Get list of enabled and initialized tiers.

warm_backend_type property

warm_backend_type: WarmBackendType | None

Get the type of WARM backend being used.

Returns:

Type Description
WarmBackendType | None

WarmBackendType.SQLITE, WarmBackendType.POSTGRESQL, or None if no WARM tier.

warm_backend property

warm_backend: WarmStore | None

Get the WARM tier backend instance.

Useful for accessing backend-specific functionality like get_version_history() on SQLiteStore.

Returns:

Type Description
WarmStore | None

The WARM backend instance, or None if not initialized.

initialize async

initialize() -> None

Initialize all enabled storage tiers.

If load_state_on_startup is enabled in config, loads all active states from WARM tier to HOT tier for fast access.

close async

close() -> None

Close all storage connections.

load_state async

load_state(strategy_id: str) -> StateData

Load state from the fastest available tier.

Tries tiers in order: HOT -> WARM. Populates HOT cache on WARM hit.

Parameters:

Name Type Description Default
strategy_id str

Strategy identifier

required

Returns:

Type Description
StateData

StateData from the fastest available tier

Raises:

Type Description
StateNotFoundError

If state not found in any tier
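
The HOT -> WARM fallback described for load_state can be sketched with the standard library alone. This is a minimal illustration, not the StateManager implementation: TieredStore, its table layout, and the use of KeyError in place of StateNotFoundError are all assumptions made for the example.

```python
import json
import sqlite3


class TieredStore:
    """Hypothetical two-tier store: a dict as HOT, in-memory SQLite as WARM."""

    def __init__(self) -> None:
        self.hot = {}
        self.warm = sqlite3.connect(":memory:")
        self.warm.execute(
            "CREATE TABLE state (strategy_id TEXT PRIMARY KEY, payload TEXT)"
        )

    def save(self, strategy_id: str, state: dict) -> None:
        # Persist to WARM, then refresh the HOT copy.
        self.warm.execute(
            "INSERT OR REPLACE INTO state VALUES (?, ?)",
            (strategy_id, json.dumps(state)),
        )
        self.hot[strategy_id] = state

    def load(self, strategy_id: str):
        # 1. HOT hit: return immediately.
        if strategy_id in self.hot:
            return self.hot[strategy_id], "HOT"
        # 2. WARM hit: populate HOT so the next read is fast.
        row = self.warm.execute(
            "SELECT payload FROM state WHERE strategy_id = ?", (strategy_id,)
        ).fetchone()
        if row is None:
            raise KeyError(strategy_id)  # stands in for StateNotFoundError
        state = json.loads(row[0])
        self.hot[strategy_id] = state
        return state, "WARM"


store = TieredStore()
store.save("strat-1", {"key": "value"})
store.hot.clear()               # simulate a restart or cache invalidation
first = store.load("strat-1")   # served from WARM, repopulates HOT
second = store.load("strat-1")  # served from HOT
```

The second element of each result plays the role that loaded_from plays on StateData: it records which tier answered the read.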

save_state async

save_state(
    state: StateData, expected_version: int | None = None
) -> StateData

Save state to all tiers.

Writes to WARM tier (source of truth) then updates HOT cache.

Parameters:

Name Type Description Default
state StateData

State data to save

required
expected_version int | None

Expected version for CAS update. If None and state has version > 1, uses state.version - 1. If None and state has version = 1, creates new state.

None

Returns:

Type Description
StateData

Updated StateData with new version

Raises:

Type Description
StateConflictError

If CAS update fails due to version mismatch
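
The version check behind a CAS save, including the documented default for expected_version, might look roughly like the sketch below. CasStore, Record, and VersionConflict are hypothetical stand-ins, and treating a create on an existing row as a conflict is an assumption rather than documented behavior.

```python
from dataclasses import dataclass, field
from typing import Optional


class VersionConflict(Exception):
    """Stands in for StateConflictError in this sketch."""


@dataclass
class Record:
    """Hypothetical, trimmed-down stand-in for StateData."""
    strategy_id: str
    version: int
    state: dict = field(default_factory=dict)


class CasStore:
    def __init__(self) -> None:
        self._rows = {}

    def save(self, rec: Record, expected_version: Optional[int] = None) -> Record:
        current = self._rows.get(rec.strategy_id)
        actual = current.version if current else 0
        # Documented default: infer the expectation from the record itself.
        if expected_version is None and rec.version > 1:
            expected_version = rec.version - 1
        if expected_version is None:
            # version == 1 with no expectation: treat as a create
            # (assumed to conflict if a row already exists).
            expected_version = 0
        if actual != expected_version:
            raise VersionConflict(f"expected {expected_version}, found {actual}")
        stored = Record(rec.strategy_id, expected_version + 1, dict(rec.state))
        self._rows[rec.strategy_id] = stored
        return stored


store = CasStore()
v1 = store.save(Record("strat-1", 1, {"key": "value"}))
v2 = store.save(
    Record("strat-1", v1.version, {"key": "new_value"}),
    expected_version=v1.version,
)
try:
    # A second writer still holding version 1 loses the race.
    store.save(Record("strat-1", v1.version, {"key": "stale"}), expected_version=v1.version)
    conflict = False
except VersionConflict:
    conflict = True
```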

delete_state async

delete_state(strategy_id: str) -> bool

Delete state from all tiers.

Parameters:

Name Type Description Default
strategy_id str

Strategy identifier

required

Returns:

Type Description
bool

True if state was deleted from at least one tier

invalidate_hot_cache

invalidate_hot_cache(
    strategy_id: str | None = None,
) -> None

Invalidate HOT tier cache.

Parameters:

Name Type Description Default
strategy_id str | None

Specific strategy to invalidate, or None to clear all

None

get_metrics

get_metrics(limit: int = 100) -> list[TierMetrics]

Get recent tier metrics.

Parameters:

Name Type Description Default
limit int

Maximum number of metrics to return

100

Returns:

Type Description
list[TierMetrics]

List of TierMetrics, newest first

get_metrics_summary

get_metrics_summary() -> dict[str, Any]

Get summary of tier metrics.

Returns:

Type Description
dict[str, Any]

Dictionary with per-tier average latencies and success rates
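
A per-tier summary of average latency and success rate can be computed as in the sketch below. The Metric fields (tier, latency_ms, success) and the summary keys are assumptions for illustration; the actual TierMetrics fields and the keys returned by get_metrics_summary() are not specified here.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Metric:
    """Hypothetical stand-in for TierMetrics."""
    tier: str
    latency_ms: float
    success: bool


def summarize(metrics):
    # Group samples by tier, then reduce each group.
    grouped = defaultdict(list)
    for m in metrics:
        grouped[m.tier].append(m)
    return {
        tier: {
            "count": len(items),
            "avg_latency_ms": sum(m.latency_ms for m in items) / len(items),
            "success_rate": sum(m.success for m in items) / len(items),
        }
        for tier, items in grouped.items()
    }


summary = summarize([
    Metric("HOT", 0.25, True),
    Metric("HOT", 0.75, True),
    Metric("WARM", 4.0, True),
    Metric("WARM", 8.0, False),
])
```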

clear_metrics

clear_metrics() -> None

Clear all stored metrics.

save_clob_order async

save_clob_order(order: ClobOrderState) -> bool

Save or update a CLOB order state.

Persists order state to the WARM tier for crash recovery and order tracking across strategy restarts.

Parameters:

Name Type Description Default
order ClobOrderState

ClobOrderState to persist.

required

Returns:

Type Description
bool

True if save succeeded, False if no WARM backend or error.

get_clob_order async

get_clob_order(order_id: str) -> ClobOrderState | None

Get a CLOB order by order_id.

Parameters:

Name Type Description Default
order_id str

Order identifier.

required

Returns:

Type Description
ClobOrderState | None

ClobOrderState if found, None otherwise.

get_open_clob_orders async

get_open_clob_orders(
    market_id: str | None = None,
) -> list[ClobOrderState]

Get all open CLOB orders, optionally filtered by market.

Open orders are those with status: pending, submitted, live, partially_filled.

Parameters:

Name Type Description Default
market_id str | None

Optional market ID to filter by.

None

Returns:

Type Description
list[ClobOrderState]

List of open ClobOrderState, newest first.
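
The open-order filter described above (four open statuses, optional market filter, newest first) can be sketched as follows. Order is a hypothetical stand-in for ClobOrderState; representing statuses as plain strings and sorting by a created_at field are assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timezone

OPEN_STATUSES = {"pending", "submitted", "live", "partially_filled"}


@dataclass
class Order:
    """Hypothetical stand-in for ClobOrderState."""
    order_id: str
    market_id: str
    status: str
    created_at: datetime


def open_orders(orders, market_id=None):
    selected = [
        o for o in orders
        if o.status in OPEN_STATUSES
        and (market_id is None or o.market_id == market_id)
    ]
    # Newest first.
    return sorted(selected, key=lambda o: o.created_at, reverse=True)


def at(hour):
    return datetime(2024, 1, 1, hour, tzinfo=timezone.utc)


orders = [
    Order("o1", "m1", "live", at(1)),
    Order("o2", "m1", "filled", at(2)),   # terminal status, excluded
    Order("o3", "m2", "pending", at(3)),
    Order("o4", "m1", "partially_filled", at(4)),
]
all_open = [o.order_id for o in open_orders(orders)]
m1_open = [o.order_id for o in open_orders(orders, market_id="m1")]
```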

update_clob_order_status async

update_clob_order_status(
    order_id: str,
    status: ClobOrderStatus,
    fills: list[ClobFill] | None = None,
    filled_size: str | None = None,
    average_fill_price: str | None = None,
    error: str | None = None,
) -> bool

Update the status and fill information of a CLOB order.

Parameters:

Name Type Description Default
order_id str

Order identifier.

required
status ClobOrderStatus

New order status.

required
fills list[ClobFill] | None

Updated list of fills (replaces existing).

None
filled_size str | None

Updated filled size.

None
average_fill_price str | None

Updated average fill price.

None
error str | None

Error message if order failed.

None

Returns:

Type Description
bool

True if order was found and updated.

save_portfolio_snapshot async

save_portfolio_snapshot(snapshot: PortfolioSnapshot) -> int

Save a portfolio snapshot.

Persists portfolio value and position data for dashboard display and PnL tracking.

Parameters:

Name Type Description Default
snapshot PortfolioSnapshot

PortfolioSnapshot to persist.

required

Returns:

Type Description
int

Snapshot ID if save succeeded, 0 if no WARM backend or error.

get_latest_snapshot async

get_latest_snapshot(
    strategy_id: str,
) -> PortfolioSnapshot | None

Get most recent portfolio snapshot for a strategy.

Parameters:

Name Type Description Default
strategy_id str

Strategy identifier.

required

Returns:

Type Description
PortfolioSnapshot | None

Latest PortfolioSnapshot if found, None otherwise.

get_snapshots_since async

get_snapshots_since(
    strategy_id: str, since: datetime, limit: int = 168
) -> list[PortfolioSnapshot]

Get portfolio snapshots since a timestamp (for charts).

Parameters:

Name Type Description Default
strategy_id str

Strategy identifier.

required
since datetime

Start timestamp for query.

required
limit int

Maximum number of snapshots to return.

168

Returns:

Type Description
list[PortfolioSnapshot]

List of PortfolioSnapshot, oldest first.

get_snapshot_at async

get_snapshot_at(
    strategy_id: str, timestamp: datetime
) -> PortfolioSnapshot | None

Get snapshot closest to a timestamp (for PnL calculation).

Parameters:

Name Type Description Default
strategy_id str

Strategy identifier.

required
timestamp datetime

Target timestamp.

required

Returns:

Type Description
PortfolioSnapshot | None

PortfolioSnapshot closest to timestamp, or None if not found.
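
Choosing the snapshot closest to a timestamp and deriving a PnL figure from it could look like the sketch below. Snapshot and its fields are hypothetical stand-ins for PortfolioSnapshot, and measuring closeness as the absolute time difference in either direction is an assumption about how "closest" is defined.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from decimal import Decimal


@dataclass
class Snapshot:
    """Hypothetical stand-in for PortfolioSnapshot."""
    timestamp: datetime
    total_value_usd: Decimal


def snapshot_closest_to(snapshots, target):
    if not snapshots:
        return None
    # Smallest absolute distance to the target, before or after it.
    return min(snapshots, key=lambda s: abs(s.timestamp - target))


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
# 48 hourly snapshots, value rising by 10 USD per hour.
hourly = [
    Snapshot(start + timedelta(hours=h), Decimal(1000 + 10 * h))
    for h in range(48)
]

latest = hourly[-1]
target = latest.timestamp - timedelta(hours=24, minutes=20)
day_ago = snapshot_closest_to(hourly, target)
pnl_24h = latest.total_value_usd - day_ago.total_value_usd
```

The target falls between two hourly snapshots, so the lookup snaps to the nearer one instead of returning nothing.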

save_portfolio_metrics async

save_portfolio_metrics(metrics: PortfolioMetrics) -> bool

Save or update portfolio metrics.

Portfolio metrics store baseline values (initial_value_usd) that survive strategy restarts, enabling accurate PnL calculation.

Parameters:

Name Type Description Default
metrics PortfolioMetrics

PortfolioMetrics to persist.

required

Returns:

Type Description
bool

True if save succeeded, False if no WARM backend or error.

get_portfolio_metrics async

get_portfolio_metrics(
    strategy_id: str,
) -> PortfolioMetrics | None

Get portfolio metrics for a strategy.

Parameters:

Name Type Description Default
strategy_id str

Strategy identifier.

required

Returns:

Type Description
PortfolioMetrics | None

PortfolioMetrics if found, None otherwise.

cleanup_old_snapshots async

cleanup_old_snapshots(retention_days: int = 7) -> int

Clean up old portfolio snapshots.

Parameters:

Name Type Description Default
retention_days int

Number of days of snapshots to retain.

7

Returns:

Type Description
int

Number of snapshots deleted.

StateManagerConfig

almanak.framework.state.StateManagerConfig dataclass

StateManagerConfig(
    enable_hot: bool = True,
    enable_warm: bool = True,
    warm_backend: WarmBackendType = WarmBackendType.POSTGRESQL,
    hot_cache_ttl_seconds: int = 0,
    hot_cache_max_size: int = 1000,
    postgres_config: PostgresConfig = PostgresConfig(),
    sqlite_config: SQLiteConfigLight = SQLiteConfigLight(),
    metrics_callback: Callable[[TierMetrics], None]
    | None = None,
    load_state_on_startup: bool = True,
)

Configuration for StateManager.

Attributes:

Name Type Description
enable_hot bool

Enable in-memory cache tier

enable_warm bool

Enable WARM tier (PostgreSQL or SQLite)

warm_backend WarmBackendType

Which backend to use for WARM tier (POSTGRESQL or SQLITE)

hot_cache_ttl_seconds int

TTL for hot cache entries (0 = no expiry)

hot_cache_max_size int

Maximum entries in hot cache

postgres_config PostgresConfig

PostgreSQL configuration (used when warm_backend=POSTGRESQL)

sqlite_config SQLiteConfigLight

SQLite configuration (used when warm_backend=SQLITE)

metrics_callback Callable[[TierMetrics], None] | None

Optional callback for metrics reporting

load_state_on_startup bool

Load all active states from WARM to HOT on startup
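
To make the interaction of hot_cache_ttl_seconds and hot_cache_max_size concrete, here is a small cache sketch. HotCache is hypothetical, and evicting the oldest-written entry once the cache is full is an assumption; the eviction policy of the real HOT tier is not stated here. The only behavior taken from the attribute list is that a TTL of 0 means entries never expire.

```python
import time
from collections import OrderedDict


class HotCache:
    """Hypothetical in-memory cache with a TTL and a size cap."""

    def __init__(self, ttl_seconds=0, max_size=1000, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.max_size = max_size
        self._clock = clock
        self._entries = OrderedDict()  # key -> (stored_at, value)

    def put(self, key, value):
        self._entries[key] = (self._clock(), value)
        self._entries.move_to_end(key)
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # evict the oldest entry

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        # ttl_seconds == 0 means entries never expire.
        if self.ttl_seconds and self._clock() - stored_at > self.ttl_seconds:
            del self._entries[key]
            return None
        return value


now = [0.0]  # fake clock so the example does not sleep
cache = HotCache(ttl_seconds=60, max_size=2, clock=lambda: now[0])
cache.put("a", 1)
cache.put("b", 2)
cache.put("c", 3)          # exceeds max_size, evicts "a"
evicted = cache.get("a")
now[0] = 61.0
expired = cache.get("b")   # older than the 60 second TTL

forever = HotCache(ttl_seconds=0, clock=lambda: now[0])
forever.put("x", 42)
now[0] = 10_000.0
kept = forever.get("x")    # still present: TTL of 0 disables expiry
```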

Example

# PostgreSQL backend (default, production)
config = StateManagerConfig(
    warm_backend=WarmBackendType.POSTGRESQL,
    postgres_config=PostgresConfig(host="localhost"),
)

# SQLite backend (local development)
config = StateManagerConfig(
    warm_backend=WarmBackendType.SQLITE,
    sqlite_config=SQLiteConfigLight(db_path="./state.db"),
)

StateTier

almanak.framework.state.StateTier

Bases: IntEnum

Storage tier for state data.

Ordered by access speed (fastest first).

StateData

almanak.framework.state.StateData dataclass

StateData(
    strategy_id: str,
    version: int,
    state: dict[str, Any],
    schema_version: int = 1,
    checksum: str = "",
    created_at: datetime = (lambda: datetime.now(UTC))(),
    loaded_from: StateTier | None = None,
)

Strategy state data container.

Attributes:

Name Type Description
strategy_id str

Unique identifier for the strategy

version int

CAS version number (incremented on each update)

state dict[str, Any]

The actual state data as a dictionary

schema_version int

Schema version for migrations

checksum str

SHA-256 hash of state data for integrity verification

created_at datetime

When this state version was created

loaded_from StateTier | None

Which tier the state was loaded from

__post_init__

__post_init__() -> None

Calculate checksum if not provided.

verify_checksum

verify_checksum() -> bool

Verify the integrity of state data.
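
A SHA-256 integrity check over a state dictionary can be sketched as below. Serializing to canonical JSON (sorted keys, fixed separators) before hashing is an assumption; the exact serialization StateData uses for its checksum is not documented here.

```python
import hashlib
import json


def compute_checksum(state: dict) -> str:
    # Canonical JSON so that equal dicts always hash to the same digest.
    payload = json.dumps(state, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def verify(state: dict, checksum: str) -> bool:
    return compute_checksum(state) == checksum


state = {"b": 2, "a": 1}
checksum = compute_checksum(state)
same_content = verify({"a": 1, "b": 2}, checksum)  # key order does not matter
tampered = verify({"a": 1, "b": 3}, checksum)      # any value change is detected
```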

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

from_dict classmethod

from_dict(data: dict[str, Any]) -> StateData

Create StateData from dictionary.

Migrations

StateMigration

almanak.framework.state.StateMigration dataclass

StateMigration(
    version: int,
    migration_fn: MigrationFunction,
    description: str = "",
    rollback_safe_until_version: int = 1,
    created_at: datetime = (lambda: datetime.now(UTC))(),
)

Defines a single state migration.

Attributes:

Name Type Description
version int

The schema version this migration upgrades TO

migration_fn MigrationFunction

Function that transforms state from version-1 to version

description str

Human-readable description of what this migration does

rollback_safe_until_version int

Minimum version that can safely roll back to this version (i.e., versions >= this can roll back without data loss)

created_at datetime

When this migration was defined

__post_init__

__post_init__() -> None

Validate migration.

apply

apply(state: dict[str, Any]) -> dict[str, Any]

Apply this migration to state.

Creates a deep copy to avoid mutating original state.

Parameters:

Name Type Description Default
state dict[str, Any]

The state dict to migrate

required

Returns:

Type Description
dict[str, Any]

Migrated state dict (new copy)

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

MigrationRegistry

almanak.framework.state.MigrationRegistry

MigrationRegistry()

Registry of all state migrations.

Tracks migrations and provides version validation and lookup.

current_version property

current_version: int

Get the current (latest) schema version.

migrations property

migrations: dict[int, StateMigration]

Get all registered migrations.

register

register(migration: StateMigration) -> None

Register a migration.

Parameters:

Name Type Description Default
migration StateMigration

The migration to register

required

Raises:

Type Description
ValueError

If a migration for this version already exists

get

get(version: int) -> StateMigration | None

Get migration for a specific version.

Parameters:

Name Type Description Default
version int

Target version

required

Returns:

Type Description
StateMigration | None

StateMigration or None if not found

get_migrations_path

get_migrations_path(
    from_version: int, to_version: int
) -> list[StateMigration]

Get list of migrations needed to go from one version to another.

Parameters:

Name Type Description Default
from_version int

Starting version

required
to_version int

Target version

required

Returns:

Type Description
list[StateMigration]

List of migrations to apply (in order)

Raises:

Type Description
MigrationNotFoundError

If any required migration is missing
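
The registry and path lookup can be illustrated with plain functions. In this sketch each migration is keyed by the schema version it upgrades to, the path from one version to another is every version in between, and the input is deep-copied before anything is applied. The module-level MIGRATIONS dict, the helper names, and LookupError in place of MigrationNotFoundError are assumptions for the example.

```python
import copy

# Keyed by the schema version each function upgrades TO.
MIGRATIONS = {}


def register(version, fn):
    if version in MIGRATIONS:
        raise ValueError(f"migration for version {version} already registered")
    MIGRATIONS[version] = fn


def migrations_path(from_version, to_version):
    needed = list(range(from_version + 1, to_version + 1))
    missing = [v for v in needed if v not in MIGRATIONS]
    if missing:
        # Stands in for MigrationNotFoundError.
        raise LookupError(f"missing migrations: {missing}")
    return needed


def migrate(state, from_version, to_version):
    result = copy.deepcopy(state)  # never mutate the caller's dict
    for version in migrations_path(from_version, to_version):
        result = MIGRATIONS[version](result)
    return result


def to_v2(s):
    s["size"] = s.pop("amount")  # rename a field
    return s


def to_v3(s):
    s.setdefault("tags", [])     # add a field with a default
    return s


register(2, to_v2)
register(3, to_v3)

original = {"amount": 5}
migrated = migrate(original, from_version=1, to_version=3)
```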

get_rollback_info

get_rollback_info(current_version: int) -> RollbackInfo

Get rollback safety information for a version.

Parameters:

Name Type Description Default
current_version int

Current schema version

required

Returns:

Type Description
RollbackInfo

RollbackInfo with safe and unsafe rollback targets

clear

clear() -> None

Clear all registered migrations (mainly for testing).

MigrationResult

almanak.framework.state.MigrationResult dataclass

MigrationResult(
    success: bool,
    from_version: int,
    to_version: int,
    migrations_applied: list[int],
    state: dict[str, Any],
    error: str | None = None,
    duration_ms: float = 0.0,
)

Result of applying migrations.

Attributes:

Name Type Description
success bool

Whether all migrations succeeded

from_version int

Starting schema version

to_version int

Ending schema version

migrations_applied list[int]

List of migration versions applied

state dict[str, Any]

The migrated state (or original if failed)

error str | None

Error message if migration failed

duration_ms float

Total migration time in milliseconds

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

Position Management

PositionManager

almanak.framework.state.PositionManager

PositionManager(
    chains: list[str],
    initial_positions: list[PositionRecord] | None = None,
)

Manages positions across multiple chains with chain dimension support.

Provides methods to store, query, and aggregate positions across chains. Designed to work with StateData.state dictionary for persistence.

Usage

# Create manager for a multi-chain strategy
manager = PositionManager(chains=['arbitrum', 'optimism', 'base'])

# Add positions
manager.add_position(PositionRecord(
    position_id='pos-1',
    chain='arbitrum',
    position_type=PositionType.SUPPLY,
    protocol='aave_v3',
    token='WETH',
    amount=Decimal('1.5'),
    value_usd=Decimal('3000'),
))

# Query positions
all_positions = manager.positions  # All chains
arb_positions = manager.positions_on('arbitrum')  # Single chain
total_usd = manager.total_value_usd  # Aggregate value

Attributes:

Name Type Description
chains list[str]

List of configured chain names

Initialize position manager.

Parameters:

Name Type Description Default
chains list[str]

List of chain names this manager handles

required
initial_positions list[PositionRecord] | None

Optional list of positions to pre-populate

None

chains property

chains: list[str]

Get list of configured chains.

positions property

positions: list[PositionRecord]

Get all positions across all chains.

Returns:

Type Description
list[PositionRecord]

List of all positions from all configured chains.

total_value_usd property

total_value_usd: Decimal

Calculate total USD value across all chains.

Returns:

Type Description
Decimal

Sum of value_usd for all positions across all chains.

positions_on

positions_on(chain: str) -> list[PositionRecord]

Get positions on a specific chain.

Parameters:

Name Type Description Default
chain str

Chain name to filter by

required

Returns:

Type Description
list[PositionRecord]

List of positions on the specified chain

Raises:

Type Description
ChainNotFoundError

If chain is not configured

total_value_on

total_value_on(chain: str) -> Decimal

Calculate total USD value on a specific chain.

Parameters:

Name Type Description Default
chain str

Chain name to calculate value for

required

Returns:

Type Description
Decimal

Sum of value_usd for positions on the specified chain

Raises:

Type Description
ChainNotFoundError

If chain is not configured

add_position

add_position(position: PositionRecord) -> None

Add or update a position.

If a position with the same position_id exists on the same chain, it will be replaced.

Parameters:

Name Type Description Default
position PositionRecord

Position to add/update

required

Raises:

Type Description
ChainNotFoundError

If position's chain is not configured

remove_position

remove_position(position_id: str, chain: str) -> bool

Remove a position by ID and chain.

Parameters:

Name Type Description Default
position_id str

Position identifier

required
chain str

Chain the position is on

required

Returns:

Type Description
bool

True if position was removed, False if not found

Raises:

Type Description
ChainNotFoundError

If chain is not configured

get_position

get_position(
    position_id: str, chain: str
) -> PositionRecord | None

Get a specific position by ID and chain.

Parameters:

Name Type Description Default
position_id str

Position identifier

required
chain str

Chain the position is on

required

Returns:

Type Description
PositionRecord | None

PositionRecord if found, None otherwise

Raises:

Type Description
ChainNotFoundError

If chain is not configured

find_position

find_position(position_id: str) -> PositionRecord | None

Find a position by ID across all chains.

Parameters:

Name Type Description Default
position_id str

Position identifier to search for

required

Returns:

Type Description
PositionRecord | None

PositionRecord if found, None otherwise

positions_by_type

positions_by_type(
    position_type: PositionType,
) -> list[PositionRecord]

Get all positions of a specific type across all chains.

Parameters:

Name Type Description Default
position_type PositionType

Type of position to filter by

required

Returns:

Type Description
list[PositionRecord]

List of positions matching the type

positions_by_protocol

positions_by_protocol(
    protocol: str,
) -> list[PositionRecord]

Get all positions for a specific protocol across all chains.

Parameters:

Name Type Description Default
protocol str

Protocol name to filter by

required

Returns:

Type Description
list[PositionRecord]

List of positions on the protocol

clear

clear(chain: str | None = None) -> None

Clear positions.

Parameters:

Name Type Description Default
chain str | None

If provided, clear only positions on this chain. If None, clear all positions on all chains.

None

Raises:

Type Description
ChainNotFoundError

If specified chain is not configured

to_dict

to_dict() -> dict[str, Any]

Serialize all positions to dictionary for state storage.

Returns:

Type Description
dict[str, Any]

Dictionary with chain -> position_id -> position data structure
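
The chain -> position_id -> position data layout can be pictured with plain dictionaries. In this sketch Positions is a hypothetical miniature of PositionManager; positions are plain dicts rather than PositionRecord objects, and converting Decimal values to strings on serialization is an assumption made so the result stays JSON-friendly.

```python
from decimal import Decimal


class UnknownChain(Exception):
    """Stands in for ChainNotFoundError."""


class Positions:
    def __init__(self, chains):
        # One dict per configured chain, keyed by position_id.
        self._by_chain = {chain: {} for chain in chains}

    def add(self, position: dict) -> None:
        chain = position["chain"]
        if chain not in self._by_chain:
            raise UnknownChain(chain)
        # Same position_id on the same chain replaces the old record.
        self._by_chain[chain][position["position_id"]] = position

    def total_value_usd(self) -> Decimal:
        return sum(
            (p["value_usd"] for ps in self._by_chain.values() for p in ps.values()),
            Decimal("0"),
        )

    def to_dict(self) -> dict:
        # Decimals become strings so the result can be stored as JSON.
        return {
            chain: {
                pid: {**p, "value_usd": str(p["value_usd"])}
                for pid, p in positions.items()
            }
            for chain, positions in self._by_chain.items()
        }


book = Positions(["arbitrum", "base"])
book.add({"position_id": "pos-1", "chain": "arbitrum", "value_usd": Decimal("3000")})
book.add({"position_id": "pos-1", "chain": "arbitrum", "value_usd": Decimal("3100")})  # replaces
book.add({"position_id": "pos-2", "chain": "base", "value_usd": Decimal("500")})
total = book.total_value_usd()
serialized = book.to_dict()
```

Keying by chain first keeps per-chain queries and per-chain clearing cheap, at the cost of a scan across chains when searching by position_id alone.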

from_dict classmethod

from_dict(data: dict[str, Any]) -> PositionManager

Deserialize position manager from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary from to_dict()

required

Returns:

Type Description
PositionManager

PositionManager with restored positions

get_summary

get_summary() -> dict[str, Any]

Get a summary of positions across all chains.

Returns:

Type Description
dict[str, Any]

Dictionary with per-chain and total statistics

__repr__

__repr__() -> str

String representation of manager.

PositionRecord

almanak.framework.state.PositionRecord dataclass

PositionRecord(
    position_id: str,
    chain: str,
    position_type: PositionType,
    protocol: str | None,
    token: str,
    amount: Decimal,
    value_usd: Decimal,
    created_at: datetime = (lambda: datetime.now(UTC))(),
    updated_at: datetime = (lambda: datetime.now(UTC))(),
    metadata: dict[str, Any] = dict(),
)

A single position record with chain dimension.

This is the fundamental unit of position tracking. Every position must have an explicit chain field to support multi-chain strategies.

Attributes:

Name Type Description
position_id str

Unique identifier for this position

chain str

The blockchain this position is on (e.g., 'arbitrum', 'optimism')

position_type PositionType

Type of position (TOKEN, LP, BORROW, SUPPLY, PERP)

protocol str | None

Protocol where position is held (e.g., 'aave_v3', 'uniswap_v3')

token str

Primary token symbol (or pool identifier for LP)

amount Decimal

Amount of tokens or liquidity

value_usd Decimal

Current USD value of the position

created_at datetime

When the position was opened

updated_at datetime

Last update timestamp

metadata dict[str, Any]

Additional position-specific data (health factor, ranges, etc.)

to_dict

to_dict() -> dict[str, Any]

Serialize position to dictionary.

from_dict classmethod

from_dict(data: dict[str, Any]) -> PositionRecord

Deserialize position from dictionary.

Exceptions

almanak.framework.state.StateConflictError

StateConflictError(
    strategy_id: str,
    expected_version: int,
    actual_version: int,
    message: str | None = None,
)

Bases: Exception

Raised when CAS update fails due to version mismatch.

This error indicates that another process has modified the state since it was last read. The caller should reload the state and retry.
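
The reload-and-retry advice can be sketched as a bounded loop. Store is a toy versioned store with a hook that injects one competing write, and Conflict stands in for StateConflictError; neither is part of the library, and the attempt limit is an arbitrary choice for the example.

```python
class Conflict(Exception):
    """Stands in for StateConflictError."""


class Store:
    """Toy versioned store; a hook lets the demo inject a competing write."""

    def __init__(self):
        self.version, self.state = 1, {"count": 0}
        self.before_save = None

    def load(self):
        return self.version, dict(self.state)

    def save(self, state, expected_version):
        if self.before_save:
            hook, self.before_save = self.before_save, None
            hook()
        if expected_version != self.version:
            raise Conflict(f"expected {expected_version}, found {self.version}")
        self.version, self.state = self.version + 1, dict(state)
        return self.version


def update_with_retry(store, mutate, max_attempts=3):
    for attempt in range(1, max_attempts + 1):
        version, state = store.load()  # always start from fresh state
        mutate(state)
        try:
            return store.save(state, expected_version=version), attempt
        except Conflict:
            continue                   # another writer won; reload and retry
    raise Conflict(f"gave up after {max_attempts} attempts")


store = Store()


def competing_write():
    # Simulates another process committing between our load and save.
    store.version += 1
    store.state["count"] += 10


def increment(state):
    state["count"] += 1


store.before_save = competing_write
new_version, attempts = update_with_retry(store, increment)
```

Re-applying the mutation to freshly loaded state, rather than re-sending the stale copy, is what keeps the competing write from being lost.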

almanak.framework.state.StateNotFoundError

StateNotFoundError(
    strategy_id: str, message: str | None = None
)

Bases: Exception

Raised when state is not found in any tier.