Skip to main content

Dashboard

Visualization is key in any data-related project, and the Almanak Strategy Framework has you covered. Unfortunately, the Platform can't offer a one-size-fits-all dashboard natively, because each strategy is fully customizable. This flexibility comes with a tradeoff.

With great power comes great responsibility.

The simplest way forward was to offer dashboarding capabilities via Streamlit. Given that strategists develop their strategies in Python, it feels natural to have an easy, Pythonic way to create beautiful dashboards.

Before reading the Dashboard documentation, make sure to understand how the Metrics work.

Dashboard Code

The dashboard code should live in [strategy]/dashboard/*. The main dashboard file should be named ui.py ([strategy]/dashboard/ui.py).

Here is the recommended starting code for your dashboard:

# Basic dashboard framework
import copy
import streamlit as st

from src.dashboard.utils import get_strategy_class_from_name
from src.strategy.strategy_factory import get_parameters_from_config
from src.utils.utils import read_config_file

CONFIG_FILE_NAME = "config.json"

# ================================================================
# SESSION VARIABLES
# ================================================================
# Streamlit re-executes this script on every interaction; values that must
# survive between runs are kept in st.session_state.
if "config" not in st.session_state:
    st.session_state["config"] = None

if "initialized" not in st.session_state:
    st.session_state["initialized"] = False


# ================================================================
# STREAMLIT APP
# ================================================================
st.title("Strategy Dashboard")

# Read the configuration only when the session does not hold one yet.
if st.session_state["config"] is None:
    with st.spinner("Loading current configuration..."):
        try:
            st.session_state["config"] = read_config_file()
        except Exception as e:
            st.error(f"Error loading configuration. {e}")
            st.stop()

try:
    strat_ids = list(st.session_state["config"]["strategy_configs"].keys())
except Exception as e:
    st.error(f"Error loading strategy IDs. {e}")
    st.stop()

# The config file could have multiple configurations (e.g. multiple pools) we loop through them
for strat_id in strat_ids:
    st.markdown("---")
    try:
        with st.spinner(f"Loading strategy: {strat_id}"):
            # Work on a copy so the configuration cached in the session
            # state is never handed to the parsing helpers directly.
            config = copy.deepcopy(st.session_state["config"])
            name, parameters = get_parameters_from_config(config, strat_id)
            strategy_class = get_strategy_class_from_name(name)
            strategy_config_class = strategy_class.get_config_model()
            # From here on `config` is the strategy's own config object.
            config = strategy_config_class(**parameters)
            # Kept for the metric loaders you add in the DATA/METRICS section.
            cur_strat_id = strat_id
    except Exception as e:
        st.error(f"Error loading strategy. {e}")
        st.stop()

    # Example showing the config.json
    st.write("Config:")
    st.json(config)

    # ------------------------------------------------
    # DATA/METRICS
    # ------------------------------------------------
    try:
        with st.spinner("Loading metrics..."):
            # Load metrics here
            pass
    except Exception as e:
        st.error(f"Error loading strategy data. {e}")
        st.stop()

    # ------------------------------------------------
    # VISUALIZATION
    # ------------------------------------------------
    # Add visualization here

Here is a more advanced dashboard for a Uniswap V3 LP strategy:

# Advanced Dashboard for Uniswap V3 LP strategy
import copy
import json
import os
import sys
import traceback

import pandas as pd
import streamlit as st
from src.almanak_library.enums import Chain, Network
from src.almanak_library.init_sdk import initialize_sdks
from src.dashboard.data.data import PoolInfo
from src.dashboard.plots.plots import (
plot_liquidity_distribution,
plot_positions_over_time,
)
from src.dashboard.utils import (
get_positions_over_time,
get_strategy_class_from_name,
load_close_position_metrics_df,
load_fees_metrics_df,
load_open_position_metrics_df,
load_strategy_balance_metrics_df,
)
from src.strategy.strategy_factory import get_parameters_from_config
from src.strategy.utils.base_model import ModelConfig
from src.strategy.utils.pool_token import pooltoken_service
from src.strategy.utils.utils import DataFormat
from src.utils.config import Config
from src.utils.utils import (
get_protocol_sdk,
get_web3_by_network_and_chain,
read_config_file,
)

CONFIG_FILE_NAME = "config.json"

# ================================================================
# SESSION VARIABLES
# ================================================================
# Streamlit re-executes this script on every interaction; values that must
# survive between runs are kept in st.session_state.
if "config" not in st.session_state:
    st.session_state["config"] = None

if "initialized" not in st.session_state:
    st.session_state["initialized"] = False

if "admin_mode" not in st.session_state:
    st.session_state["admin_mode"] = False


# ================================================================
# STREAMLIT APP
# ================================================================
st.title("Strategy Dashboard")

if st.button("Admin Mode", use_container_width=True):
    st.session_state["request_admin"] = True

# Show the password form only while admin access was requested but not granted.
if st.session_state.get("request_admin") and not st.session_state["admin_mode"]:
    with st.form("admin_form"):
        password = st.text_input("Enter Password", type="password")
        submitted = st.form_submit_button("Enter")
        if submitted:
            if password == Config.get("DASHBOARD_PASSWORD"):
                st.session_state["admin_mode"] = True
                # Force the SDK bootstrap below to run again on the next run.
                st.session_state["initialized"] = False
                st.success("Admin mode activated!")
                st.rerun()
            else:
                st.error("Invalid password")

# Read the configuration only when the session does not hold one yet.
if st.session_state["config"] is None:
    with st.spinner("Loading current configuration..."):
        try:
            st.session_state["config"] = read_config_file()
        except Exception as e:
            st.error(f"Error loading configuration. {e}")
            traceback.print_exc()
            st.stop()

try:
    strat_ids = list(st.session_state["config"]["strategy_configs"].keys())
except Exception as e:
    st.error(f"Error loading strategy IDs. {e}")
    st.stop()

# One dashboard section per strategy configuration found in the config file.
for strat_id in strat_ids:
    st.markdown("---")
    try:
        with st.spinner(f"Loading strategy: {strat_id}"):
            # Work on a copy so the configuration cached in the session
            # state is never handed to the parsing helpers directly.
            config = copy.deepcopy(st.session_state["config"])
            name, parameters = get_parameters_from_config(config, strat_id)
            strategy_class = get_strategy_class_from_name(name)
            strategy_config_class = strategy_class.get_config_model()
            # From here on `config` is the strategy's own config object.
            config = strategy_config_class(**parameters)
            cur_strat_id = strat_id
    except Exception as e:
        st.error(f"Error loading strategy. {e}")
        st.stop()

    try:
        with st.spinner("Fetching pool(s) data..."):
            # Only the SDK bootstrap is gated by the session flag. The
            # handles below are plain script variables: they do not survive
            # a rerun and depend on the current strategy's config, so they
            # are rebuilt on every run and for every strategy. Gating them
            # behind "initialized" left them undefined on reruns.
            if not st.session_state["initialized"]:
                initialize_sdks()
                st.session_state["initialized"] = True
            web3 = get_web3_by_network_and_chain(Network(config.network), Chain(config.chain))
            uniswap_v3 = get_protocol_sdk(config.protocol, config.network, config.chain)
            pooltoken = pooltoken_service.get_registry(
                protocol=config.protocol,
                chain=config.chain,
                network=config.network,
                web3=web3,
                pool_abi=uniswap_v3.POOL_ABI,
                token_abi=uniswap_v3.ERC20_ABI,
            )
    except Exception as e:
        st.error(f"Error accessing onchain data. {e}")
        st.stop()

    try:
        with st.spinner("Fetching pool(s) data..."):
            pool = pooltoken.get_pool(config.pool_address)
    except Exception as e:
        st.error(f"Error loading pool data. {e}")
        st.stop()

    # ------------------------------------------------
    # DATA/METRICS
    # ------------------------------------------------
    try:
        # Get pool info
        with st.spinner(f"Fetching Pool Info: {config.pool_address}"):
            network = config.network
            if config.network == Network.ANVIL:
                st.warning("Changing the Network from Anvil to Mainnet to fetch Pool Info.")
                network = Network.MAINNET
            pool_info = PoolInfo.get(config.pool_address, chain=config.chain, network=network)
        st.subheader(f"{pool_info.pair_name} {pool_info.fee_tier}")
    except Exception as e:
        st.error(f"Error fetching pool info. {e}")
        st.stop()

    try:
        with st.spinner("Loading metrics..."):
            df_close_positions = load_close_position_metrics_df(cur_strat_id)
            df_open_positions = load_open_position_metrics_df(cur_strat_id)
            df_fees = load_fees_metrics_df(cur_strat_id)
            df_strat_balance_onchain = load_strategy_balance_metrics_df(cur_strat_id)
    except Exception as e:
        st.error(f"Error loading strategy data. {e}")
        st.stop()

    try:
        # Get Positions Over Time
        positions_list = get_positions_over_time(
            df_open_positions,
            df_close_positions,
            fees=df_fees,
            strat_balances=df_strat_balance_onchain,
            uniswap_sdk=uniswap_v3,
            token0_decimals=pool.token0.decimals,
            token1_decimals=pool.token1.decimals,
        )
    except Exception as e:
        st.error(f"Error getting positions over time. {e}")
        st.stop()

    # ------------------------------------------------
    # PLOTS
    # ------------------------------------------------
    try:
        # Plot Liquidity Distribution
        if pool_info:
            if pool_info.info_data_source == "TheGraph":
                # A failing plot only produces a warning so the rest of the
                # dashboard still renders.
                try:
                    all_processed_ticks = pd.DataFrame(pool_info.process_ticks())
                    # Overlay the latest position only when it has no end
                    # date (presumably meaning it is still open; confirm).
                    if len(positions_list) > 0 and pd.isna(positions_list[-1].get("dateEnd")):
                        position_info = [positions_list[-1]]
                    else:
                        position_info = None
                    plot_liquidity_distribution(all_processed_ticks, pool_info, position_info)
                except Exception as e:
                    st.warning(f"Plot could not be rendered. {e}")
            else:
                st.error(
                    "TheGraph (Uniswap V3 subgraph) is currently down, which impacts parts of the dashboard."
                )
        else:
            st.error("Pool Info could not be fetched.")
    except Exception as e:
        st.error(f"Error plotting liquidity distribution. {e}")
        st.stop()

    try:
        # Fall back to default price/volatility models when the strategy
        # config does not define them.
        if not config.price_model:
            price_model_params = {
                "method": "raw",
                "params": {"price_window": config.time_window},
                "data_source": config.data_source.value,
                "data_format": DataFormat.CLOSE.value,
            }
            config.price_model = ModelConfig(**json.loads(json.dumps(price_model_params)))

        if not config.volatility_model:
            volatility_model_params = {
                "method": "std",
                "params": {
                    "volatility_window": config.time_window,
                    "volatility_window_type": "rolling",
                    "volatility_factor": config.volatility_factor,
                },
                "data_source": config.data_source.value,
                "data_format": DataFormat.CLOSE.value,
            }
            config.volatility_model = ModelConfig(**json.loads(json.dumps(volatility_model_params)))

        # Plot positions Over Time
        plot_positions_over_time(
            positions=positions_list,
            data_source=config.price_model.data_source,
            pool=pool,
            data_format=config.price_model.data_format,
            granularity=config.granularity,
            price_model=config.price_model,
            volatility_model=config.volatility_model,
        )
    except Exception as e:
        st.error(f"Error plotting positions over time. {e}")
        traceback.print_exc()
        st.stop()

    # ------------------------------------------------
    # ADMIN SECTION
    # ------------------------------------------------
    if st.session_state["admin_mode"]:
        # This is an example of how to use the Admin mode.
        st.write("Connected as Admin")

Fetching Metrics

In the Strategy Framework, the Dashboard Library offers general functions to help quickly create a nice Streamlit Dashboard with minimal time and effort. The Dashboard is meant to work with the Metrics.

In src.dashboard.utils we have included the 2 main functions to retrieve either the Action/System-Level metrics or Strategy/Agg-Level metrics:

  • load_metrics_action(strategy_id: str, metric_action_type: MetricActionType = None): to load action-level / system-level metrics.
  • load_metrics_agg(strategy_id: str, metric_agg_type: MetricAggType = None): to load strategy-level / agg-level metrics.

We've also included the functions to load dataframes of the common types of Actions like:

from src.almanak_library.metrics.metrics_actions import MetricActionType, MetricsActionHandler

def load_gas_metrics_df(strategy_id: str):
    """Load the GAS action metrics of a strategy into a DataFrame.

    Args:
        strategy_id: Identifier forwarded to ``load_metrics_action``.

    Returns:
        A ``pandas.DataFrame`` with one row per metric, or ``None`` when
        ``load_metrics_action`` returns ``None`` or an empty collection.

    Raises:
        ValueError: If fetching or converting the metrics fails. The
            original exception is attached as ``__cause__``.
    """
    try:
        metrics = load_metrics_action(strategy_id, MetricActionType.GAS)
        if metrics is None or len(metrics) == 0:
            return None
        df = pd.DataFrame(
            [
                {
                    "metric_id": metric.id,
                    "metric_type": metric.metric_type,
                    "time": metric.time,
                    "block_number": metric.block_number,
                    "strategy_id": metric.strategy_id,
                    "wallet": metric.wallet_address,
                    "action_id": metric.action_id,
                    "bundle_id": metric.bundle_id,
                    "function": metric.details["function"],
                    "gas_used": metric.details["gas_used"],
                    # Scales the stored value by 10**-18 (presumably wei to
                    # native token units; confirm against the producer).
                    "tx_cost": float(metric.details["tx_cost"] * 10**-18),
                }
                for metric in metrics
            ]
        )
    except Exception as e:
        # Chain explicitly so the root cause stays visible in tracebacks.
        raise ValueError(f"Error loading gas metrics: {e}") from e
    return df

def load_open_position_metrics_df(strategy_id: str):
    """Load the OPEN_POSITION action metrics of a strategy into a DataFrame.

    Args:
        strategy_id: Identifier forwarded to ``load_metrics_action``.

    Returns:
        A ``pandas.DataFrame`` with one row per metric, or ``None`` when
        ``load_metrics_action`` returns ``None`` or an empty collection.

    Raises:
        ValueError: If fetching or converting the metrics fails. The
            original exception is attached as ``__cause__``.
    """
    try:
        metrics = load_metrics_action(strategy_id, MetricActionType.OPEN_POSITION)
        if metrics is None or len(metrics) == 0:
            return None
        df = pd.DataFrame(
            [
                {
                    "metric_id": metric.id,
                    "metric_type": metric.metric_type,
                    "time": metric.time,
                    "block_number": metric.block_number,
                    "strategy_id": metric.strategy_id,
                    "wallet": metric.wallet_address,
                    "action_id": metric.action_id,
                    "bundle_id": metric.bundle_id,
                    "position_id": metric.details["position_id"],
                    "token0_symbol": metric.details["token0_symbol"],
                    "token1_symbol": metric.details["token1_symbol"],
                    # Amounts are converted to Decimal; tick and spot rate
                    # are coerced to int and float respectively.
                    "amount0": Decimal(metric.details["amount0"]),
                    "amount1": Decimal(metric.details["amount1"]),
                    "bound_tick_lower": metric.details["bound_tick_lower"],
                    "bound_tick_upper": metric.details["bound_tick_upper"],
                    "bound_price_lower": metric.details["bound_price_lower"],
                    "bound_price_upper": metric.details["bound_price_upper"],
                    "pool_tick": int(metric.details["pool_tick"]),
                    "pool_spot_rate": float(metric.details["pool_spot_rate"]),
                }
                for metric in metrics
            ]
        )
    except Exception as e:
        # Chain explicitly so the root cause stays visible in tracebacks.
        raise ValueError(f"Error loading open position metrics: {e}") from e
    return df

...

You don't need to re-implement these functions; simply import them from src.dashboard.utils. To fetch your own custom metrics, follow the same structure but call load_metrics_agg instead, passing the custom metric name exactly as you defined it in your strategy code (for example, "SPREAD").

Here are the functions we have already implemented for you:

  • load_gas_metrics_df
  • load_fees_metrics_df
  • load_open_position_metrics_df
  • load_close_position_metrics_df
  • load_swap_metrics_df
  • load_snapshot_metrics_df
  • load_strategy_balance_metrics_df
  • load_initialization_metrics_df
  • load_teardown_metrics_df
  • load_rebalance_trigger_metrics_df

Plots

In the Dashboard library we have shared convenient plots in src.dashboard.plots, however feel free to create your own custom Plotly graphs!
We're trying to make your life easier but feel free to be creative based on your own needs! See the Uniswap V3 LP Dashboard above for a more complete example.

  • plot_liquidity_distribution(all_processed_ticks, pool, position_info=None, simple=False)
  • plot_positions_over_time(positions, data_source, pool, data_format="close", granularity="1h", start_time=None, end_time=None, price_model: ModelConfig = None, volatility_model: ModelConfig = None, hedge_trades=None, price_bounds_config=None)