Skip to content

Extending Metrics

To compute trading metrics for a new ISO, register an ISOTradeSchema that maps ISO-specific DataFrame columns to the canonical format expected by the metrics pipeline. This guide covers creating schemas, defining column mappings, implementing normalizer functions, and registering via register_iso().

Pipeline Overview

ISO DataFrame -> ISOTradeSchema.adapt_frame() -> Canonical DataFrame -> MetricsCalculator -> MetricsResult

The schema layer adapts each ISO's column names and semantics to a canonical format so the calculator operates uniformly across all ISOs.

ISOTradeSchema

from progridpy.metrics.schema import ISOTradeSchema

schema = ISOTradeSchema(
    iso_name="NEW_ISO",
    column_map={...},
    gain_normalizer=my_normalizer_fn,
    capital_requirement_fn=my_capital_fn,       # Optional
    extra_validations=[my_validation_fn],       # Optional
)

Attributes

Attribute Type Description
iso_name str ISO identifier (e.g., "MISO", "ERCOT")
column_map Mapping[str, str] Maps canonical column names to ISO-specific column names
gain_normalizer GainNormalizer Callable that computes gain_normalized from the DataFrame
capital_requirement_fn CapitalRequirementFn \| None Optional callable to compute capital requirement
extra_validations list[Callable[[pd.DataFrame], None]] Additional validation functions

Step 1: Define the Column Map

The column map is a dictionary mapping canonical names (keys) to ISO-specific column names (values):

COLUMN_MAP: dict[str, str] = {
    "interval_start_local": "interval_start_local",  # Timestamp column
    "node": "node",                                    # Node/settlement point name
    "volume": "volume",                                # Trade volume (MWh)
    "cleared": "cleared",                              # Boolean: trade was cleared
    "da_lmp": "da_lmp",                                # Day-ahead price column
    "rt_lmp": "rt_lmp_final",                          # Real-time price column
    "gain": "gain",                                    # Dollar gain per trade
    "is_supply": "is_supply",                          # Boolean: supply vs demand
}

Column name mapping direction

Keys are canonical names used by the calculator. Values are the actual column names in your ISO's DataFrame. When the ISO uses the same names, the mapping is identity (key equals value).

The existing ISOs use these mappings:

MISO_COLUMN_MAP = {
    "interval_start_local": "interval_start_local",
    "node": "node",
    "volume": "volume",
    "cleared": "cleared",
    "da_lmp": "da_lmp",
    "rt_lmp": "rt_lmp_final",   # MISO uses rt_lmp_final
    "gain": "gain",
    "is_supply": "is_supply",
}
SPP_COLUMN_MAP = {
    "interval_start_local": "interval_start_local",
    "node": "node",
    "volume": "volume",
    "cleared": "cleared",
    "da_lmp": "da_lmp",
    "rt_lmp": "rt_lmp_final",
    "gain": "gain",
    "is_supply": "is_supply",
}
ERCOT_COLUMN_MAP = {
    "interval_start_local": "interval_start_local",
    "node": "node",
    "volume": "volume",
    "cleared": "cleared",
    "da_lmp": "da_spp",    # ERCOT uses da_spp
    "rt_lmp": "rt_spp",    # ERCOT uses rt_spp
    "gain": "gain",
    "is_supply": "is_supply",
}

Step 2: Implement the Gain Normalizer

The gain normalizer computes a gain_normalized column from the adapted DataFrame. The standard implementation normalizes to per-1-MWh per node-hour:

import pandas as pd

def gain_normalizer(df: pd.DataFrame) -> pd.Series:
    """Normalize gains to per 1 MWh per node-hour."""
    return df["gain"] / df["volume"]

The function signature is:

GainNormalizer = Callable[[pd.DataFrame], pd.Series]

It receives the DataFrame after column renaming (so columns are canonical names) and must return a pd.Series of the same length.

Step 3: Implement Capital Requirement (Optional)

Define a function that computes the ISO-specific capital requirement:

def capital_requirement(df_filtered: pd.DataFrame) -> float:
    """Compute capital requirement. Formula: average daily volume x $35.2."""
    daily_volume = df_filtered.groupby("date")["volume"].sum()
    return float(daily_volume.mean() * 35.2)

The function signature is:

CapitalRequirementFn = Callable[[pd.DataFrame], float]

It receives the standardized DataFrame (typically filtered to cleared trades) with a date column already added by adapt_frame().

If the ISO does not have a capital requirement, omit this parameter or pass None.

Step 4: Register the Schema

Use register_iso() to add the schema to the global registry:

from progridpy.metrics.schema import ISOTradeSchema
from progridpy.metrics.iso_registry import register_iso

NEW_ISO_SCHEMA = ISOTradeSchema(
    iso_name="NEW_ISO",
    column_map=COLUMN_MAP,
    gain_normalizer=gain_normalizer,
    capital_requirement_fn=capital_requirement,
)

register_iso(NEW_ISO_SCHEMA)

The schema is stored under schema.iso_name.upper(). If a schema already exists for that name, it is overwritten.

Registration must happen before use

The schema must be registered before MetricsEngine or MetricsCalculator is called with the ISO name. Place registration at module level so it executes on import.

Step 5: Use with MetricsEngine

After registration, the ISO name can be used with MetricsEngine:

from progridpy.metrics import MetricsEngine

engine = MetricsEngine(iso_name="NEW_ISO", df_iso=trade_df)
result = engine.compute()

print(f"Sharpe: {engine.sharpe_overall:.2f}")
print(f"Win Rate: {engine.win_rate_pct:.1f}%")

adapt_frame() Processing Steps

When MetricsCalculator.from_iso_frame() is called, the schema's adapt_frame() method transforms the input DataFrame through these steps:

  1. Validate columns -- Check that all ISO-specific columns from column_map.values() exist in the input DataFrame. Raises ValueError if any are missing.
  2. Rename columns -- Apply inverse mapping (ISO names to canonical names) and copy the DataFrame.
  3. Enforce datetime -- Ensure interval_start_local is datetime64.
  4. Compute gain_normalized -- Call self.gain_normalizer(df) and assign to the gain_normalized column.
  5. Add date column -- Extract date from interval_start_local.dt.date.
  6. Add iso column -- Set constant iso = self.iso_name.
  7. Run validations -- Execute any functions in extra_validations.
  8. Return new DataFrame -- The original input is never mutated.

Extra Validations

Add custom validation functions that run after adaptation:

def validate_no_negative_volume(df: pd.DataFrame) -> None:
    if (df["volume"] < 0).any():
        raise ValueError("Negative volume values found")

schema = ISOTradeSchema(
    iso_name="NEW_ISO",
    column_map=COLUMN_MAP,
    gain_normalizer=gain_normalizer,
    extra_validations=[validate_no_negative_volume],
)

Each validation receives the fully adapted DataFrame and should raise an exception if validation fails.

Querying the Registry

Retrieve a registered schema:

from progridpy.metrics.iso_registry import get_iso_schema

schema = get_iso_schema("MISO")       # Case-insensitive lookup
schema = get_iso_schema("miso")       # Also works

Raises KeyError if no schema is registered for the given name. The error message lists all available ISO names.

Complete Example

# src/progridpy/metrics/new_iso_schema.py
import pandas as pd

from progridpy.metrics.iso_registry import register_iso
from progridpy.metrics.schema import ISOTradeSchema

COLUMN_MAP: dict[str, str] = {
    "interval_start_local": "timestamp_local",
    "node": "settlement_point",
    "volume": "mw_volume",
    "cleared": "is_cleared",
    "da_lmp": "day_ahead_price",
    "rt_lmp": "realtime_price",
    "gain": "pnl",
    "is_supply": "supply_flag",
}


def _normalizer(df: pd.DataFrame) -> pd.Series:
    return df["gain"] / df["volume"]


def _capital_req(df: pd.DataFrame) -> float:
    daily_vol = df.groupby("date")["volume"].sum()
    return float(daily_vol.mean() * 42.0)


NEW_ISO_SCHEMA = ISOTradeSchema(
    iso_name="NEW_ISO",
    column_map=COLUMN_MAP,
    gain_normalizer=_normalizer,
    capital_requirement_fn=_capital_req,
)

register_iso(NEW_ISO_SCHEMA)

After importing this module, MetricsEngine(iso_name="NEW_ISO", df_iso=df) will work. The DataFrame must contain columns timestamp_local, settlement_point, mw_volume, is_cleared, day_ahead_price, realtime_price, pnl, and supply_flag.