Alerting

Alert management with Slack and Telegram channels, cooldown tracking, and escalation policies.

AlertManager

almanak.framework.alerting.AlertManager

AlertManager(
    config: AlertConfig,
    telegram_bot_token: str | None = None,
    slack_webhook_url: str | None = None,
    slack_enable_threading: bool = True,
)

Manages alert routing and delivery to configured channels.

The AlertManager is responsible for:

- Evaluating alert rules against incoming events
- Routing alerts to the appropriate channels (Telegram, Slack, etc.)
- Applying cooldown to prevent spam
- Respecting quiet hours
- Logging all sent alerts

Attributes:

Name Type Description
config

The AlertConfig for this manager

telegram_channel TelegramChannel | None

Optional TelegramChannel instance

slack_channel SlackChannel | None

Optional SlackChannel instance

cooldown_tracker

Tracks cooldown state

Initialize the AlertManager.

Parameters:

Name Type Description Default
config AlertConfig

The AlertConfig with channel configurations and rules

required
telegram_bot_token str | None

Bot token for Telegram (required if using Telegram)

None
slack_webhook_url str | None

Webhook URL for Slack (overrides config.slack_webhook)

None
slack_enable_threading bool

Whether to enable threading for Slack alerts

True

telegram_channel property

telegram_channel: TelegramChannel | None

Get the Telegram channel if configured.

slack_channel property

slack_channel: SlackChannel | None

Get the Slack channel if configured.

send_alert async

send_alert(
    card: OperatorCard,
    metric_values: dict[AlertCondition, Decimal]
    | None = None,
) -> AlertSendResult

Send an alert for the given OperatorCard.

This method:

1. Finds matching alert rules based on the card's event type
2. Checks if alerts should be sent (quiet hours, cooldown)
3. Routes to configured channels
4. Records cooldown state
5. Logs all sent alerts
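The five steps can be sketched as a small standalone model. This is not the library's implementation: `Rule`, `MiniManager`, and the string severities and channel names are hypothetical stand-ins, channel delivery is stubbed out, and the quiet-hours check assumes a range that does not cross midnight.

```python
from dataclasses import dataclass, field
from datetime import datetime, time, timedelta
from typing import Optional


@dataclass
class Rule:  # hypothetical stand-in for AlertRule
    event_type: str
    severity: str
    channels: list[str]
    cooldown_seconds: int = 300


@dataclass
class MiniManager:  # hypothetical stand-in, not the real AlertManager
    rules: list[Rule]
    quiet_start: Optional[time] = None
    quiet_end: Optional[time] = None
    last_sent: dict[tuple[str, str], datetime] = field(default_factory=dict)
    log: list[str] = field(default_factory=list)

    def _in_quiet_hours(self, now: datetime) -> bool:
        # Assumption: same-day range only (no wrap past midnight).
        if self.quiet_start is None or self.quiet_end is None:
            return False
        return self.quiet_start <= now.time() < self.quiet_end

    def send(self, strategy_id: str, event_type: str, now: datetime) -> list[str]:
        sent: list[str] = []
        # 1. Find the rules that match the event type.
        for rule in (r for r in self.rules if r.event_type == event_type):
            # 2a. During quiet hours only CRITICAL alerts pass.
            if self._in_quiet_hours(now) and rule.severity != "CRITICAL":
                continue
            # 2b. Skip rules that are still cooling down.
            key = (strategy_id, rule.event_type)
            last = self.last_sent.get(key)
            if last is not None and now - last < timedelta(seconds=rule.cooldown_seconds):
                continue
            # 3. Route to the configured channels (stubbed: just collect names).
            sent.extend(rule.channels)
            # 4. Record cooldown state.
            self.last_sent[key] = now
            # 5. Log the sent alert.
            self.log.append(f"{strategy_id}:{event_type}->{','.join(rule.channels)}")
        return sent
```

With a 300-second cooldown, a second call for the same strategy and event type 60 seconds after the first returns an empty list, and a call after 301 seconds sends again.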

Parameters:

Name Type Description Default
card OperatorCard

The OperatorCard to alert on

required
metric_values dict[AlertCondition, Decimal] | None

Optional dict of metric values for threshold-based rules

None

Returns:

Type Description
AlertSendResult

AlertSendResult with status and any errors

send_alert_sync

send_alert_sync(
    card: OperatorCard,
    metric_values: dict[AlertCondition, Decimal]
    | None = None,
) -> AlertSendResult

Synchronous wrapper for send_alert.

Parameters:

Name Type Description Default
card OperatorCard

The OperatorCard to alert on

required
metric_values dict[AlertCondition, Decimal] | None

Optional dict of metric values for threshold-based rules

None

Returns:

Type Description
AlertSendResult

AlertSendResult with status and any errors
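How the `*_sync` wrappers are implemented is not stated here. One common pattern, shown below as a sketch with a hypothetical `send_alert` coroutine, is to run the coroutine to completion with `asyncio.run`.

```python
import asyncio


async def send_alert(message: str) -> str:
    """Hypothetical async sender standing in for an awaitable channel call."""
    await asyncio.sleep(0)  # placeholder for network I/O
    return f"sent: {message}"


def send_alert_sync(message: str) -> str:
    """Run the coroutine to completion on a fresh event loop.

    asyncio.run() raises RuntimeError when the calling thread already has a
    running event loop, so a wrapper like this is for synchronous callers only.
    """
    return asyncio.run(send_alert(message))
```

If the real wrappers follow this pattern, code that is already running inside an event loop should await the async method directly instead of calling the sync variant.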

send_direct_telegram_alert async

send_direct_telegram_alert(
    card: OperatorCard,
) -> AlertSendResult

Send an alert directly to Telegram, bypassing rule matching.

This is useful for critical system alerts that should always go through regardless of configured rules.

Parameters:

Name Type Description Default
card OperatorCard

The OperatorCard to alert on

required

Returns:

Type Description
AlertSendResult

AlertSendResult with status

send_direct_telegram_alert_sync

send_direct_telegram_alert_sync(
    card: OperatorCard,
) -> AlertSendResult

Synchronous wrapper for send_direct_telegram_alert.

Parameters:

Name Type Description Default
card OperatorCard

The OperatorCard to alert on

required

Returns:

Type Description
AlertSendResult

AlertSendResult with status

send_direct_slack_alert async

send_direct_slack_alert(
    card: OperatorCard, thread_ts: str | None = None
) -> AlertSendResult

Send an alert directly to Slack, bypassing rule matching.

This is useful for critical system alerts that should always go through regardless of configured rules. Supports threading for related alerts.

Parameters:

Name Type Description Default
card OperatorCard

The OperatorCard to alert on

required
thread_ts str | None

Optional thread timestamp to reply to

None

Returns:

Type Description
AlertSendResult

AlertSendResult with status

send_direct_slack_alert_sync

send_direct_slack_alert_sync(
    card: OperatorCard, thread_ts: str | None = None
) -> AlertSendResult

Synchronous wrapper for send_direct_slack_alert.

Parameters:

Name Type Description Default
card OperatorCard

The OperatorCard to alert on

required
thread_ts str | None

Optional thread timestamp to reply to

None

Returns:

Type Description
AlertSendResult

AlertSendResult with status

set_slack_thread

set_slack_thread(strategy_id: str, thread_ts: str) -> None

Set the Slack thread timestamp for a strategy.

This enables subsequent alerts for this strategy to be posted as thread replies.

Parameters:

Name Type Description Default
strategy_id str

The strategy ID

required
thread_ts str

The thread timestamp from Slack

required

clear_slack_thread

clear_slack_thread(strategy_id: str) -> None

Clear the Slack thread context for a strategy.

Call this when a strategy issue is resolved to start fresh threads for future alerts.

Parameters:

Name Type Description Default
strategy_id str

The strategy ID

required

clear_cooldown

clear_cooldown(
    strategy_id: str,
    condition: AlertCondition | None = None,
) -> None

Clear cooldown state for a strategy.

Parameters:

Name Type Description Default
strategy_id str

The strategy to clear cooldowns for

required
condition AlertCondition | None

Optional specific condition to clear (clears all if None)

None
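The interface of `cooldown_tracker` is not documented on this page. The sketch below shows one way cooldown state keyed by strategy and condition could behave, including the "clear one condition or clear all" semantics of `clear_cooldown`. The `CooldownTracker` class and its method names are hypothetical, and conditions are plain strings here.

```python
from datetime import datetime, timedelta
from typing import Optional


class CooldownTracker:  # hypothetical; not the library's tracker
    def __init__(self) -> None:
        # (strategy_id, condition) -> time the last alert was sent
        self._last_sent: dict[tuple[str, str], datetime] = {}

    def record(self, strategy_id: str, condition: str, now: datetime) -> None:
        self._last_sent[(strategy_id, condition)] = now

    def is_cooling_down(
        self, strategy_id: str, condition: str, cooldown_seconds: int, now: datetime
    ) -> bool:
        last = self._last_sent.get((strategy_id, condition))
        if last is None:
            return False
        return now - last < timedelta(seconds=cooldown_seconds)

    def clear(self, strategy_id: str, condition: Optional[str] = None) -> None:
        if condition is not None:
            # Clear a single condition for this strategy.
            self._last_sent.pop((strategy_id, condition), None)
            return
        # No condition given: clear every entry for this strategy.
        for key in [k for k in self._last_sent if k[0] == strategy_id]:
            del self._last_sent[key]
```

Clearing a cooldown lets the next matching alert go out immediately, which is useful after an operator has fixed the underlying issue.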

GatewayAlertManager

almanak.framework.alerting.GatewayAlertManager

GatewayAlertManager(
    client: GatewayClient,
    strategy_id: str = "",
    timeout: float = 30.0,
)

AlertManager that sends alerts through the gateway.

This implementation routes all alert requests to the gateway sidecar, which has access to the actual alerting channels (Slack, Telegram).

Example

    import asyncio

    from almanak.framework.gateway_client import GatewayClient
    from almanak.framework.alerting.gateway_alert_manager import GatewayAlertManager


    async def main() -> None:
        # send_alert is a coroutine, so it has to be awaited inside an async function.
        with GatewayClient() as client:
            alert_manager = GatewayAlertManager(client, strategy_id="my-strategy")
            result = await alert_manager.send_alert(
                message="Strategy executed successfully",
                severity="info",
            )
            print(f"Alert sent: {result.success}")


    asyncio.run(main())

Initialize gateway-backed alert manager.

Parameters:

Name Type Description Default
client GatewayClient

Connected GatewayClient instance

required
strategy_id str

Strategy identifier for alert context

''
timeout float

RPC timeout in seconds

30.0

strategy_id property

strategy_id: str

Get the strategy ID.

send_alert async

send_alert(
    message: str,
    severity: str = "info",
    channel: str = "slack",
    metadata: dict[str, str] | None = None,
) -> GatewayAlertResult

Send an alert through the gateway.

Parameters:

Name Type Description Default
message str

Alert message text

required
severity str

Alert severity ("info", "warning", "critical")

'info'
channel str

Alert channel ("slack", "telegram")

'slack'
metadata dict[str, str] | None

Additional metadata to include

None

Returns:

Type Description
GatewayAlertResult

GatewayAlertResult with success status

log async

log(
    message: str,
    level: str = "INFO",
    context: dict[str, str] | None = None,
    logger_name: str = "",
) -> None

Send a log message through the gateway.

Parameters:

Name Type Description Default
message str

Log message text

required
level str

Log level ("DEBUG", "INFO", "WARNING", "ERROR")

'INFO'
context dict[str, str] | None

Additional context to include

None
logger_name str

Optional logger name for categorization

''

record_metric async

record_metric(
    name: str,
    value: float,
    labels: dict[str, str] | None = None,
    metric_type: str = "gauge",
) -> None

Record a metric through the gateway.

Parameters:

Name Type Description Default
name str

Metric name

required
value float

Metric value

required
labels dict[str, str] | None

Metric labels/tags

None
metric_type str

Type of metric ("gauge", "counter", "histogram")

'gauge'

Channels

SlackChannel

almanak.framework.alerting.SlackChannel

SlackChannel(
    webhook_url: str,
    dashboard_base_url: str | None = None,
    max_retries: int = 3,
    base_delay: float = 1.0,
    enable_threading: bool = True,
    thread_timeout_seconds: int = 3600,
)

Slack notification channel for sending alerts via webhooks.

This class implements Slack incoming webhooks for sending alert notifications to operators. It uses Slack Block Kit for rich formatting and handles rate limiting with exponential backoff.

Supports threading for related alerts - subsequent alerts for the same strategy will be posted as thread replies to the original alert.

Attributes:

Name Type Description
webhook_url

The Slack incoming webhook URL

dashboard_base_url

Base URL for dashboard links in messages

max_retries

Maximum number of retries for failed sends

base_delay

Base delay in seconds for exponential backoff

Initialize the Slack channel.

Parameters:

Name Type Description Default
webhook_url str

The Slack incoming webhook URL

required
dashboard_base_url str | None

Base URL for dashboard links in messages

None
max_retries int

Maximum number of retries for failed sends

3
base_delay float

Base delay in seconds for exponential backoff

1.0
enable_threading bool

Whether to enable threading for related alerts

True
thread_timeout_seconds int

How long to keep thread context (default 1 hour)

3600
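To show how `max_retries` and `base_delay` could interact, here is a sketch of retry with exponential backoff. The doubling factor, the "one initial attempt plus `max_retries` retries" accounting, and the names `backoff_delays` and `send_with_retry` are assumptions. The exact schedule the channel uses is not documented here.

```python
import asyncio
from typing import Awaitable, Callable


def backoff_delays(max_retries: int = 3, base_delay: float = 1.0) -> list[float]:
    """Delay before each retry, doubling every time (assumed schedule)."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries)]


async def send_with_retry(
    send_once: Callable[[], Awaitable[bool]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Try once, then retry after each backoff delay until one attempt succeeds."""
    if await send_once():
        return True
    for delay in backoff_delays(max_retries, base_delay):
        await sleep(delay)
        if await send_once():
            return True
    return False
```

Under these assumptions the defaults wait 1, 2, and 4 seconds between attempts, so a send that keeps failing gives up after roughly 7 seconds of waiting.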

clear_thread

clear_thread(strategy_id: str) -> None

Clear the thread context for a strategy.

Call this when a strategy issue is resolved to start fresh threads for future alerts.

Parameters:

Name Type Description Default
strategy_id str

The strategy ID

required

clear_all_threads

clear_all_threads() -> None

Clear all thread contexts.

send_alert async

send_alert(
    card: OperatorCard, thread_ts: str | None = None
) -> SlackSendResult

Send an alert to Slack with exponential backoff retry.

This method formats the OperatorCard using Slack Block Kit and sends it to the configured webhook. It handles rate limiting with exponential backoff and logs all send attempts.

Threading support: If enable_threading is True and a thread_ts is provided (or stored from a previous alert for this strategy), the alert will be sent as a thread reply. Note that incoming webhooks don't return message timestamps, so for full threading support consider using the Slack Web API.

Parameters:

Name Type Description Default
card OperatorCard

The OperatorCard containing alert information

required
thread_ts str | None

Optional thread timestamp to reply to

None

Returns:

Type Description
SlackSendResult

SlackSendResult indicating success or failure, with thread_ts if available
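For orientation, a Block Kit webhook payload is a JSON object with a `blocks` list. The sketch below builds a minimal one with a header and a section block. The function name and the `title` and `body` arguments are hypothetical, and the layout the library actually produces from an OperatorCard is not shown on this page.

```python
import json
from typing import Any, Optional


def build_slack_payload(
    title: str, body: str, thread_ts: Optional[str] = None
) -> dict[str, Any]:
    """Build a minimal Block Kit payload (illustrative layout only)."""
    payload: dict[str, Any] = {
        "text": title,  # plain-text fallback shown in notifications
        "blocks": [
            # Header blocks only accept plain_text.
            {"type": "header", "text": {"type": "plain_text", "text": title}},
            # Section blocks can carry Slack mrkdwn formatting.
            {"type": "section", "text": {"type": "mrkdwn", "text": body}},
        ],
    }
    if thread_ts is not None:
        # Assumption: the reply is threaded by passing the parent timestamp.
        payload["thread_ts"] = thread_ts
    return payload


example = build_slack_payload("Strategy stuck", "*s1* has not executed for 10 minutes")
encoded = json.dumps(example)  # the body that would be POSTed to the webhook URL
```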

send_alert_sync

send_alert_sync(
    card: OperatorCard, thread_ts: str | None = None
) -> SlackSendResult

Synchronous wrapper for send_alert.

Parameters:

Name Type Description Default
card OperatorCard

The OperatorCard containing alert information

required
thread_ts str | None

Optional thread timestamp to reply to

None

Returns:

Type Description
SlackSendResult

SlackSendResult indicating success or failure

set_thread_for_strategy

set_thread_for_strategy(
    strategy_id: str, thread_ts: str
) -> None

Set the thread_ts for a strategy externally.

This allows integration with the Slack Web API which returns message timestamps. After sending a message via Web API, call this method to enable subsequent alerts to be threaded.

Parameters:

Name Type Description Default
strategy_id str

The strategy ID

required
thread_ts str

The thread timestamp from Slack Web API

required
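The thread bookkeeping behind `set_thread_for_strategy`, `clear_thread`, `clear_all_threads`, and `thread_timeout_seconds` can be pictured as a small store with expiry. This is a sketch under assumptions: the `ThreadStore` class is hypothetical, and the real channel may track and expire thread context differently.

```python
import time
from typing import Callable, Optional


class ThreadStore:  # hypothetical; not the library's internal structure
    def __init__(
        self, timeout_seconds: int = 3600, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self._timeout = timeout_seconds
        self._clock = clock
        # strategy_id -> (thread_ts, time the entry was stored)
        self._threads: dict[str, tuple[str, float]] = {}

    def set(self, strategy_id: str, thread_ts: str) -> None:
        self._threads[strategy_id] = (thread_ts, self._clock())

    def get(self, strategy_id: str) -> Optional[str]:
        entry = self._threads.get(strategy_id)
        if entry is None:
            return None
        thread_ts, stored_at = entry
        if self._clock() - stored_at > self._timeout:
            # Context is too old: drop it so the next alert starts a new thread.
            del self._threads[strategy_id]
            return None
        return thread_ts

    def clear(self, strategy_id: str) -> None:
        self._threads.pop(strategy_id, None)

    def clear_all(self) -> None:
        self._threads.clear()
```

Injecting the clock keeps the expiry logic testable without waiting an hour.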

send_custom_message async

send_custom_message(
    strategy_id: str,
    severity: Severity,
    title: str,
    message: str,
    context: dict[str, Any] | None = None,
    thread_ts: str | None = None,
) -> SlackSendResult

Send a custom formatted message.

This method allows sending custom formatted messages that don't come from an OperatorCard. Supports threading for related messages.

Parameters:

Name Type Description Default
strategy_id str

The strategy ID

required
severity Severity

Alert severity level

required
title str

Alert title

required
message str

Alert message body

required
context dict[str, Any] | None

Optional additional context

None
thread_ts str | None

Optional thread timestamp to reply to

None

Returns:

Type Description
SlackSendResult

SlackSendResult indicating success or failure

TelegramChannel

almanak.framework.alerting.TelegramChannel

TelegramChannel(
    chat_id: str,
    bot_token: str,
    dashboard_base_url: str | None = None,
    max_retries: int = 3,
    base_delay: float = 1.0,
)

Telegram notification channel for sending alerts.

This class implements the Telegram Bot API for sending alert notifications to operators. It handles rate limiting with exponential backoff and formats messages with severity indicators.

Attributes:

Name Type Description
chat_id

The Telegram chat ID to send messages to

bot_token

The Telegram bot API token

dashboard_base_url

Base URL for dashboard links in messages

max_retries

Maximum number of retries for failed sends

base_delay

Base delay in seconds for exponential backoff

Initialize the Telegram channel.

Parameters:

Name Type Description Default
chat_id str

The Telegram chat ID to send messages to

required
bot_token str

The Telegram bot API token

required
dashboard_base_url str | None

Base URL for dashboard links in messages

None
max_retries int

Maximum number of retries for failed sends

3
base_delay float

Base delay in seconds for exponential backoff

1.0

api_url property

api_url: str

Get the Telegram API URL for this bot.
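Telegram Bot API endpoints follow the pattern `https://api.telegram.org/bot<token>/<method>`. The sketch below builds such a URL and a minimal `sendMessage` body. The helper names are hypothetical, and whether `api_url` returns the base URL or a full method URL is not stated on this page.

```python
def telegram_method_url(bot_token: str, method: str = "sendMessage") -> str:
    """Build a Bot API endpoint URL for the given token and method."""
    return f"https://api.telegram.org/bot{bot_token}/{method}"


def build_message(chat_id: str, text: str) -> dict[str, str]:
    """Minimal sendMessage body: target chat and message text."""
    return {"chat_id": chat_id, "text": text}
```

Because the token is part of the URL, such URLs should be kept out of logs.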

send_alert async

send_alert(card: OperatorCard) -> TelegramSendResult

Send an alert to Telegram with exponential backoff retry.

This method formats the OperatorCard as a Telegram message and sends it to the configured chat. It handles rate limiting with exponential backoff and logs all send attempts.

Parameters:

Name Type Description Default
card OperatorCard

The OperatorCard containing alert information

required

Returns:

Type Description
TelegramSendResult

TelegramSendResult indicating success or failure

send_alert_sync

send_alert_sync(card: OperatorCard) -> TelegramSendResult

Synchronous wrapper for send_alert.

Parameters:

Name Type Description Default
card OperatorCard

The OperatorCard containing alert information

required

Returns:

Type Description
TelegramSendResult

TelegramSendResult indicating success or failure

format_custom_message

format_custom_message(
    strategy_id: str,
    severity: Severity,
    title: str,
    message: str,
    context: dict[str, Any] | None = None,
) -> str

Format a custom alert message.

This method builds the message text for custom alerts that don't come from an OperatorCard. It returns the formatted string and does not send anything.

Parameters:

Name Type Description Default
strategy_id str

The strategy ID

required
severity Severity

Alert severity level

required
title str

Alert title

required
message str

Alert message body

required
context dict[str, Any] | None

Optional additional context

None

Returns:

Type Description
str

Formatted message string

send_custom_message async

send_custom_message(
    strategy_id: str,
    severity: Severity,
    title: str,
    message: str,
    context: dict[str, Any] | None = None,
) -> TelegramSendResult

Send a custom formatted message.

Parameters:

Name Type Description Default
strategy_id str

The strategy ID

required
severity Severity

Alert severity level

required
title str

Alert title

required
message str

Alert message body

required
context dict[str, Any] | None

Optional additional context

None

Returns:

Type Description
TelegramSendResult

TelegramSendResult indicating success or failure

Configuration

AlertConfig

almanak.framework.alerting.AlertConfig dataclass

AlertConfig(
    telegram_chat_id: str | None = None,
    slack_webhook: str | None = None,
    email: str | None = None,
    pagerduty_key: str | None = None,
    rules: list[AlertRule] = list(),
    quiet_hours: TimeRange | None = None,
    escalation_timeout_seconds: int = 900,
    dashboard_base_url: str | None = None,
    enabled: bool = True,
)

Configuration for a strategy's alerting setup.

This dataclass holds all the configuration needed to send alerts to operators via multiple channels.

Attributes:

Name Type Description
telegram_chat_id str | None

Telegram chat ID for notifications

slack_webhook str | None

Slack webhook URL for notifications

email str | None

Email address for notifications

pagerduty_key str | None

PagerDuty integration key for critical alerts

rules list[AlertRule]

List of alert rules to evaluate

quiet_hours TimeRange | None

Optional time range during which only CRITICAL alerts are sent

escalation_timeout_seconds int

Time before escalating unacknowledged alerts

dashboard_base_url str | None

Base URL for dashboard links in alerts

enabled bool

Global enable/disable for all alerting

configured_channels property

configured_channels: list[AlertChannel]

Get the list of channels that have been configured.

has_channel

has_channel(channel: AlertChannel) -> bool

Check if a specific channel is configured.

get_rules_for_condition

get_rules_for_condition(
    condition: AlertCondition,
) -> list[AlertRule]

Get all enabled rules for a specific condition.

get_rules_for_channel

get_rules_for_channel(
    channel: AlertChannel,
) -> list[AlertRule]

Get all enabled rules that include a specific channel.

is_in_quiet_hours

is_in_quiet_hours(check_time: time) -> bool

Check if the given time is within quiet hours.

should_send_alert

should_send_alert(
    severity: Severity, current_time: time
) -> bool

Determine if an alert should be sent based on severity and quiet hours.

During quiet hours, only CRITICAL alerts are sent.

Parameters:

Name Type Description Default
severity Severity

The severity of the alert

required
current_time time

The current time to check against quiet hours

required

Returns:

Type Description
bool

True if the alert should be sent
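The quiet-hours rule can be sketched as follows. The fields of `TimeRange` are not documented here, so this version takes explicit `start` and `end` times and uses string severities. Treating a range whose start is later than its end as wrapping past midnight is an assumption.

```python
from datetime import time


def in_quiet_hours(check_time: time, start: time, end: time) -> bool:
    """True if check_time falls in [start, end), allowing wrap past midnight."""
    if start <= end:
        return start <= check_time < end
    # Range such as 22:00-06:00 spans midnight.
    return check_time >= start or check_time < end


def should_send(severity: str, check_time: time, start: time, end: time) -> bool:
    """CRITICAL always sends; everything else is held during quiet hours."""
    return severity == "CRITICAL" or not in_quiet_hours(check_time, start, end)
```

For example, with quiet hours from 22:00 to 06:00, a non-critical alert at 23:30 is suppressed while a CRITICAL one at the same time is still sent.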

to_dict

to_dict() -> dict[str, Any]

Convert the alert config to a dictionary for serialization.

AlertRule

almanak.framework.alerting.AlertRule dataclass

AlertRule(
    condition: AlertCondition,
    threshold: Decimal,
    severity: Severity,
    channels: list[AlertChannel],
    cooldown_seconds: int = 300,
    enabled: bool = True,
    description: str = "",
    custom_message: str | None = None,
)

A rule defining when and how to send an alert.

Attributes:

Name Type Description
condition AlertCondition

The condition that triggers this alert

threshold Decimal

The threshold value for the condition (interpretation depends on condition)

severity Severity

Severity level for alerts triggered by this rule

channels list[AlertChannel]

List of channels to send alerts to

cooldown_seconds int

Minimum seconds between alerts for this rule

enabled bool

Whether this rule is active

description str

Human-readable description of the rule

custom_message str | None

Optional custom message template for the alert

__post_init__

__post_init__() -> None

Validate the alert rule.

to_dict

to_dict() -> dict[str, Any]

Convert the alert rule to a dictionary for serialization.

AlertChannel

almanak.framework.alerting.AlertChannel

Bases: StrEnum

Supported notification channels for alerts.

Escalation

EscalationPolicy

almanak.framework.alerting.EscalationPolicy

EscalationPolicy(
    config: AlertConfig,
    auto_remediation_callback: AutoRemediationCallback
    | None = None,
    emergency_pause_callback: EmergencyPauseCallback
    | None = None,
    custom_thresholds: dict[EscalationLevel, int]
    | None = None,
)

Manages escalation of unacknowledged alerts.

The EscalationPolicy tracks alerts and escalates them through multiple levels if they are not acknowledged within time thresholds.

Escalation levels:

- Level 1 (<5 min): Telegram/Slack
- Level 2 (<15 min): Add Email
- Level 3 (<30 min): PagerDuty for HIGH+ severity
- Level 4 (30+ min): Auto-remediation or emergency pause
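The time bands above can be expressed as a lookup from elapsed seconds to a level. This is a sketch, not the policy's code: the `Level` enum member names are hypothetical, and it assumes each threshold is the number of seconds after which that level begins, which is also how custom thresholds are interpreted here.

```python
from enum import IntEnum
from typing import Optional


class Level(IntEnum):  # hypothetical mirror of EscalationLevel
    LEVEL_1 = 1
    LEVEL_2 = 2
    LEVEL_3 = 3
    LEVEL_4 = 4


# Seconds of unacknowledged time at which each level begins (from the bands above).
DEFAULT_THRESHOLDS: dict[Level, int] = {
    Level.LEVEL_2: 5 * 60,
    Level.LEVEL_3: 15 * 60,
    Level.LEVEL_4: 30 * 60,
}


def level_for(
    elapsed_seconds: float, thresholds: Optional[dict[Level, int]] = None
) -> Level:
    """Return the highest level whose start threshold has been reached."""
    limits = {**DEFAULT_THRESHOLDS, **(thresholds or {})}
    level = Level.LEVEL_1
    for candidate in sorted(limits):
        if elapsed_seconds >= limits[candidate]:
            level = candidate
    return level
```

Passing `{Level.LEVEL_2: 60}` as overrides would make an alert reach Level 2 after one minute while leaving the other bands at their defaults.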

Attributes:

Name Type Description
config

The AlertConfig for channel configuration

escalations dict[str, EscalationState]

Dict of active escalation states by alert_id

auto_remediation_callback

Optional callback for auto-remediation

emergency_pause_callback

Optional callback for emergency pause

Initialize the EscalationPolicy.

Parameters:

Name Type Description Default
config AlertConfig

AlertConfig with channel configurations

required
auto_remediation_callback AutoRemediationCallback | None

Callback to execute auto-remediation

None
emergency_pause_callback EmergencyPauseCallback | None

Callback to execute emergency pause

None
custom_thresholds dict[EscalationLevel, int] | None

Optional custom time thresholds for escalation levels

None

start_escalation

start_escalation(
    strategy_id: str,
    card: OperatorCard,
    current_time: datetime | None = None,
) -> EscalationState

Start tracking escalation for a new alert.

If an escalation already exists for this alert, returns the existing one.

Parameters:

Name Type Description Default
strategy_id str

The strategy ID

required
card OperatorCard

The OperatorCard that triggered the alert

required
current_time datetime | None

Current time (defaults to now)

None

Returns:

Type Description
EscalationState

The EscalationState for this alert

acknowledge

acknowledge(
    alert_id: str,
    acknowledged_by: str = "operator",
    current_time: datetime | None = None,
) -> bool

Acknowledge an alert and stop its escalation.

Parameters:

Name Type Description Default
alert_id str

The alert ID to acknowledge

required
acknowledged_by str

Who is acknowledging (for audit)

'operator'
current_time datetime | None

Current time (defaults to now)

None

Returns:

Type Description
bool

True if acknowledgment succeeded, False if alert not found

acknowledge_by_strategy

acknowledge_by_strategy(
    strategy_id: str,
    acknowledged_by: str = "operator",
    current_time: datetime | None = None,
) -> int

Acknowledge all active alerts for a strategy.

Parameters:

Name Type Description Default
strategy_id str

The strategy ID

required
acknowledged_by str

Who is acknowledging

'operator'
current_time datetime | None

Current time (defaults to now)

None

Returns:

Type Description
int

Number of alerts acknowledged

resolve

resolve(
    alert_id: str, current_time: datetime | None = None
) -> bool

Mark an alert as resolved.

Parameters:

Name Type Description Default
alert_id str

The alert ID to resolve

required
current_time datetime | None

Current time (defaults to now)

None

Returns:

Type Description
bool

True if resolution succeeded, False if alert not found

check_escalation

check_escalation(
    alert_id: str, current_time: datetime | None = None
) -> EscalationResult

Check if an alert needs to be escalated.

This method checks the time elapsed since the alert was created and determines if it should be escalated to the next level.

Parameters:

Name Type Description Default
alert_id str

The alert ID to check

required
current_time datetime | None

Current time (defaults to now)

None

Returns:

Type Description
EscalationResult

EscalationResult indicating what action to take

process_escalation async

process_escalation(
    alert_id: str, current_time: datetime | None = None
) -> EscalationResult

Process escalation for an alert, including executing Level 4 actions.

This method checks escalation and executes auto-remediation or emergency pause if Level 4 is reached.

Parameters:

Name Type Description Default
alert_id str

The alert ID to process

required
current_time datetime | None

Current time (defaults to now)

None

Returns:

Type Description
EscalationResult

EscalationResult with action details

process_escalation_sync

process_escalation_sync(
    alert_id: str, current_time: datetime | None = None
) -> EscalationResult

Synchronous wrapper for process_escalation.

Parameters:

Name Type Description Default
alert_id str

The alert ID to process

required
current_time datetime | None

Current time (defaults to now)

None

Returns:

Type Description
EscalationResult

EscalationResult with action details

check_all_escalations

check_all_escalations(
    current_time: datetime | None = None,
) -> dict[str, EscalationResult]

Check all active escalations.

Parameters:

Name Type Description Default
current_time datetime | None

Current time (defaults to now)

None

Returns:

Type Description
dict[str, EscalationResult]

Dict mapping alert_id to EscalationResult

process_all_escalations async

process_all_escalations(
    current_time: datetime | None = None,
) -> dict[str, EscalationResult]

Process all active escalations.

Parameters:

Name Type Description Default
current_time datetime | None

Current time (defaults to now)

None

Returns:

Type Description
dict[str, EscalationResult]

Dict mapping alert_id to EscalationResult

get_escalation_state

get_escalation_state(
    alert_id: str,
) -> EscalationState | None

Get the current escalation state for an alert.

Parameters:

Name Type Description Default
alert_id str

The alert ID

required

Returns:

Type Description
EscalationState | None

EscalationState or None if not found

get_active_escalations

get_active_escalations() -> list[EscalationState]

Get all active escalations.

Returns:

Type Description
list[EscalationState]

List of active EscalationState objects

get_escalations_for_strategy

get_escalations_for_strategy(
    strategy_id: str,
) -> list[EscalationState]

Get all escalations for a strategy.

Parameters:

Name Type Description Default
strategy_id str

The strategy ID

required

Returns:

Type Description
list[EscalationState]

List of EscalationState objects for the strategy

clear_resolved_escalations

clear_resolved_escalations(
    max_age_seconds: int = 86400,
) -> int

Clear old resolved escalations to prevent memory buildup.

Parameters:

Name Type Description Default
max_age_seconds int

Maximum age for resolved escalations (default 24 hours)

86400

Returns:

Type Description
int

Number of escalations cleared
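Age-based pruning of resolved escalations might look like the sketch below. The `State` dataclass and its `resolved_at` field are hypothetical stand-ins, since the fields of EscalationState are not listed on this page, and the current time is passed in explicitly to keep the example deterministic.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class State:  # hypothetical stand-in for EscalationState
    alert_id: str
    resolved_at: Optional[datetime] = None  # None means still active


def clear_resolved(
    escalations: dict[str, State], now: datetime, max_age_seconds: int = 86400
) -> int:
    """Delete resolved entries older than max_age_seconds; return how many were removed."""
    cutoff = now - timedelta(seconds=max_age_seconds)
    stale = [
        alert_id
        for alert_id, state in escalations.items()
        if state.resolved_at is not None and state.resolved_at < cutoff
    ]
    for alert_id in stale:
        del escalations[alert_id]
    return len(stale)
```

Active escalations are never removed in this sketch, regardless of age, because only entries with a resolution time are considered.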

EscalationLevel

almanak.framework.alerting.EscalationLevel

Bases: IntEnum

Escalation levels from least to most severe.

Results

AlertSendResult

almanak.framework.alerting.AlertSendResult dataclass

AlertSendResult(
    success: bool,
    channels_sent: list[AlertChannel] = list(),
    channels_failed: list[AlertChannel] = list(),
    errors: dict[AlertChannel, str] = dict(),
    skipped_reason: str | None = None,
)

Result of sending an alert through AlertManager.