Extending ISOs

This guide walks through adding support for a new ISO by subclassing ISOBase and creating the required types and registry modules. The SPP implementation is the reference pattern.

Architecture Overview

Each ISO is a Python package under src/progridpy/iso/ with the following structure:

src/progridpy/iso/new_iso/
    __init__.py         # Public exports
    client.py           # ISOBase subclass
    types.py            # StrEnum data types, JoinMode, ProcessingBinding, DataDefinition
    registry.py         # Maps each data type enum to a DataDefinition
    helpers.py          # Optional support module
    constants.py        # Optional constants

Step 1: Define Types

Create types.py with the ISO's data type enums, join modes, processing bindings, and data definition.

Raw Data Type Enum

Define a StrEnum listing every raw data type the ISO provides:

# src/progridpy/iso/new_iso/types.py
from __future__ import annotations

from dataclasses import dataclass
from enum import StrEnum

from progridpy.common.registry import DataDefinition


class NewISORawDataType(StrEnum):
    DAY_AHEAD_LMP = "DAY_AHEAD_LMP"
    REAL_TIME_LMP = "REAL_TIME_LMP"
    LOAD_FORECAST = "LOAD_FORECAST"
    LOAD_ACTUAL = "LOAD_ACTUAL"

    @classmethod
    def get_all(cls) -> list[NewISORawDataType]:
        return list(cls)

Processed Data Type Enum

Processed types are granularity-based (not function-based):

class NewISOProcessedDataType(StrEnum):
    NODAL = "NODAL"
    SYSTEM = "SYSTEM"
    ZONAL = "ZONAL"

    @classmethod
    def get_all(cls) -> list[NewISOProcessedDataType]:
        return list(cls)

Reader Type Enum

Classify the raw file formats the ISO uses:

class NewISORawReaderType(StrEnum):
    HOURLY_LMP = "HOURLY_LMP"
    LOAD_CSV = "LOAD_CSV"

Join Mode Enum

Define how raw data types merge with the scaffold during processing:

class NewISOJoinMode(StrEnum):
    NODE_LEFT = "NODE_LEFT"       # Left join on (time, node)
    TIME_LEFT = "TIME_LEFT"       # Left join on time, broadcast to all nodes
    TIME_FULL = "TIME_FULL"       # Full outer join on time
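The effect of each mode can be sketched with a small pandas example (the pandas frames and column names here are illustrative assumptions, not the library's internals):

```python
import pandas as pd


def apply_join(scaffold: pd.DataFrame, df: pd.DataFrame, mode: str) -> pd.DataFrame:
    # Hypothetical helper mirroring the join-mode semantics described above.
    if mode == "NODE_LEFT":
        # Left join on (time, node): keeps exactly the scaffold's row set.
        return scaffold.merge(df, on=["interval_start_local", "node"], how="left")
    if mode == "TIME_LEFT":
        # Left join on time only: one value per interval, broadcast to all nodes.
        return scaffold.merge(df, on="interval_start_local", how="left")
    if mode == "TIME_FULL":
        # Full outer join on time: keeps intervals present in either frame.
        return scaffold.merge(df, on="interval_start_local", how="outer")
    raise ValueError(f"unknown join mode: {mode}")


scaffold = pd.DataFrame({
    "interval_start_local": ["t0", "t0", "t1", "t1"],
    "node": ["A", "B", "A", "B"],
    "da_lmp": [20.0, 21.0, 22.0, 23.0],
})
load = pd.DataFrame({"interval_start_local": ["t0", "t1"],
                     "load_actual": [100.0, 110.0]})

# TIME_LEFT broadcasts the per-interval load value to every node.
joined = apply_join(scaffold, load, "TIME_LEFT")
```

Note how TIME_LEFT duplicates the single load value across both nodes in each interval, while NODE_LEFT would require the right-hand frame to carry a node column itself.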

Processing Binding

Declare how each raw type contributes to a processed dataset:

@dataclass(frozen=True)
class NewISOProcessingBinding:
    dataset: NewISOProcessedDataType
    join_mode: NewISOJoinMode
    output_columns: tuple[str, ...]
    required: bool = False  # True for exactly one scaffold per processed type

Data Definition

Extend the base DataDefinition with ISO-specific fields:

@dataclass(frozen=True)
class NewISODataDefinition(DataDefinition):
    reader_type: NewISORawReaderType | None = None
    reader_args: tuple[str, ...] = ()
    processed_bindings: tuple[NewISOProcessingBinding, ...] = ()
    processed_key_columns: tuple[str, ...] = ()
    processed_output_columns: tuple[str, ...] = ()
    url_endpoint: str | None = None

The base DataDefinition provides dir_name, filename_suffix, and file_format fields.

Step 2: Create the Registry

Create registry.py to map each raw data type to its DataDefinition with processing bindings:

# src/progridpy/iso/new_iso/registry.py
from progridpy.common.registry import DataRegistry
from progridpy.iso.new_iso.types import (
    NewISODataDefinition,
    NewISOJoinMode,
    NewISOProcessedDataType,
    NewISOProcessingBinding,
    NewISORawDataType,
    NewISORawReaderType,
)


class NewISODataRegistry(DataRegistry[NewISORawDataType, NewISODataDefinition]):
    pass


NEW_ISO_REGISTRY = NewISODataRegistry()

# Register raw data types with their processing bindings
NEW_ISO_REGISTRY[NewISORawDataType.DAY_AHEAD_LMP] = NewISODataDefinition(
    dir_name="da_lmp",
    filename_suffix="da_lmp",
    file_format="csv",
    reader_type=NewISORawReaderType.HOURLY_LMP,
    processed_bindings=(
        NewISOProcessingBinding(
            dataset=NewISOProcessedDataType.NODAL,
            join_mode=NewISOJoinMode.NODE_LEFT,
            output_columns=("da_lmp",),
            required=True,  # This is the scaffold for NODAL
        ),
    ),
)

NEW_ISO_REGISTRY[NewISORawDataType.REAL_TIME_LMP] = NewISODataDefinition(
    dir_name="rt_lmp",
    filename_suffix="rt_lmp",
    file_format="csv",
    reader_type=NewISORawReaderType.HOURLY_LMP,
    processed_bindings=(
        NewISOProcessingBinding(
            dataset=NewISOProcessedDataType.NODAL,
            join_mode=NewISOJoinMode.NODE_LEFT,
            output_columns=("rt_lmp",),
        ),
    ),
)

NEW_ISO_REGISTRY[NewISORawDataType.LOAD_ACTUAL] = NewISODataDefinition(
    dir_name="load_actual",
    filename_suffix="load_actual",
    file_format="csv",
    reader_type=NewISORawReaderType.LOAD_CSV,
    processed_bindings=(
        NewISOProcessingBinding(
            dataset=NewISOProcessedDataType.NODAL,
            join_mode=NewISOJoinMode.TIME_LEFT,
            output_columns=("load_actual",),
        ),
        NewISOProcessingBinding(
            dataset=NewISOProcessedDataType.SYSTEM,
            join_mode=NewISOJoinMode.TIME_FULL,
            output_columns=("load_actual",),
            required=True,  # Scaffold for SYSTEM
        ),
    ),
)

# Define processed type metadata
PROCESSED_REGISTRY = {
    NewISOProcessedDataType.NODAL: NewISODataDefinition(
        dir_name="iso=new_iso/dataset=nodal",
        filename_suffix="",
        file_format="parquet",
        processed_key_columns=("interval_start_local", "node"),
        processed_output_columns=(
            "interval_start_local",
            "node",
            "da_lmp",
            "rt_lmp",
            "load_actual",
        ),
    ),
    NewISOProcessedDataType.SYSTEM: NewISODataDefinition(
        dir_name="iso=new_iso/dataset=system",
        filename_suffix="",
        file_format="parquet",
        processed_key_columns=("interval_start_local",),
        processed_output_columns=(
            "interval_start_local",
            "load_actual",
        ),
    ),
}

One required scaffold per processed type

Exactly one ProcessingBinding targeting each processed data type must have required=True. This binding provides the scaffold DataFrame that all other bindings join into.
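This invariant is easy to check mechanically. A sketch of such a sanity check, using plain (dataset, required) tuples as stand-ins for the binding objects:

```python
from collections import Counter


def scaffold_counts(bindings_by_raw_type: dict) -> Counter:
    # Count required=True bindings per processed dataset; every count must be 1.
    counts: Counter = Counter()
    for bindings in bindings_by_raw_type.values():
        for dataset, required in bindings:
            if required:
                counts[dataset] += 1
    return counts


# Mirrors the registry above: one (dataset, required) pair per binding.
bindings = {
    "DAY_AHEAD_LMP": [("NODAL", True)],
    "REAL_TIME_LMP": [("NODAL", False)],
    "LOAD_ACTUAL": [("NODAL", False), ("SYSTEM", True)],
}

assert all(n == 1 for n in scaffold_counts(bindings).values())
```

Running a check like this in a unit test catches a missing or duplicated scaffold before processing fails at runtime.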

Step 3: Implement the Client

Create client.py with an ISOBase subclass implementing all abstract methods:

# src/progridpy/iso/new_iso/client.py
from datetime import datetime
from pathlib import Path
from typing import Literal

from progridpy.common.types import ISO, FileLocation
from progridpy.iso.base import ISOBase
from progridpy.iso.new_iso.types import NewISORawDataType, NewISOProcessedDataType


class NewISO(ISOBase[NewISORawDataType, NewISOProcessedDataType]):
    def __init__(self) -> None:
        super().__init__(
            iso=ISO.PJM,             # Placeholder: use the new ISO's enum value

            home_url="https://example-iso.com",
            timezone="US/Eastern",    # ISO's canonical timezone
        )

    def download_raw_data(
        self,
        start_date: str | datetime | None = None,
        end_date: str | datetime | None = None,
        data_types: NewISORawDataType | list[NewISORawDataType] | None = None,
        download_src: str | FileLocation = FileLocation.ISO,
        output_dir: str | Path | None = None,
        overwrite: bool = False,
        verbose: bool = False,
    ) -> list[Path]:
        # Implement download logic
        ...

    def upload_raw_data(
        self,
        start_date: str | datetime | None = None,
        end_date: str | datetime | None = None,
        data_types: NewISORawDataType | list[NewISORawDataType] | None = None,
        input_dir: str | Path | None = None,
        overwrite: bool = False,
        verbose: bool = False,
    ) -> list[str]:
        # Implement upload logic
        ...

    def download_processed_data(
        self,
        start_date: str | datetime | None = None,
        end_date: str | datetime | None = None,
        data_types: NewISOProcessedDataType | list[NewISOProcessedDataType] | None = None,
        output_dir: str | Path | None = None,
        overwrite: bool = False,
        verbose: bool = False,
    ) -> list[Path]:
        # Implement processed download logic
        ...

    def upload_processed_data(
        self,
        start_date: str | datetime | None = None,
        end_date: str | datetime | None = None,
        data_types: NewISOProcessedDataType | list[NewISOProcessedDataType] | None = None,
        input_dir: str | Path | None = None,
        overwrite: bool = False,
        verbose: bool = False,
    ) -> list[str]:
        # Implement processed upload logic
        ...

    def process_raw_data(
        self,
        start_date: str | datetime | None = None,
        end_date: str | datetime | None = None,
        data_types: NewISOProcessedDataType | list[NewISOProcessedDataType] | None = None,
        input_dir: str | Path | None = None,
        output_dir: str | Path | None = None,
        file_format: Literal["parquet", "csv"] = "parquet",
        overwrite: bool = False,
        verbose: bool = False,
    ) -> list[Path]:
        # Implement processing logic following the registry-driven pattern
        ...
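The registry-driven pattern in process_raw_data amounts to: pick the required binding's frame as the scaffold, then merge every other contributing frame according to its join mode. A hedged sketch, using pandas frames and plain (raw_type, join_mode, required) tuples as stand-ins for the binding objects:

```python
import pandas as pd


def build_processed(frames: dict[str, pd.DataFrame],
                    bindings: list[tuple[str, str, bool]],
                    key_cols: list[str]) -> pd.DataFrame:
    # The required binding supplies the scaffold every other frame joins into.
    scaffold_type = next(rt for rt, _, req in bindings if req)
    out = frames[scaffold_type]
    for raw_type, mode, required in bindings:
        if required:
            continue  # the scaffold itself is not merged again
        on = key_cols if mode == "NODE_LEFT" else [key_cols[0]]
        how = "outer" if mode == "TIME_FULL" else "left"
        out = out.merge(frames[raw_type], on=on, how=how)
    return out


da = pd.DataFrame({"interval_start_local": ["t0", "t0"],
                   "node": ["A", "B"], "da_lmp": [20.0, 21.0]})
rt = pd.DataFrame({"interval_start_local": ["t0", "t0"],
                   "node": ["A", "B"], "rt_lmp": [25.0, 26.0]})
load = pd.DataFrame({"interval_start_local": ["t0"], "load_actual": [100.0]})

nodal = build_processed(
    {"DA": da, "RT": rt, "LOAD": load},
    [("DA", "NODE_LEFT", True), ("RT", "NODE_LEFT", False), ("LOAD", "TIME_LEFT", False)],
    ["interval_start_local", "node"],
)
```

The real implementation would additionally reorder columns to processed_output_columns and write out via the hive path helpers, but the merge loop above is the core of the pattern.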

Step 4: Create the Package

Set up __init__.py with public exports:

# src/progridpy/iso/new_iso/__init__.py
from progridpy.iso.new_iso.client import NewISO
from progridpy.iso.new_iso.types import (
    NewISOProcessedDataType,
    NewISORawDataType,
)

__all__ = [
    "NewISO",
    "NewISOProcessedDataType",
    "NewISORawDataType",
]

Then add the exports to src/progridpy/iso/__init__.py:

from progridpy.iso.new_iso import NewISO, NewISOProcessedDataType, NewISORawDataType

Step 5: Timestamp Handling

Choose the correct timestamp model based on the ISO's market conventions.

Do not copy SPP's timezone model blindly

Each ISO has its own market-time semantics; verify the source market's conventions before choosing a timestamp approach.

Pattern              When to Use                                           Example ISO
-------------------  ----------------------------------------------------  -----------
UTC canonical        Source files use GMT/UTC timestamps                   SPP
Local with DST flag  Source files are local-time with DST disambiguation   ERCOT
Fixed market time    ISO uses a fixed timezone with constant 24 hours/day  MISO (EST)
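For the "UTC canonical" pattern, the standard library is enough to convert source timestamps into the market-local column. A sketch (the format string and column semantics are assumptions for illustration):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

MARKET_TZ = ZoneInfo("US/Central")  # e.g. SPP's market timezone


def to_market_local(utc_stamp: str) -> datetime:
    # Parse the source value as UTC, then convert to the ISO's timezone.
    utc = datetime.strptime(utc_stamp, "%Y-%m-%d %H:%M").replace(tzinfo=timezone.utc)
    return utc.astimezone(MARKET_TZ)


to_market_local("2026-01-15 06:00")
# January is CST (UTC-6), so this lands on 2026-01-15 00:00 local
```

The "local with DST flag" pattern instead needs the source's disambiguation column to resolve the repeated hour at the fall-back transition, which is why the two models cannot be interchanged.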

Key-Building Hooks

ISOBase provides overridable methods for constructing S3 keys and file paths:

# Raw S3 key: {iso}/{dir_name}/{filename}
self._raw_s3_key("da_lmp", "20260115_da_lmp.csv")
# -> "new_iso/da_lmp/20260115_da_lmp.csv"

# Raw filename: {YYYYMMDD}_{suffix}.{ext}
self._raw_filename(date, "da_lmp", "csv")
# -> "20260115_da_lmp.csv"

# Processed S3 key: {dir_name}/year=YYYY/month=MM/day=DD/data.parquet
self._processed_s3_key("iso=new_iso/dataset=nodal", date)
# -> "iso=new_iso/dataset=nodal/year=2026/month=01/day=15/data.parquet"

# Hive output path: base/year=YYYY/month=MM/day=DD/data.{ext}
self._hive_output_path(base_dir, "20260115", "parquet")
# -> base_dir/year=2026/month=01/day=15/data.parquet

Override _raw_s3_prefix() if the ISO uses a non-standard S3 prefix structure.
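The documented hive layout can be reproduced standalone to sanity-check path construction (a sketch only; the real helper lives on ISOBase):

```python
from pathlib import Path


def hive_output_path(base_dir: Path, yyyymmdd: str, ext: str) -> Path:
    # Mirrors the documented pattern: base/year=YYYY/month=MM/day=DD/data.{ext}
    year, month, day = yyyymmdd[:4], yyyymmdd[4:6], yyyymmdd[6:8]
    return base_dir / f"year={year}" / f"month={month}" / f"day={day}" / f"data.{ext}"


path = hive_output_path(Path("base_dir"), "20260115", "parquet")
# -> base_dir/year=2026/month=01/day=15/data.parquet
```

Keeping year/month/day as zero-padded key=value segments is what makes the processed output directly queryable as a hive-partitioned dataset.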

Checklist

  • Create types.py with raw/processed StrEnums, JoinMode, ProcessingBinding, and DataDefinition
  • Create registry.py mapping each raw type to its DataDefinition with bindings
  • Ensure exactly one required=True binding per processed data type
  • Implement all six abstract methods in the client
  • Verify timestamp semantics match the ISO's market conventions
  • Add public exports to __init__.py at both the package and iso/ level
  • Add the ISO to the ISO StrEnum in src/progridpy/common/types.py if not already present