Extending ISOs¶
This guide walks through adding support for a new ISO by subclassing ISOBase and creating the required types and registry modules. The SPP implementation is the reference pattern.
Architecture Overview¶
Each ISO is a Python package under src/progridpy/iso/ with the following structure:
src/progridpy/iso/new_iso/
__init__.py # Public exports
client.py # ISOBase subclass
types.py # StrEnum data types, JoinMode, ProcessingBinding, DataDefinition
registry.py # Maps each data type enum to a DataDefinition
helpers.py # Optional support module
constants.py # Optional constants
Step 1: Define Types¶
Create types.py with the ISO's data type enums, reader types, join modes, processing bindings, and data definition.
Raw Data Type Enum¶
Define a StrEnum listing every raw data type the ISO provides:
# src/progridpy/iso/new_iso/types.py
from __future__ import annotations
from dataclasses import dataclass
from enum import StrEnum
from progridpy.common.registry import DataDefinition
class NewISORawDataType(StrEnum):
DAY_AHEAD_LMP = "DAY_AHEAD_LMP"
REAL_TIME_LMP = "REAL_TIME_LMP"
LOAD_FORECAST = "LOAD_FORECAST"
LOAD_ACTUAL = "LOAD_ACTUAL"
@classmethod
def get_all(cls) -> list[NewISORawDataType]:
return list(cls)
Processed Data Type Enum¶
Processed types are granularity-based (not function-based):
class NewISOProcessedDataType(StrEnum):
NODAL = "NODAL"
SYSTEM = "SYSTEM"
ZONAL = "ZONAL"
@classmethod
def get_all(cls) -> list[NewISOProcessedDataType]:
return list(cls)
Reader Type Enum¶
Classify the raw file formats the ISO uses so each raw data type can be routed to the correct parser:
class NewISORawReaderType(StrEnum):
HOURLY_LMP = "HOURLY_LMP"
LOAD_CSV = "LOAD_CSV"
@classmethod
def get_all(cls) -> list[NewISORawReaderType]:
return list(cls)
Join Mode Enum¶
Define how raw data types merge with the scaffold during processing:
class NewISOJoinMode(StrEnum):
NODE_LEFT = "NODE_LEFT" # Left join on (time, node)
TIME_LEFT = "TIME_LEFT" # Left join on time, broadcast to all nodes
TIME_FULL = "TIME_FULL" # Full outer join on time
Processing Binding¶
Declare how each raw type contributes to a processed dataset:
@dataclass(frozen=True)
class NewISOProcessingBinding:
dataset: NewISOProcessedDataType
join_mode: NewISOJoinMode
output_columns: tuple[str, ...]
required: bool = False # True for exactly one scaffold per processed type
Data Definition¶
Extend the base DataDefinition with ISO-specific fields:
@dataclass(frozen=True)
class NewISODataDefinition(DataDefinition):
reader_type: NewISORawReaderType | None = None
reader_args: tuple[str, ...] = ()
processed_bindings: tuple[NewISOProcessingBinding, ...] = ()
processed_key_columns: tuple[str, ...] = ()
processed_output_columns: tuple[str, ...] = ()
url_endpoint: str | None = None
The base DataDefinition provides dir_name, filename_suffix, and file_format fields.
Step 2: Create the Registry¶
Create registry.py to map each raw data type to its DataDefinition with processing bindings:
# src/progridpy/iso/new_iso/registry.py
from progridpy.common.registry import DataRegistry
from progridpy.iso.new_iso.types import (
NewISODataDefinition,
NewISOJoinMode,
NewISOProcessedDataType,
NewISOProcessingBinding,
NewISORawDataType,
NewISORawReaderType,
)
class NewISODataRegistry(DataRegistry[NewISORawDataType, NewISODataDefinition]):
pass
NEW_ISO_REGISTRY = NewISODataRegistry()
# Register raw data types with their processing bindings
NEW_ISO_REGISTRY[NewISORawDataType.DAY_AHEAD_LMP] = NewISODataDefinition(
dir_name="da_lmp",
filename_suffix="da_lmp",
file_format="csv",
reader_type=NewISORawReaderType.HOURLY_LMP,
processed_bindings=(
NewISOProcessingBinding(
dataset=NewISOProcessedDataType.NODAL,
join_mode=NewISOJoinMode.NODE_LEFT,
output_columns=("da_lmp",),
required=True, # This is the scaffold for NODAL
),
),
)
NEW_ISO_REGISTRY[NewISORawDataType.REAL_TIME_LMP] = NewISODataDefinition(
dir_name="rt_lmp",
filename_suffix="rt_lmp",
file_format="csv",
reader_type=NewISORawReaderType.HOURLY_LMP,
processed_bindings=(
NewISOProcessingBinding(
dataset=NewISOProcessedDataType.NODAL,
join_mode=NewISOJoinMode.NODE_LEFT,
output_columns=("rt_lmp",),
),
),
)
NEW_ISO_REGISTRY[NewISORawDataType.LOAD_ACTUAL] = NewISODataDefinition(
dir_name="load_actual",
filename_suffix="load_actual",
file_format="csv",
reader_type=NewISORawReaderType.LOAD_CSV,
processed_bindings=(
NewISOProcessingBinding(
dataset=NewISOProcessedDataType.NODAL,
join_mode=NewISOJoinMode.TIME_LEFT,
output_columns=("load_actual",),
),
NewISOProcessingBinding(
dataset=NewISOProcessedDataType.SYSTEM,
join_mode=NewISOJoinMode.TIME_FULL,
output_columns=("load_actual",),
required=True, # Scaffold for SYSTEM
),
),
)
# Define processed type metadata
PROCESSED_REGISTRY = {
NewISOProcessedDataType.NODAL: NewISODataDefinition(
dir_name="iso=new_iso/dataset=nodal",
filename_suffix="",
file_format="parquet",
processed_key_columns=("interval_start_local", "node"),
processed_output_columns=(
"interval_start_local",
"node",
"da_lmp",
"rt_lmp",
"load_actual",
),
),
NewISOProcessedDataType.SYSTEM: NewISODataDefinition(
dir_name="iso=new_iso/dataset=system",
filename_suffix="",
file_format="parquet",
processed_key_columns=("interval_start_local",),
processed_output_columns=(
"interval_start_local",
"load_actual",
),
),
}
One required scaffold per processed type
Exactly one ProcessingBinding targeting each processed data type must have required=True. This binding provides the scaffold DataFrame that all other bindings join into.
Step 3: Implement the Client¶
Create client.py with an ISOBase subclass implementing all abstract methods:
# src/progridpy/iso/new_iso/client.py
from datetime import datetime
from pathlib import Path
from typing import Literal
from progridpy.common.types import ISO, FileLocation
from progridpy.iso.base import ISOBase
from progridpy.iso.new_iso.types import NewISORawDataType, NewISOProcessedDataType
class NewISO(ISOBase[NewISORawDataType, NewISOProcessedDataType]):
def __init__(self) -> None:
super().__init__(
iso=ISO.PJM, # Use the appropriate ISO enum value
home_url="https://example-iso.com",
timezone="US/Eastern", # ISO's canonical timezone
)
def download_raw_data(
self,
start_date: str | datetime | None = None,
end_date: str | datetime | None = None,
data_types: NewISORawDataType | list[NewISORawDataType] | None = None,
download_src: str | FileLocation = FileLocation.ISO,
output_dir: str | Path | None = None,
overwrite: bool = False,
verbose: bool = False,
) -> list[Path]:
# Implement download logic
...
def upload_raw_data(
self,
start_date: str | datetime | None = None,
end_date: str | datetime | None = None,
data_types: NewISORawDataType | list[NewISORawDataType] | None = None,
input_dir: str | Path | None = None,
overwrite: bool = False,
verbose: bool = False,
) -> list[str]:
# Implement upload logic
...
def download_processed_data(
self,
start_date: str | datetime | None = None,
end_date: str | datetime | None = None,
data_types: NewISOProcessedDataType | list[NewISOProcessedDataType] | None = None,
output_dir: str | Path | None = None,
overwrite: bool = False,
verbose: bool = False,
) -> list[Path]:
# Implement processed download logic
...
def upload_processed_data(
self,
start_date: str | datetime | None = None,
end_date: str | datetime | None = None,
data_types: NewISOProcessedDataType | list[NewISOProcessedDataType] | None = None,
input_dir: str | Path | None = None,
overwrite: bool = False,
verbose: bool = False,
) -> list[str]:
# Implement processed upload logic
...
def process_raw_data(
self,
start_date: str | datetime | None = None,
end_date: str | datetime | None = None,
data_types: NewISOProcessedDataType | list[NewISOProcessedDataType] | None = None,
input_dir: str | Path | None = None,
output_dir: str | Path | None = None,
file_format: Literal["parquet", "csv"] = "parquet",
overwrite: bool = False,
verbose: bool = False,
) -> list[Path]:
# Implement processing logic following the registry-driven pattern
...
Step 4: Create the Package¶
Set up __init__.py with public exports:
# src/progridpy/iso/new_iso/__init__.py
from progridpy.iso.new_iso.client import NewISO
from progridpy.iso.new_iso.types import (
NewISOProcessedDataType,
NewISORawDataType,
)
__all__ = [
"NewISO",
"NewISOProcessedDataType",
"NewISORawDataType",
]
Then add the same exports to src/progridpy/iso/__init__.py so the new ISO is importable from the top-level iso package.
Step 5: Timestamp Handling¶
Choose the correct timestamp model based on the ISO's market conventions.
Do not copy SPP's timezone model blindly
Each ISO has its own market-time semantics. Verify the source-market conventions before deciding on the timestamp approach.
| Pattern | When to Use | Example ISO |
|---|---|---|
| UTC canonical | Source files use GMT/UTC timestamps | SPP |
| Local with DST flag | Source files are local-time with DST disambiguation | ERCOT |
| Fixed market time | ISO uses a fixed timezone with constant 24 hours/day | MISO (EST) |
Key-Building Hooks¶
ISOBase provides overridable methods for constructing S3 keys and file paths:
# Raw S3 key: {iso}/{dir_name}/{filename}
self._raw_s3_key("da_lmp", "20260115_da_lmp.csv")
# -> "new_iso/da_lmp/20260115_da_lmp.csv"
# Raw filename: {YYYYMMDD}_{suffix}.{ext}
self._raw_filename(date, "da_lmp", "csv")
# -> "20260115_da_lmp.csv"
# Processed S3 key: {dir_name}/year=YYYY/month=MM/day=DD/data.parquet
self._processed_s3_key("iso=new_iso/dataset=nodal", date)
# -> "iso=new_iso/dataset=nodal/year=2026/month=01/day=15/data.parquet"
# Hive output path: base/year=YYYY/month=MM/day=DD/data.{ext}
self._hive_output_path(base_dir, "20260115", "parquet")
# -> base_dir/year=2026/month=01/day=15/data.parquet
Override _raw_s3_prefix() if the ISO uses a non-standard S3 prefix structure.
Checklist¶
- Create types.py with raw/processed StrEnum, JoinMode, ProcessingBinding, and DataDefinition types
- Create registry.py mapping each raw type to its DataDefinition with processing bindings
- Ensure exactly one required=True binding per processed data type
- Implement all abstract methods in the client
- Verify timestamp semantics match the ISO's market conventions
- Add public exports to __init__.py at both the package and iso/ levels
- Add the ISO to the ISO StrEnum in src/progridpy/common/types.py if not already present