
Processing Data

ProgridPy transforms raw ISO files (CSV, XLS) into standardized, Hive-partitioned Parquet datasets through a registry-driven processing pipeline. Each ISO client implements process_raw_data() following the same pattern, with ISO-specific processing bindings that declaratively control how raw data types are joined, filtered, and written.

Method Signature

def process_raw_data(
    self,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
    data_types: ProcessedT | list[ProcessedT] | None = None,
    input_dir: str | Path | None = None,
    output_dir: str | Path | None = None,
    file_format: Literal["parquet", "csv"] = "parquet",
    overwrite: bool = False,
    verbose: bool = False,
) -> list[Path]:
| Parameter | Description |
| --- | --- |
| start_date | Inclusive start date for processing. |
| end_date | Inclusive end date for processing. |
| data_types | Processed data type(s) to produce. None produces all types. |
| input_dir | Directory containing raw files. Uses the default if None. |
| output_dir | Directory for processed output. Uses the default if None. |
| file_format | Output format: "parquet" (default) or "csv". |
| overwrite | Re-process dates that already have output files. |
| verbose | Enable detailed logging. |

Returns a list of Path objects for every file written.

Basic Usage

MISO:

from progridpy.iso import MISO, MISOProcessedDataType

miso = MISO()
files = miso.process_raw_data(
    start_date="2026-01-01",
    end_date="2026-01-31",
    data_types=[MISOProcessedDataType.NODAL],
    input_dir="./data/miso/raw",
    output_dir="./data/miso/processed",
)

SPP:

from progridpy.iso import SPP, SPPProcessedDataType

spp = SPP()
files = spp.process_raw_data(
    start_date="2026-01-01",
    end_date="2026-01-31",
    data_types=[SPPProcessedDataType.NODAL, SPPProcessedDataType.SYSTEM],
    input_dir="./data/spp/raw",
    output_dir="./data/spp/processed",
)

ERCOT:

from progridpy.iso import ERCOT, ERCOTProcessedDataType

ercot = ERCOT()
files = ercot.process_raw_data(
    start_date="2026-01-01",
    end_date="2026-01-31",
    data_types=[ERCOTProcessedDataType.NODAL],
    input_dir="./data/ercot/raw",
    output_dir="./data/ercot/processed",
    file_format="parquet",
)
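
In each case, the returned list of paths can be inspected or handed to a downstream step:

# files comes from any of the calls above
for path in files:
    print(path)
print(f"{len(files)} files written")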

Processed Data Types

Processed types are granularity-based, not function-based. Each type represents a distinct spatial resolution.

| ISO | Type | Description |
| --- | --- | --- |
| MISO | NODAL | Per-node LMPs with broadcast load (404 hubs x 24h/day) |
| MISO | SYSTEM | System-wide load (24 rows/day) |
| MISO | REGIONAL | Wind/solar forecast by MISO region (24 rows/day) |
| SPP | NODAL | Per-node LMPs with broadcast load/resource data |
| SPP | SYSTEM | System-wide load (24 rows/day) |
| SPP | ZONAL | Resource data by reserve zone (24 x 5 rows/day) |
| ERCOT | NODAL | Per-node settlement point prices |
| ERCOT | SYSTEM | System-wide data |
| ERCOT | WEATHER_ZONE | Load and renewable data by weather zone |
| ERCOT | LOAD_ZONE | Load data by load zone |
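
Passing data_types=None produces every type for that ISO. Assuming the processed-type enums are standard Python Enum classes (an assumption, not stated on this page), the available members can be listed at runtime:

from progridpy.iso import MISOProcessedDataType

# Enumerate all processed types MISO supports (assumes a standard Enum)
print([t.name for t in MISOProcessedDataType])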

Hive-Partitioned Output

Processed files are written in a Hive-partitioned directory layout:

output_dir/
  year=2026/
    month=01/
      day=01/
        data.parquet
      day=02/
        data.parquet
      ...
    month=02/
      day=01/
        data.parquet

This is produced by ISOBase._hive_output_path():

base_dir / f"year={year}" / f"month={month}" / f"day={day}" / f"data.{file_format}"

The S3 key for processed data follows the same pattern:

{dir_name}/year=YYYY/month=MM/day=DD/data.parquet

For example: iso=spp/dataset=nodal/year=2026/month=01/day=15/data.parquet
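
Because the layout is standard Hive partitioning, the output can be read back with any Hive-aware reader. A minimal sketch using pyarrow (a reader chosen for this example, not part of ProgridPy):

import pyarrow.dataset as ds

# Discover year/month/day partition columns from the directory names
dataset = ds.dataset("./data/spp/processed", format="parquet", partitioning="hive")

# Partition keys become regular columns and can drive pruning filters
table = dataset.to_table(filter=(ds.field("month") == 1) & (ds.field("day") == 15))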

Parquet is the default

Parquet is the recommended output format for efficient storage and fast columnar reads. Use file_format="csv" only when you need human-readable output for debugging.

Registry-Driven Processing Architecture

Processing is controlled declaratively through registries. This section explains the internal architecture for contributors and advanced users.

Processing Bindings

Each raw data type declares one or more ProcessingBinding objects that specify how it contributes to processed datasets.

SPP:

from progridpy.iso.spp.types import SPPProcessingBinding, SPPProcessedDataType, SPPJoinMode

SPPProcessingBinding(
    dataset=SPPProcessedDataType.NODAL,
    join_mode=SPPJoinMode.NODE_LEFT,
    output_columns=("da_lmp", "da_mcc", "da_mlc"),
    required=True,  # This is the scaffold
)

MISO:

from progridpy.iso.miso.types import MISOProcessingBinding, MISOProcessedDataType, MISOJoinMode

MISOProcessingBinding(
    dataset=MISOProcessedDataType.NODAL,
    join_mode=MISOJoinMode.NODE_LEFT,
    output_columns=("da_lmp",),
    required=True,
)

ERCOT:

from progridpy.iso.ercot.types import ERCOTProcessingBinding, ERCOTProcessedDataType, ERCOTJoinMode

ERCOTProcessingBinding(
    dataset=ERCOTProcessedDataType.NODAL,
    join_mode=ERCOTJoinMode.NODE_LEFT,
    output_columns=("da_spp",),
    required=True,
)

Binding Fields

| Field | Description |
| --- | --- |
| dataset | Which processed type this raw type feeds into. |
| join_mode | How the raw data joins with the scaffold DataFrame. |
| output_columns | Column names this raw type contributes to the final output. |
| required | If True, this binding provides the scaffold. Exactly one required binding exists per processed type. |
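
A raw type that feeds more than one processed dataset simply declares multiple bindings. A hypothetical SPP example for a load feed that lands in both NODAL and SYSTEM (the column names and join-mode choices here are illustrative, not taken from the registry):

processed_bindings=(
    # Broadcast the system-level load onto every node in the NODAL output
    SPPProcessingBinding(
        dataset=SPPProcessedDataType.NODAL,
        join_mode=SPPJoinMode.TIME_LEFT,
        output_columns=("load",),
    ),
    # The same data also scaffolds the SYSTEM output
    SPPProcessingBinding(
        dataset=SPPProcessedDataType.SYSTEM,
        join_mode=SPPJoinMode.TIME_FULL,
        output_columns=("load",),
        required=True,
    ),
)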

Join Modes

Join modes control how each raw data source merges with the scaffold.

SPP Join Modes (SPPJoinMode):

| Mode | Join Strategy |
| --- | --- |
| NODE_LEFT | Left join on (interval_start_utc, interval_start_local, node) |
| TIME_LEFT | Left join on (interval_start_utc, interval_start_local), broadcast to all nodes |
| TIME_SUM_LEFT | Aggregate reserve-zone rows to time-level sums, then left join on time |
| TIME_FULL | Full outer join on time to keep whichever side has a timestamp |
| ZONE_FULL | Full outer join on (interval_start_utc, interval_start_local, reserve_zone) |

MISO Join Modes (MISOJoinMode):

| Mode | Join Strategy |
| --- | --- |
| NODE_LEFT | Left join on (interval_start_local, node) |
| NODE_LEFT_DEAD | Left join on (interval_start_local, node) with dead_node flag |
| TIME_LEFT | Left join on interval_start_local (broadcast to all nodes) |
| TIME_FULL | Full outer join on interval_start_local |

ERCOT Join Modes (ERCOTJoinMode):

| Mode | Join Strategy |
| --- | --- |
| NODE_LEFT | Left join on node and time columns |
| TIME_LEFT | Left join on time columns (broadcast) |
| TIME_FULL | Full outer join on time columns |
| WEATHER_ZONE_FULL | Full outer join on weather zone and time |
| LOAD_ZONE_FULL | Full outer join on load zone and time |
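
To make the join modes concrete, here is a minimal pandas sketch (illustrative only, not ProgridPy internals) of a TIME_LEFT broadcast: the system frame carries no node column, so its single row fans out to every node in the scaffold:

import pandas as pd

# Scaffold: one row per (time, node), as produced by the required binding
scaffold = pd.DataFrame({
    "interval_start_local": ["2026-01-15 00:00", "2026-01-15 00:00"],
    "node": ["NODE_A", "NODE_B"],
    "da_lmp": [25.1, 26.4],
})

# System-level frame: one row per time, no node column
system = pd.DataFrame({
    "interval_start_local": ["2026-01-15 00:00"],
    "load": [31250.0],
})

# TIME_LEFT: join on time only; the load value is broadcast to both nodes
nodal = scaffold.merge(system, on="interval_start_local", how="left")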

Processing Flow

For each date, the processing pipeline executes these steps (a code sketch follows the list):

  1. Load scaffold -- Read the required=True binding's raw file. This provides the primary dimension (e.g., the node dimension for NODAL).
  2. Iterate bindings -- For each remaining binding targeting this processed type, load the raw file and join it with the scaffold using the declared join_mode.
  3. Fill missing columns -- Any columns declared in output_columns that are not present after joins are filled with null.
  4. Select and sort -- Select only the columns listed in the processed registry's processed_output_columns, sorted by processed_key_columns.
  5. Write output -- Write to the Hive-partitioned path.
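
A simplified sketch of this loop in Python, using hypothetical helpers (load_raw, join, write_output) that stand in for the real internals:

def process_one_date(bindings, date, output_columns, key_columns):
    # 1. Load the scaffold from the single required binding
    scaffold_binding = next(b for b in bindings if b.required)
    df = load_raw(scaffold_binding, date)

    # 2. Join every remaining binding using its declared join mode
    for binding in bindings:
        if not binding.required:
            df = join(df, load_raw(binding, date), binding.join_mode)

    # 3. Fill columns that no raw file provided with nulls
    for col in output_columns:
        if col not in df.columns:
            df[col] = None

    # 4. Select and sort to the registry's declared shape
    df = df[list(output_columns)].sort_values(list(key_columns))

    # 5. Write to the Hive-partitioned path
    return write_output(df, date)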

Data Definition

Each raw data type is mapped to a DataDefinition subclass in the ISO's registry:

from progridpy.iso.spp.types import SPPDataDefinition, SPPRawReaderType

SPPDataDefinition(
    dir_name="da_lmp",
    filename_suffix="DA-LMP-SL-202601150100",
    file_format="csv",
    reader_type=SPPRawReaderType.HOURLY_LMP,
    processed_bindings=(...),
    processed_key_columns=("interval_start_utc", "interval_start_local", "node"),
    processed_output_columns=("interval_start_utc", "interval_start_local", "node", "da_lmp", ...),
)

Timestamp Handling

Each ISO uses a different timestamp model. The processing pipeline respects these differences.

Do not assume uniform timezone semantics

Each ISO has its own market-time conventions. The processing pipeline handles these correctly per ISO.

| ISO | Model | Key Column(s) |
| --- | --- | --- |
| SPP | UTC canonical. Computes interval_start_utc first, then derives interval_start_local. DST ambiguity is resolved by distinct UTC timestamps. | interval_start_utc, interval_start_local |
| ERCOT | Local-time based. Resolves the local delivery date/hour with DSTFlag, converts to interval_start_utc, then derives interval_start_local. | interval_start_utc, interval_start_local |
| MISO | Fixed EST market time. Always 24 trading hours; no DST handling. | interval_start_local only |
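
For intuition on the SPP model, here is how the fall-back ambiguity looks in plain pandas (illustrative, not ProgridPy code): local hour 01:00 occurs twice on the transition day, but the two UTC instants stay distinct:

import pandas as pd

# 2026-11-01 is the US fall-back day; 01:00 Central occurs twice
utc = pd.to_datetime(["2026-11-01 06:00", "2026-11-01 07:00"], utc=True)
local = utc.tz_convert("America/Chicago")
print(local)  # both rows read 01:00 local (CDT, then CST); UTC disambiguates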

End-to-End Example

Download raw data, process it, then upload the processed output to S3:

from progridpy.iso import SPP, SPPRawDataType, SPPProcessedDataType

spp = SPP()

# 1. Download raw data from the ISO
spp.download_raw_data(
    start_date="2026-01-01",
    end_date="2026-01-31",
    data_types=[
        SPPRawDataType.DAY_AHEAD_LMP,
        SPPRawDataType.LOAD_ACTUAL,
        SPPRawDataType.RESOURCE_ACTUAL,
    ],
    output_dir="./data/spp/raw",
)

# 2. Process raw data into Hive-partitioned Parquet
spp.process_raw_data(
    start_date="2026-01-01",
    end_date="2026-01-31",
    data_types=[SPPProcessedDataType.NODAL, SPPProcessedDataType.SYSTEM],
    input_dir="./data/spp/raw",
    output_dir="./data/spp/processed",
)

# 3. Upload processed data to S3
spp.upload_processed_data(
    start_date="2026-01-01",
    end_date="2026-01-31",
    input_dir="./data/spp/processed",
)