
ISO Module

The progridpy.iso package implements the data download, upload, and processing pipeline for US Independent System Operators (ISOs). Each ISO is a sub-package with a consistent internal structure: a client, types, and a registry.

Architecture

All ISO clients inherit from ISOBase[RawT, ProcessedT], a PEP 695 generic abstract class parameterized over two StrEnum types -- one for raw data categories and one for processed granularity levels.
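
A minimal sketch of the base-class shape this implies, using PEP 695 syntax. Method bodies are elided and the attribute handling is an assumption; this illustrates the contract described above, not the actual source.

from abc import ABC, abstractmethod
from enum import StrEnum
from pathlib import Path
from zoneinfo import ZoneInfo

class ISOBase[RawT: StrEnum, ProcessedT: StrEnum](ABC):
    """Common contract shared by the SPP, MISO, and ERCOT clients (sketch)."""

    def __init__(self, iso: StrEnum, home_url: str, timezone: str) -> None:
        self.iso = iso
        self.home_url = home_url
        # Stored as a tzinfo object, since the clients pass self.timezone to
        # tzinfo= / parse_datetime(tz=...) below (assumption).
        self.timezone = ZoneInfo(timezone)

    @abstractmethod
    def download_raw_data(self, *args, **kwargs) -> list[Path]: ...

    @abstractmethod
    def upload_raw_data(self, *args, **kwargs) -> list[str]: ...

    @abstractmethod
    def download_processed_data(self, *args, **kwargs) -> list[Path]: ...

    @abstractmethod
    def upload_processed_data(self, *args, **kwargs) -> list[str]: ...

    @abstractmethod
    def process_raw_data(self, *args, **kwargs) -> list[Path]: ...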

Implementations

ISO   | Client Class | Raw Types                    | Processed Granularity                  | Timezone Handling
------|--------------|------------------------------|----------------------------------------|------------------------------
SPP   | SPP          | SPPRawDataType (9 members)   | NODAL, SYSTEM, ZONAL                   | UTC canonical; local derived
MISO  | MISO         | MISORawDataType (18 members) | NODAL, SYSTEM, REGIONAL                | Fixed EST market time
ERCOT | ERCOT        | ERCOTRawDataType (7 members) | NODAL, SYSTEM, WEATHER_ZONE, LOAD_ZONE | Local + DSTFlag; UTC derived

Internal Structure per ISO

Each ISO sub-package contains:

  • client.py -- The ISOBase subclass implementing download_raw_data, upload_raw_data, download_processed_data, upload_processed_data, and process_raw_data.
  • types.py -- StrEnum classes for raw and processed data types, reader types, join modes, processing bindings, and the ISO-specific DataDefinition subclass.
  • registry.py -- Concrete DataRegistry subclasses mapping each data-type enum to its DataDefinition, including file metadata and processing bindings.

Registry-Driven Processing

Processing is declarative. Each raw data type carries ProcessingBinding tuples that declare which processed dataset it feeds, the join mode to use against the scaffold DataFrame, and which output columns it contributes. The processing flow per date:

  1. Load the scaffold binding (the one marked required=True) to establish the primary dimension (e.g., DA LMP provides the node dimension for NODAL).
  2. Iterate remaining bindings, load raw files, and join using the declared JoinMode.
  3. Fill missing columns with NA and select the output columns from the processed registry definition.
  4. Write to the hive-partitioned output path.
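
A hedged sketch of the kind of registry entry this describes. The field names, enum members, and the NamedTuple representation are assumptions chosen for illustration, not the library's actual definitions.

from enum import StrEnum
from typing import NamedTuple

class JoinMode(StrEnum):
    # Member names are assumptions; the source only says a JoinMode is declared.
    LEFT = "left"
    INNER = "inner"

class ProcessingBinding(NamedTuple):
    processed_type: str              # which processed dataset this raw type feeds
    join_mode: JoinMode              # how to join against the scaffold DataFrame
    output_columns: tuple[str, ...]  # columns this raw type contributes
    required: bool = False           # True marks the scaffold binding

# e.g. DA LMP acting as the scaffold that establishes the node dimension for NODAL:
da_lmp_binding = ProcessingBinding(
    processed_type="nodal",
    join_mode=JoinMode.LEFT,
    output_columns=("da_lmp",),
    required=True,
)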

Classes

ERCOT

ERCOT(verbose: bool = False)

Bases: ISOBase[ERCOTRawDataType, ERCOTProcessedDataType]

Source code in src/progridpy/iso/ercot/client.py
def __init__(self, verbose: bool = False) -> None:
    super().__init__(iso=ISO.ERCOT, home_url="https://www.ercot.com/", timezone="America/Chicago")
    self.verbose = verbose
    self.raw_dir = Path("data/ercot/raw")
    self.processed_dir = Path("data/ercot/processed")
    self._ercot_api: ErcotAPI | None = None

Functions

clear_and_calculate_gain
clear_and_calculate_gain(trade_df: DataFrame, processed_df: DataFrame, min_offer_price: float = -500, max_bid_price: float = 2000) -> DataFrame

Clear trades and calculate financial gains based on market prices.

da_spp is the clearing price. No separate clearing column needed. Supply offers clear when da_spp >= offer_price. Demand bids clear when da_spp <= offer_price.

Source code in src/progridpy/iso/ercot/client.py
def clear_and_calculate_gain(
    self,
    trade_df: pl.DataFrame,
    processed_df: pl.DataFrame,
    min_offer_price: float = -500,
    max_bid_price: float = 2000,
) -> pl.DataFrame:
    """Clear trades and calculate financial gains based on market prices.

    da_spp is the clearing price. No separate clearing column needed.
    Supply offers clear when da_spp >= offer_price.
    Demand bids clear when da_spp <= offer_price.
    """
    merged = trade_df.join(processed_df, on=["interval_start_utc", "node"], how="inner", suffix="_proc")

    if "interval_start_local_proc" in merged.columns:
        merged = merged.drop("interval_start_local_proc")

    # Default offer price when not present
    if "offer_price" not in merged.columns:
        merged = merged.with_columns(
            pl.when(pl.col("is_supply"))
            .then(pl.lit(min_offer_price))
            .otherwise(pl.lit(max_bid_price))
            .alias("offer_price")
        )

    # Determine clearing: da_spp acts as clearing price
    supply_clears = pl.col("is_supply") & (pl.col("da_spp") >= pl.col("offer_price"))
    demand_clears = (~pl.col("is_supply")) & (pl.col("da_spp") <= pl.col("offer_price"))
    merged = merged.with_columns((supply_clears | demand_clears).alias("cleared"))

    # Gain only for cleared trades
    supply_gain = (pl.col("da_spp") - pl.col("rt_spp")) * pl.col("volume")
    demand_gain = (pl.col("rt_spp") - pl.col("da_spp")) * pl.col("volume")

    merged = merged.with_columns(
        pl.when(pl.col("cleared") & pl.col("is_supply"))
        .then(supply_gain)
        .when(pl.col("cleared") & (~pl.col("is_supply")))
        .then(demand_gain)
        .otherwise(pl.lit(0.0))
        .alias("gain"),
    )

    return merged.select(
        "interval_start_utc",
        "interval_start_local",
        "node",
        "is_supply",
        "da_spp",
        "rt_spp",
        "offer_price",
        "volume",
        "cleared",
        "gain",
    )
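
A minimal usage sketch of the clearing logic above, using small in-memory frames; the node name, prices, and volumes are illustrative only.

import polars as pl
from datetime import datetime, timezone

from progridpy.iso.ercot.client import ERCOT

ts = datetime(2024, 7, 1, 5, 0, tzinfo=timezone.utc)

trades = pl.DataFrame({
    "interval_start_utc": [ts, ts],
    "interval_start_local": [ts, ts],   # shown as UTC here purely for brevity
    "node": ["HB_NORTH", "HB_NORTH"],
    "is_supply": [True, False],         # one supply offer, one demand bid
    "offer_price": [20.0, 40.0],
    "volume": [10.0, 5.0],
})

prices = pl.DataFrame({
    "interval_start_utc": [ts],
    "node": ["HB_NORTH"],
    "da_spp": [35.0],
    "rt_spp": [28.0],
})

result = ERCOT().clear_and_calculate_gain(trades, prices)
# Supply offer at 20 clears (da_spp 35 >= 20): gain = (35 - 28) * 10 = 70
# Demand bid at 40 clears (da_spp 35 <= 40):   gain = (28 - 35) * 5  = -35
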
process_trade
process_trade(trade_dir: str | Path, processed_dir: str | Path, start_date: str | datetime | None = None, end_date: str | datetime | None = None) -> DataFrame

Process trade files and calculate gains based on DA/RT spread.

Source code in src/progridpy/iso/ercot/client.py
def process_trade(
    self,
    trade_dir: str | Path,
    processed_dir: str | Path,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
) -> pl.DataFrame:
    """Process trade files and calculate gains based on DA/RT spread."""
    trade_path = to_path(trade_dir)
    processed_pat = to_path(processed_dir)
    if trade_path is None or processed_pat is None:
        raise ValueError("trade_dir and processed_dir must be provided.")
    if trade_path.is_file():
        ensure_input_file(trade_path, "trade_dir")
    else:
        ensure_input_dir(trade_path, "trade_dir")
    if processed_pat.is_file():
        ensure_input_file(processed_pat, "processed_dir")
    else:
        ensure_input_dir(processed_pat, "processed_dir")

    if start_date is not None:
        start_date = parse_datetime(start_date, tz=self.timezone)
    if end_date is not None:
        end_date = parse_datetime(end_date, tz=self.timezone)

    trade_files_by_date: dict[datetime, pl.DataFrame] = {}

    if trade_path.is_file():
        trade_files_by_date = self._parse_trade_file(trade_path, start_date, end_date)
    else:
        for file_path in sorted(trade_path.glob("ERCOT-*.csv")):
            parts = file_path.stem.split("-")
            date_str = parts[1]
            file_date = parse_datetime(date_str, tz=self.timezone)

            if start_date and file_date < start_date:
                continue
            if end_date and file_date > end_date:
                continue

            trade_files_by_date[file_date] = self._build_trade_df(pl.read_csv(file_path), date_str)

    # Load processed nodal parquet files
    processed_files_by_date: dict[datetime, pl.DataFrame] = {}

    if processed_pat.is_file():
        file_date = extract_date_from_hive_path(str(processed_pat))
        if file_date is None:
            file_date = parse_datetime(processed_pat.stem, tz=self.timezone)
        else:
            file_date = file_date.replace(tzinfo=self.timezone)
        if (start_date is None or file_date >= start_date) and (end_date is None or file_date <= end_date):
            processed_files_by_date[file_date] = pl.read_parquet(processed_pat)
    else:
        parquet_files = sorted(processed_pat.rglob("data.parquet"))
        nodal_files = [fp for fp in parquet_files if "dataset=nodal" in str(fp)]

        for file_path in nodal_files:
            file_date = extract_date_from_hive_path(str(file_path))
            if file_date is None:
                continue
            file_date = file_date.replace(tzinfo=self.timezone)
            if start_date and file_date < start_date:
                continue
            if end_date and file_date > end_date:
                continue
            processed_files_by_date[file_date] = pl.read_parquet(file_path)

    if not trade_files_by_date:
        raise ValueError(f"No trade files found in {trade_path} for the specified date range")
    if not processed_files_by_date:
        raise ValueError(f"No processed files found in {processed_pat} for the specified date range")

    all_results: list[pl.DataFrame] = []

    for date in sorted(trade_files_by_date.keys()):
        if date not in processed_files_by_date:
            logger.warning("No processed data found for %s, skipping", date.strftime("%Y%m%d"))
            continue
        result_df = self.clear_and_calculate_gain(trade_files_by_date[date], processed_files_by_date[date])
        all_results.append(result_df)

    if not all_results:
        raise ValueError("No matching dates found between trade and processed data")

    return pl.concat(all_results)
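
A hedged end-to-end usage sketch, assuming trade CSVs named ERCOT-<YYYYMMDD>.csv and hive-partitioned dataset=nodal parquet output already exist under the hypothetical directories below.

import polars as pl

from progridpy.iso.ercot.client import ERCOT

ercot = ERCOT(verbose=True)
gains = ercot.process_trade(
    trade_dir="data/ercot/trades",          # hypothetical dir of ERCOT-<YYYYMMDD>.csv files
    processed_dir="data/ercot/processed",   # hive-partitioned dataset=nodal parquet output
    start_date="2024-07-01",
    end_date="2024-07-07",
)
print(gains.group_by("node").agg(pl.col("gain").sum().alias("total_gain")))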

MISO

MISO()

Bases: ISOBase[MISORawDataType, MISOProcessedDataType]

Source code in src/progridpy/iso/miso/client.py
def __init__(self) -> None:
    super().__init__(iso=ISO.MISO, home_url=HOME_URL, timezone="EST")
    self.raw_dir = Path("data/miso/raw")
    self.processed_dir = Path("data/miso/processed")
    self.enverus_client = EnverusClient()

Functions

clear_and_calculate_gain
clear_and_calculate_gain(trade_df: DataFrame, processed_df: DataFrame, min_offer_price: float = -500, max_bid_price: float = 2000) -> DataFrame

Clear trades and calculate financial gains based on market prices.

Supply offers clear when clearing_lmp >= offer_price. Demand bids clear when clearing_lmp <= offer_price. Dead nodes are excluded from clearing.

Source code in src/progridpy/iso/miso/client.py
def clear_and_calculate_gain(
    self,
    trade_df: pl.DataFrame,
    processed_df: pl.DataFrame,
    min_offer_price: float = -500,
    max_bid_price: float = 2000,
) -> pl.DataFrame:
    """Clear trades and calculate financial gains based on market prices.

    Supply offers clear when clearing_lmp >= offer_price.
    Demand bids clear when clearing_lmp <= offer_price.
    Dead nodes are excluded from clearing.
    """
    processed_df = processed_df.with_columns(pl.col("dead_node").cast(pl.Boolean, strict=False).fill_null(False))

    merged = trade_df.join(processed_df, on=["interval_start_local", "node"], how="inner")

    # Set default offer price if not present
    if "offer_price" not in merged.columns:
        merged = merged.with_columns(
            pl.when(pl.col("is_supply"))
            .then(pl.lit(min_offer_price))
            .otherwise(pl.lit(max_bid_price))
            .alias("offer_price")
        )

    active = ~pl.col("dead_node")

    # Determine clearing
    supply_clears = active & pl.col("is_supply") & (pl.col("clearing_lmp") >= pl.col("offer_price"))
    demand_clears = active & (~pl.col("is_supply")) & (pl.col("clearing_lmp") <= pl.col("offer_price"))
    merged = merged.with_columns((supply_clears | demand_clears).alias("cleared"))

    # RT price: prefer final, fall back to prelim
    merged = merged.with_columns(
        pl.coalesce(
            pl.col("rt_lmp_final").cast(pl.Float64, strict=False),
            pl.col("rt_lmp_prelim").cast(pl.Float64, strict=False),
        ).alias("rt_price")
    )

    # Calculate gain for cleared trades
    supply_gain = (pl.col("da_lmp") - pl.col("rt_price")) * pl.col("volume")
    demand_gain = (pl.col("rt_price") - pl.col("da_lmp")) * pl.col("volume")

    merged = merged.with_columns(
        pl.when(pl.col("cleared") & pl.col("is_supply"))
        .then(supply_gain)
        .when(pl.col("cleared") & (~pl.col("is_supply")))
        .then(demand_gain)
        .otherwise(pl.lit(0.0))
        .alias("gain")
    )

    return merged.select(
        "interval_start_local",
        "node",
        "dead_node",
        "is_supply",
        "clearing_lmp",
        "da_lmp",
        "rt_lmp_prelim",
        "rt_lmp_final",
        "offer_price",
        "volume",
        "cleared",
        "gain",
    )
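
A standalone polars illustration of the two MISO-specific rules above (dead-node exclusion and the final-then-prelim RT price fallback). This is not the library call, just the expressions it describes, applied to two made-up supply offers.

import polars as pl

df = pl.DataFrame({
    "is_supply": [True, True],
    "dead_node": [False, True],
    "offer_price": [20.0, 20.0],
    "clearing_lmp": [35.0, 35.0],
    "da_lmp": [35.0, 35.0],
    "rt_lmp_final": [None, 28.0],
    "rt_lmp_prelim": [30.0, 29.0],
    "volume": [10.0, 10.0],
})
df = df.with_columns(
    ((~pl.col("dead_node")) & pl.col("is_supply")
     & (pl.col("clearing_lmp") >= pl.col("offer_price"))).alias("cleared"),
    pl.coalesce("rt_lmp_final", "rt_lmp_prelim").alias("rt_price"),
).with_columns(
    pl.when(pl.col("cleared"))
    .then((pl.col("da_lmp") - pl.col("rt_price")) * pl.col("volume"))
    .otherwise(0.0)
    .alias("gain")
)
# Row 1 clears: RT falls back to prelim, gain = (35 - 30) * 10 = 50.
# Row 2 is a dead node, so it never clears and gain = 0.
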
process_trade
process_trade(trade_dir: str | Path, processed_dir: str | Path, start_date: str | datetime | None = None, end_date: str | datetime | None = None) -> DataFrame

Process trade files and calculate gains based on market clearing.

Source code in src/progridpy/iso/miso/client.py
def process_trade(
    self,
    trade_dir: str | Path,
    processed_dir: str | Path,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
) -> pl.DataFrame:
    """Process trade files and calculate gains based on market clearing."""
    trade_dir = to_path(trade_dir)
    processed_dir = to_path(processed_dir)
    if trade_dir is None or processed_dir is None:
        raise ValueError("trade_dir and processed_dir must be provided.")
    if trade_dir.is_file():
        ensure_input_file(trade_dir, "trade_dir")
    else:
        ensure_input_dir(trade_dir, "trade_dir")
    if processed_dir.is_file():
        ensure_input_file(processed_dir, "processed_dir")
    else:
        ensure_input_dir(processed_dir, "processed_dir")

    if start_date is not None:
        start_date = parse_datetime(start_date, tz=self.timezone)
    if end_date is not None:
        end_date = parse_datetime(end_date, tz=self.timezone)

    trade_files_by_date: dict[datetime, pl.DataFrame] = {}

    if trade_dir.is_file():
        trade_files_by_date = self._parse_trade_file(trade_dir, start_date, end_date)
    else:
        for file_path in sorted(trade_dir.glob("MISO-*.csv")):
            parts = file_path.stem.split("-")
            date_str = parts[1]
            file_date = parse_datetime(date_str, tz=self.timezone)

            if start_date and file_date < start_date:
                continue
            if end_date and file_date > end_date:
                continue

            raw_df = pl.read_csv(file_path)
            raw_df = raw_df.rename({c: c.lower() for c in raw_df.columns})
            raw_df = raw_df.with_columns(pl.col("type").str.to_lowercase())

            if not raw_df["he"].is_between(1, 24).all():
                invalid = raw_df.filter(~pl.col("he").is_between(1, 24))["he"].unique().to_list()
                raise ValueError(f"HE values must be between 1 and 24, found: {invalid}")

            raw_df = raw_df.with_columns(
                (
                    pl.lit(date_str).str.strptime(pl.Date, "%Y%m%d").cast(pl.Datetime("us"))
                    + pl.duration(hours=pl.col("he").cast(pl.Int32) - 1)
                ).alias("interval_start_local"),
                (pl.col("type") == "offer").alias("is_supply"),
            ).rename({"mwh": "volume", "price": "offer_price"})

            trade_files_by_date[file_date] = raw_df.select(
                "interval_start_local", "node", "volume", "is_supply", "offer_price"
            )

    # Load processed files
    processed_files_by_date: dict[datetime, pl.DataFrame] = {}

    if processed_dir.is_file():
        file_date = extract_date_from_hive_path(str(processed_dir))
        if file_date is None:
            file_date = parse_datetime(processed_dir.stem, tz=self.timezone)
        else:
            file_date = file_date.replace(tzinfo=self.timezone)
        if (start_date is None or file_date >= start_date) and (end_date is None or file_date <= end_date):
            processed_files_by_date[file_date] = pl.read_parquet(processed_dir)
    else:
        parquet_files = sorted(processed_dir.rglob("data.parquet"))
        nodal_files = [file_path for file_path in parquet_files if "dataset=nodal" in str(file_path)]

        for file_path in nodal_files:
            file_date = extract_date_from_hive_path(str(file_path))
            if file_date is None:
                continue
            file_date = file_date.replace(tzinfo=self.timezone)
            if start_date and file_date < start_date:
                continue
            if end_date and file_date > end_date:
                continue
            processed_files_by_date[file_date] = pl.read_parquet(file_path)

    if not trade_files_by_date:
        raise ValueError(f"No trade files found in {trade_dir} for the specified date range")
    if not processed_files_by_date:
        raise ValueError(f"No processed files found in {processed_dir} for the specified date range")

    all_results: list[pl.DataFrame] = []

    for date in sorted(trade_files_by_date.keys()):
        if date not in processed_files_by_date:
            logger.warning(f"No processed data found for {date.strftime('%Y%m%d')}, skipping")
            continue
        result_df = self.clear_and_calculate_gain(trade_files_by_date[date], processed_files_by_date[date])
        all_results.append(result_df)

    if not all_results:
        raise ValueError("No matching dates found between trade and processed data")

    return pl.concat(all_results)
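
Directory-mode inputs for this method are daily CSVs named MISO-<YYYYMMDD>.csv. Below is a hedged sketch of writing one such file; the column set is inferred from the parser above (headers are lowercased, HE is the 1-24 hour-ending, and type == "offer" marks supply), while the directory path and node name are hypothetical.

from pathlib import Path

trade_dir = Path("data/miso/trades")   # hypothetical location
trade_dir.mkdir(parents=True, exist_ok=True)
(trade_dir / "MISO-20240701.csv").write_text(
    "node,type,he,mwh,price\n"
    "AECI,offer,5,10,22.50\n"    # hour-ending 5 -> interval starting 04:00 local
    "AECI,bid,5,8,45.00\n"
)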

SPP

SPP()

Bases: ISOBase[SPPRawDataType, SPPProcessedDataType]

Source code in src/progridpy/iso/spp/client.py
def __init__(self) -> None:
    super().__init__(iso=ISO.SPP, home_url=MARKETPLACE_URL, timezone="America/Chicago")
    self.raw_dir = Path("data/spp/raw")
    self.processed_dir = Path("data/spp/processed")
    self._file_browser_folder_cache: dict[tuple[str, str], list[_FileBrowserEntry]] = {}

Functions

download_raw_data
download_raw_data(start_date: str | datetime | None = None, end_date: str | datetime | None = None, data_types: SPPRawDataType | list[SPPRawDataType] | None = None, download_src: str | FileLocation = ISO, output_dir: str | Path | None = None, overwrite: bool = False, verbose: bool = False) -> list[Path]

Download raw data from either SPP website or S3 bucket. The start and end dates are inclusive.

Arguments:

  • start_date (str | datetime | None) -- the start date to fetch reports for, defaults to today. String formats supported: YYYYMMDD, YYYY/MM/DD, YYYY-MM-DD.
  • end_date (str | datetime | None) -- the end date to fetch reports for, defaults to today. String formats supported: YYYYMMDD, YYYY/MM/DD, YYYY-MM-DD.
  • data_types (SPPRawDataType | list[SPPRawDataType]) -- the data types to fetch, defaults to all.
  • download_src (str | FileLocation) -- the source to download from, defaults to ISO. String values supported: "s3", "iso".
  • output_dir (str | Path) -- the path to save the data to, defaults to data/spp/raw.
  • overwrite (bool) -- whether to overwrite existing files, defaults to False.
  • verbose (bool) -- whether to print verbose output, defaults to False.

Returns:

  • list[Path] -- A list of paths to the downloaded files.

Source code in src/progridpy/iso/spp/client.py
def download_raw_data(
    self,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
    data_types: SPPRawDataType | list[SPPRawDataType] | None = None,
    download_src: str | FileLocation = FileLocation.ISO,
    output_dir: str | Path | None = None,
    overwrite: bool = False,
    verbose: bool = False,
) -> list[Path]:
    """
    Download raw data from either SPP website or S3 bucket. The start and end dates are inclusive.

    Arguments:
        start_date (str | datetime | None): the start date to fetch reports for, defaults to today.
            String formats supported: YYYYMMDD, YYYY/MM/DD, YYYY-MM-DD
        end_date (str | datetime | None): the end date to fetch reports for, defaults to today.
            String formats supported: YYYYMMDD, YYYY/MM/DD, YYYY-MM-DD
        data_types (SPPRawDataType | list[SPPRawDataType]): the data types to fetch, defaults to all.
        download_src (str | FileLocation): the source to download from, defaults to ISO.
            String values supported: "s3", "iso"
        output_dir (str | Path): the path to save the data to, defaults to data/spp/raw.
        overwrite (bool): whether to overwrite existing files, defaults to False.
        verbose (bool): whether to print verbose output, defaults to False.
    Returns:
        list[Path]: A list of paths to the downloaded files.
    """
    start_date = parse_datetime(start_date, tz=self.timezone)
    end_date = parse_datetime(end_date, tz=self.timezone)

    # Convert string parameters to appropriate types
    if isinstance(download_src, str):
        download_src = parse_file_location(download_src)

    # Convert single data type to list to get consistent behavior
    if isinstance(data_types, SPPRawDataType):
        data_types = [data_types]

    # Set defaults
    if data_types is None:
        data_types = SPPRawDataType.get_all()

    output_dir = to_path(output_dir) or self.raw_dir
    output_dir = ensure_output_dir(output_dir, "output_dir")

    match download_src:
        case FileLocation.S3:
            return self._download_raw_data_from_s3(start_date, end_date, data_types, output_dir, overwrite, verbose)
        case FileLocation.ISO:
            return self._download_from_iso(start_date, end_date, data_types, output_dir, overwrite, verbose)
        case _:
            raise ValueError(f"Unsupported download location: {download_src}")
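
A usage sketch under stated assumptions: the enum import path follows the per-ISO types.py layout described above, and REAL_TIME_PRELIM_LMP is a raw type referenced elsewhere in this client; the output path is illustrative.

from progridpy.iso.spp.client import SPP
from progridpy.iso.spp.types import SPPRawDataType

spp = SPP()
files = spp.download_raw_data(
    start_date="2024-07-01",
    end_date="2024-07-03",
    data_types=SPPRawDataType.REAL_TIME_PRELIM_LMP,
    download_src="iso",          # or "s3"
    output_dir="data/spp/raw",
    overwrite=False,
    verbose=True,
)
print(f"Downloaded {len(files)} files")
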
upload_raw_data
upload_raw_data(start_date: str | datetime | None = None, end_date: str | datetime | None = None, data_types: SPPRawDataType | list[SPPRawDataType] | None = None, input_dir: str | Path | None = None, overwrite: bool = False, verbose: bool = False) -> list[str]

Upload raw data files to S3.

Scans input_dir/{dir_name}/ for files matching the date range, then uploads them with deterministic S3 keys.

Source code in src/progridpy/iso/spp/client.py
def upload_raw_data(
    self,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
    data_types: SPPRawDataType | list[SPPRawDataType] | None = None,
    input_dir: str | Path | None = None,
    overwrite: bool = False,
    verbose: bool = False,
) -> list[str]:
    """Upload raw data files to S3.

    Scans input_dir/{dir_name}/ for files matching the date range, then uploads them
    with deterministic S3 keys.
    """
    start_date = parse_datetime(start_date, tz=self.timezone)
    end_date = parse_datetime(end_date, tz=self.timezone)

    if isinstance(data_types, SPPRawDataType):
        data_types = [data_types]
    if data_types is None:
        data_types = SPPRawDataType.get_all()

    input_dir = ensure_input_dir(to_path(input_dir) or self.raw_dir, "input_dir")

    console = Console(record=True)
    console.print(
        "🚀 [bold]Uploading SPP Raw Data to S3[/bold]: ",
        f"[bold cyan]📅{start_date.strftime('%Y%m%d')}[/bold cyan] - "
        f"[bold cyan]📅{end_date.strftime('%Y%m%d')}[/bold cyan]",
    )

    all_uploaded: list[str] = []
    stats_by_data_type: dict[SPPRawDataType, list[int]] = {}

    with S3Handler(verbose=verbose) as handler:
        for data_type in data_types:
            data_def = SPPRawDataRegistry.get_definition(data_type)
            ext = data_def.file_extension
            type_dir = input_dir / data_def.dir_name

            if not type_dir.exists():
                stats_by_data_type[data_type] = [0, 0, 0]
                continue

            # Scan local files and filter by date range
            refs: list[S3ObjectRef] = []
            for local_file in sorted(type_dir.glob(f"*.{ext}")):
                try:
                    date_str = local_file.stem.split("_")[0]
                    file_date = datetime.strptime(date_str, "%Y%m%d").replace(tzinfo=self.timezone)
                except (ValueError, IndexError):
                    continue
                if not (start_date <= file_date <= end_date):
                    continue
                key = self._raw_s3_key(data_def.dir_name, local_file.name)
                refs.append(S3ObjectRef(key=key, local_path=local_file))

            uploaded, skipped, failed = handler.upload_objects(
                S3_BUCKETS.raw,
                refs,
                overwrite=overwrite,
                description=f"[{data_def.title}]",
            )
            all_uploaded.extend(uploaded)
            stats_by_data_type[data_type] = [len(uploaded), len(skipped), len(failed)]

    display_data_transfer_summary(
        stats_by_data_type, SPPRawDataRegistry, input_dir, transfer_type="upload", console=console
    )
    return all_uploaded
download_processed_data
download_processed_data(start_date: str | datetime | None = None, end_date: str | datetime | None = None, data_types: SPPProcessedDataType | list[SPPProcessedDataType] | None = None, output_dir: str | Path | None = None, overwrite: bool = False, verbose: bool = False) -> list[Path]

Download processed data from S3 using deterministic hive-partitioned keys.

Source code in src/progridpy/iso/spp/client.py
def download_processed_data(
    self,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
    data_types: SPPProcessedDataType | list[SPPProcessedDataType] | None = None,
    output_dir: str | Path | None = None,
    overwrite: bool = False,
    verbose: bool = False,
) -> list[Path]:
    """Download processed data from S3 using deterministic hive-partitioned keys."""
    start_date = parse_datetime(start_date, tz=self.timezone)
    end_date = parse_datetime(end_date, tz=self.timezone)

    if isinstance(data_types, SPPProcessedDataType):
        data_types = [data_types]
    if data_types is None:
        data_types = SPPProcessedDataType.get_all()

    output_dir = ensure_output_dir(to_path(output_dir) or self.processed_dir, "output_dir")

    console = Console(record=True)
    console.print(
        "🚀 [bold]Downloading SPP Processed Data from S3[/bold]: ",
        f"[bold cyan]📅{start_date.strftime('%Y%m%d')}[/bold cyan] - "
        f"[bold cyan]📅{end_date.strftime('%Y%m%d')}[/bold cyan]",
    )

    all_downloaded: list[Path] = []
    stats_by_data_type: dict[SPPProcessedDataType, list[int]] = {}

    with S3Handler(verbose=verbose) as handler:
        for data_type in data_types:
            refs = self._build_processed_s3_refs(data_type, start_date, end_date, output_dir)
            downloaded, skipped, failed = handler.download_objects(
                S3_BUCKETS.processed,
                refs,
                overwrite=overwrite,
                description=f"[{SPPProcessedDataRegistry.get_title(data_type)}]",
            )
            all_downloaded.extend(downloaded)
            stats_by_data_type[data_type] = [len(downloaded), len(skipped), len(failed)]

    display_data_transfer_summary(
        stats_by_data_type, SPPProcessedDataRegistry, output_dir, transfer_type="download", console=console
    )
    console.print(f"💾 Files saved to: [bold cyan]{output_dir.absolute()}[/bold cyan]")
    return all_downloaded
upload_processed_data
upload_processed_data(start_date: str | datetime | None = None, end_date: str | datetime | None = None, data_types: SPPProcessedDataType | list[SPPProcessedDataType] | None = None, input_dir: str | Path | None = None, overwrite: bool = False, verbose: bool = False) -> list[str]

Upload processed parquet files to S3.

Scans input_dir for hive-partitioned data.parquet files, filters by date range, then uploads with deterministic S3 keys.

Source code in src/progridpy/iso/spp/client.py
def upload_processed_data(
    self,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
    data_types: SPPProcessedDataType | list[SPPProcessedDataType] | None = None,
    input_dir: str | Path | None = None,
    overwrite: bool = False,
    verbose: bool = False,
) -> list[str]:
    """Upload processed parquet files to S3.

    Scans input_dir for hive-partitioned data.parquet files, filters by date range,
    then uploads with deterministic S3 keys.
    """
    start_date = parse_datetime(start_date, tz=self.timezone)
    end_date = parse_datetime(end_date, tz=self.timezone)

    if isinstance(data_types, SPPProcessedDataType):
        data_types = [data_types]
    if data_types is None:
        data_types = SPPProcessedDataType.get_all()

    input_dir = ensure_input_dir(to_path(input_dir) or self.processed_dir, "input_dir")

    console = Console(record=True)
    console.print(
        "🚀 [bold]Uploading SPP Processed Data to S3[/bold]: ",
        f"[bold cyan]📅{start_date.strftime('%Y%m%d')}[/bold cyan] - "
        f"[bold cyan]📅{end_date.strftime('%Y%m%d')}[/bold cyan]",
    )

    all_uploaded: list[str] = []
    stats_by_data_type: dict[SPPProcessedDataType, list[int]] = {}

    with S3Handler(verbose=verbose) as handler:
        for data_type in data_types:
            data_def = SPPProcessedDataRegistry.get_definition(data_type)
            type_dir = input_dir / data_def.dir_name

            if not type_dir.exists():
                stats_by_data_type[data_type] = [0, 0, 0]
                continue

            # Scan for data.parquet files in hive structure
            refs: list[S3ObjectRef] = []
            for parquet_file in sorted(type_dir.rglob("data.parquet")):
                rel_path = str(parquet_file.relative_to(input_dir))
                file_date = extract_date_from_hive_path(rel_path)
                if file_date is None:
                    continue
                file_date = file_date.replace(tzinfo=self.timezone)
                if not (start_date <= file_date <= end_date):
                    continue
                key = self._processed_s3_key(data_def.dir_name, file_date)
                refs.append(S3ObjectRef(key=key, local_path=parquet_file))

            uploaded, skipped, failed = handler.upload_objects(
                S3_BUCKETS.processed,
                refs,
                overwrite=overwrite,
                description=f"[{data_def.title}]",
            )
            all_uploaded.extend(uploaded)
            stats_by_data_type[data_type] = [len(uploaded), len(skipped), len(failed)]

    display_data_transfer_summary(
        stats_by_data_type, SPPProcessedDataRegistry, input_dir, transfer_type="upload", console=console
    )
    return all_uploaded
process_raw_data
process_raw_data(start_date: str | datetime | None = None, end_date: str | datetime | None = None, data_types: SPPProcessedDataType | list[SPPProcessedDataType] | None = None, input_dir: str | Path | None = None, output_dir: str | Path | None = None, file_format: Literal['parquet', 'csv'] = 'parquet', overwrite: bool = False, verbose: bool = False) -> list[Path]

Process the downloaded raw data into a standardized format.

Args:

  • start_date -- Start date to filter data (if None, process from oldest available)
  • end_date -- End date to filter data (if None, process to latest available)
  • data_types -- Data type(s) to process (defaults to all types)
  • input_dir -- Directory containing raw data files (defaults to self.raw_dir)
  • output_dir -- Directory to save processed files (defaults to self.processed_dir)
  • file_format -- Output file format, either "parquet" or "csv" (defaults to "parquet")
  • overwrite -- Whether to overwrite existing processed files
  • verbose -- Whether to print verbose output

Returns:

  • list[Path] -- List of paths to successfully processed files

Source code in src/progridpy/iso/spp/client.py
def process_raw_data(
    self,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
    data_types: SPPProcessedDataType | list[SPPProcessedDataType] | None = None,
    input_dir: str | Path | None = None,
    output_dir: str | Path | None = None,
    file_format: Literal["parquet", "csv"] = "parquet",
    overwrite: bool = False,
    verbose: bool = False,
) -> list[Path]:
    """Process the downloaded raw data into a standardized format.

    Args:
        start_date: Start date to filter data (if None, process from oldest available)
        end_date: End date to filter data (if None, process to latest available)
        data_types: Data type(s) to process (defaults to all types)
        input_dir: Directory containing raw data files (defaults to self.raw_dir)
        output_dir: Directory to save processed files (defaults to self.processed_dir)
        file_format: Output file format, either "parquet" or "csv" (defaults to "parquet")
        overwrite: Whether to overwrite existing processed files
        verbose: Whether to print verbose output

    Returns:
        list[Path]: List of paths to successfully processed files
    """
    # Parse and validate parameters
    start_date = parse_datetime(start_date, tz=self.timezone) if start_date else None
    end_date = parse_datetime(end_date, tz=self.timezone) if end_date else None

    # Convert single data type to list
    if isinstance(data_types, SPPProcessedDataType):
        data_types = [data_types]

    # Set defaults
    if data_types is None:
        data_types = SPPProcessedDataType.get_all()
    input_dir = to_path(input_dir) or self.raw_dir
    output_dir = to_path(output_dir) or self.processed_dir
    input_dir = ensure_input_dir(input_dir, "input_dir")
    output_dir = ensure_output_dir(output_dir, "output_dir")

    self._validate_processing_registry()

    # Process each data type using match/case dispatcher
    all_processed_files = []
    stats_by_data_type = defaultdict(lambda: [0, 0, 0])  # [processed, skipped, failed]
    console = Console(record=True)

    for data_type in data_types:
        match data_type:
            case SPPProcessedDataType.NODAL:
                processed_files, stats = self._process_nodal_files(
                    start_date=start_date,
                    end_date=end_date,
                    input_dir=input_dir,
                    output_dir=output_dir,
                    overwrite=overwrite,
                    verbose=verbose,
                    file_format=file_format,
                )
            case SPPProcessedDataType.SYSTEM:
                processed_files, stats = self._process_system_files(
                    start_date=start_date,
                    end_date=end_date,
                    input_dir=input_dir,
                    output_dir=output_dir,
                    overwrite=overwrite,
                    verbose=verbose,
                    file_format=file_format,
                )
            case SPPProcessedDataType.ZONAL:
                processed_files, stats = self._process_zonal_files(
                    start_date=start_date,
                    end_date=end_date,
                    input_dir=input_dir,
                    output_dir=output_dir,
                    overwrite=overwrite,
                    verbose=verbose,
                    file_format=file_format,
                )
            case _:
                raise ValueError(f"Unsupported processed data type: {data_type}")

        # Collect results
        all_processed_files.extend(processed_files)
        stats_by_data_type[data_type] = stats

    # Display summary table
    display_data_transfer_summary(
        stats_by_data_type, SPPProcessedDataRegistry, output_dir, transfer_type="process", console=console
    )
    console.print(f"💾 Files saved to: [bold cyan]{output_dir.absolute()}[/bold cyan]")

    return all_processed_files
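
A usage sketch, assuming raw files for the date range have already been downloaded into data/spp/raw.

from progridpy.iso.spp.client import SPP
from progridpy.iso.spp.types import SPPProcessedDataType

spp = SPP()
processed = spp.process_raw_data(
    start_date="2024-07-01",
    end_date="2024-07-03",
    data_types=[SPPProcessedDataType.NODAL, SPPProcessedDataType.SYSTEM],
    file_format="parquet",
    overwrite=False,
    verbose=True,
)
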
download_rt_lmp_rolling
download_rt_lmp_rolling(output_dir: str | Path | None = None, overwrite: bool = False) -> Path | None

Download today's real-time rolling LMP data by merging 5-minute interval files.

Source code in src/progridpy/iso/spp/client.py
def download_rt_lmp_rolling(
    self,
    output_dir: str | Path | None = None,
    overwrite: bool = False,
) -> Path | None:
    """Download today's real-time rolling LMP data by merging 5-minute interval files."""
    base_output_dir = to_path(output_dir) or self.raw_dir
    base_output_dir = ensure_output_dir(base_output_dir, "output_dir")
    output_dir = base_output_dir / SPPRawDataRegistry.get_dir_name(SPPRawDataType.REAL_TIME_ROLLING_LMP)
    output_dir = ensure_output_dir(output_dir, "output_dir")

    date = parse_datetime(None, tz=self.timezone)

    suffix = SPPRawDataRegistry.get_file_suffix(SPPRawDataType.REAL_TIME_ROLLING_LMP)
    extension = SPPRawDataRegistry.get_file_extension(SPPRawDataType.REAL_TIME_ROLLING_LMP)
    output_file = output_dir / f"{date.strftime('%Y%m%d')}_{suffix}.{extension}"

    if output_file.exists() and not overwrite:
        return None

    urls = self._construct_url(
        SPPRawDataType.REAL_TIME_PRELIM_LMP,
        date=date,
        resolve_latest_rc=True,
    )
    if not urls:
        logger.warning(f"No RT prelim intervals found for {date.strftime('%Y%m%d')}")
        return None

    with tempfile.TemporaryDirectory() as tmpdirname:
        temp_dir = Path(tmpdirname)
        output_files = [temp_dir / url.split("=")[-1].split("%2F")[-1] for url in urls]

        with FileDownloader(max_connections=32) as downloader:
            file_paths, _, _ = downloader.download_batch(
                urls=urls,
                output_files=output_files,
                max_concurrent_files=32,
                description=f"[{SPPRawDataRegistry.get_title(SPPRawDataType.REAL_TIME_ROLLING_LMP)}]",
                fetch_metadata=False,
            )

        interval_files = [p for p in file_paths if p.exists()]

        if not interval_files:
            logger.warning(f"No interval files downloaded for {date.strftime('%Y%m%d')}")
            return None

        try:
            merged_df = self._merge_rt_lmp_intervals(interval_files)
            if merged_df is not None:
                merged_df = self._canonicalize_raw_df(
                    merged_df,
                    SPPRawDataType.REAL_TIME_ROLLING_LMP,
                    file_name=output_file.name,
                    stage="download",
                )
                data_def = SPPRawDataRegistry.get_definition(SPPRawDataType.REAL_TIME_ROLLING_LMP)
                expected_cols = list(data_def.raw_expected_columns)
                available_cols = [col for col in expected_cols if col in merged_df.columns]
                merged_df.select(available_cols).write_csv(output_file)
                return output_file
            return None
        except Exception as e:
            logger.error(f"Error merging RT rolling LMP for {date.strftime('%Y%m%d')}: {e!s}")
            return None
download_archival_data
download_archival_data(year: int, data_type: SPPRawDataType, output_dir: str | Path, overwrite: bool = False) -> Path

Download a yearly zip archive from SPP's file-browser API.

Args:

  • year -- The year to download (e.g. 2023).
  • data_type -- The raw data type to download the archive for.
  • output_dir -- Base directory where the zip will be saved under spp/{zip_dir}/{year}.zip.
  • overwrite -- Whether to overwrite an existing zip file.

Returns: Path to the downloaded zip file.

Raises: ValueError: If the data type is not available as an archive.

Source code in src/progridpy/iso/spp/client.py
def download_archival_data(
    self,
    year: int,
    data_type: SPPRawDataType,
    output_dir: str | Path,
    overwrite: bool = False,
) -> Path:
    """Download a yearly zip archive from SPP's file-browser API.

    Args:
        year: The year to download (e.g. 2023).
        data_type: The raw data type to download the archive for.
        output_dir: Base directory where the zip will be saved under spp/{zip_dir}/{year}.zip.
        overwrite: Whether to overwrite an existing zip file.

    Returns:
        Path to the downloaded zip file.

    Raises:
        ValueError: If the data type is not available as an archive.
    """
    data_def = SPPRawDataRegistry.get_definition(data_type)
    if not data_def.archive_zip_dir or not data_def.url_endpoint:
        supported_types = [
            t.value
            for t in SPPRawDataRegistry.list_available_types()
            if (
                SPPRawDataRegistry.get_definition(t).archive_zip_dir
                and SPPRawDataRegistry.get_definition(t).url_endpoint
            )
        ]
        raise ValueError(
            f"{data_type.value} is not available as a yearly archive. Supported types: {', '.join(supported_types)}"
        )

    url = f"{FILE_BROWSER_API_DOWNLOAD_URL}/{data_def.url_endpoint}?path=%2F{year}%2F{year}.zip"

    output_dir = ensure_output_dir(to_path(output_dir), "output_dir")
    output_file = output_dir / "spp" / data_def.archive_zip_dir / f"{year}.zip"
    output_file.parent.mkdir(parents=True, exist_ok=True)

    if output_file.exists() and not overwrite:
        logger.info(f"Skipping existing archive: {output_file}")
        return output_file

    with FileDownloader() as downloader:
        downloader.download(
            url=url,
            output_file=output_file,
            description=f"[{SPPRawDataRegistry.get_title(data_type)} {year}]",
        )

    return output_file
extract_archival_data
extract_archival_data(input_dir: str | Path, output_dir: str | Path, data_type: SPPRawDataType, overwrite: bool = False) -> list[Path]

Extract and filter archival zip files into the daily file structure.

Reads yearly zip archives from {input_dir}/spp/{zip_dir}/*.zip and writes individual daily CSV files to {output_dir}/{registry_dir_name}/ matching the same output naming as download_raw_data.

Args:

  • input_dir -- Base directory containing spp/{zip_dir}/*.zip archives.
  • output_dir -- Base directory for extracted daily CSV files.
  • data_type -- The raw data type to extract.
  • overwrite -- Whether to overwrite existing output files.

Returns: List of paths to written output files.

Raises: ValueError: If the data type is not available as an archive.

Source code in src/progridpy/iso/spp/client.py
def extract_archival_data(
    self,
    input_dir: str | Path,
    output_dir: str | Path,
    data_type: SPPRawDataType,
    overwrite: bool = False,
) -> list[Path]:
    """Extract and filter archival zip files into the daily file structure.

    Reads yearly zip archives from {input_dir}/spp/{zip_dir}/*.zip and writes
    individual daily CSV files to {output_dir}/{registry_dir_name}/ matching the
    same output naming as download_raw_data.

    Args:
        input_dir: Base directory containing spp/{zip_dir}/*.zip archives.
        output_dir: Base directory for extracted daily CSV files.
        data_type: The raw data type to extract.
        overwrite: Whether to overwrite existing output files.

    Returns:
        List of paths to written output files.

    Raises:
        ValueError: If the data type is not available as an archive.
    """
    data_def = SPPRawDataRegistry.get_definition(data_type)
    if not data_def.archive_zip_dir:
        supported_types = [
            t.value
            for t in SPPRawDataRegistry.list_available_types()
            if SPPRawDataRegistry.get_definition(t).archive_zip_dir
        ]
        raise ValueError(
            f"{data_type.value} is not available as a yearly archive. Supported types: {', '.join(supported_types)}"
        )

    input_dir = ensure_input_dir(to_path(input_dir), "input_dir")
    output_dir = ensure_output_dir(to_path(output_dir), "output_dir")

    zip_base = input_dir / "spp" / data_def.archive_zip_dir
    zip_files = sorted(zip_base.glob("*.zip"))
    if not zip_files:
        logger.warning(f"No zip files found under {zip_base}")
        return []

    out_base = output_dir / data_def.dir_name
    out_base.mkdir(parents=True, exist_ok=True)

    written_files: list[Path] = []

    for zip_path in zip_files:
        written_files.extend(self._extract_single_archive(zip_path, out_base, data_type, data_def, overwrite))

    return written_files
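
A combined usage sketch for the two archival methods. Whether a given raw type exposes archive_zip_dir and url_endpoint depends on the registry, so the member used here is an assumption; the directories are illustrative.

from progridpy.iso.spp.client import SPP
from progridpy.iso.spp.types import SPPRawDataType

spp = SPP()
zip_path = spp.download_archival_data(
    year=2023,
    data_type=SPPRawDataType.REAL_TIME_PRELIM_LMP,   # assumed to be archive-enabled
    output_dir="data/archives",
)
daily_files = spp.extract_archival_data(
    input_dir="data/archives",
    output_dir="data/spp/raw",
    data_type=SPPRawDataType.REAL_TIME_PRELIM_LMP,
    overwrite=False,
)
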
clear_and_calculate_gain
clear_and_calculate_gain(trade_df: DataFrame, processed_df: DataFrame, min_offer_price: float = -500, max_bid_price: float = 2000) -> DataFrame

Clear trades and calculate financial gains based on market prices.

da_lmp is the clearing price. Supply offers clear when da_lmp >= offer_price. Demand bids clear when da_lmp <= offer_price.

Source code in src/progridpy/iso/spp/client.py
def clear_and_calculate_gain(
    self,
    trade_df: pl.DataFrame,
    processed_df: pl.DataFrame,
    min_offer_price: float = -500,
    max_bid_price: float = 2000,
) -> pl.DataFrame:
    """Clear trades and calculate financial gains based on market prices.

    da_lmp is the clearing price.
    Supply offers clear when da_lmp >= offer_price.
    Demand bids clear when da_lmp <= offer_price.
    """
    merged = trade_df.join(processed_df, on=["interval_start_utc", "node"], how="inner", suffix="_proc")

    if "interval_start_local_proc" in merged.columns:
        merged = merged.drop("interval_start_local_proc")

    if "offer_price" not in merged.columns:
        merged = merged.with_columns(
            pl.when(pl.col("is_supply"))
            .then(pl.lit(min_offer_price))
            .otherwise(pl.lit(max_bid_price))
            .alias("offer_price")
        )

    # Determine clearing: da_lmp acts as clearing price
    supply_clears = pl.col("is_supply") & (pl.col("da_lmp") >= pl.col("offer_price"))
    demand_clears = (~pl.col("is_supply")) & (pl.col("da_lmp") <= pl.col("offer_price"))
    merged = merged.with_columns((supply_clears | demand_clears).alias("cleared"))

    # Gain only for cleared trades
    supply_gain = (pl.col("da_lmp") - pl.col("rt_lmp_final")) * pl.col("volume")
    demand_gain = (pl.col("rt_lmp_final") - pl.col("da_lmp")) * pl.col("volume")

    merged = merged.with_columns(
        pl.when(pl.col("cleared") & pl.col("is_supply"))
        .then(supply_gain)
        .when(pl.col("cleared") & (~pl.col("is_supply")))
        .then(demand_gain)
        .otherwise(pl.lit(0.0))
        .alias("gain"),
    )

    return merged.select(
        "interval_start_utc",
        "interval_start_local",
        "node",
        "is_supply",
        "da_lmp",
        "rt_lmp_final",
        "offer_price",
        "volume",
        "cleared",
        "gain",
    )
process_trade
process_trade(trade_dir: str | Path, processed_dir: str | Path, start_date: str | datetime | None = None, end_date: str | datetime | None = None) -> DataFrame

Process trade files and calculate gains based on DA/RT spread.

Source code in src/progridpy/iso/spp/client.py
def process_trade(
    self,
    trade_dir: str | Path,
    processed_dir: str | Path,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
) -> pl.DataFrame:
    """Process trade files and calculate gains based on DA/RT spread."""
    trade_path = to_path(trade_dir)
    processed_pat = to_path(processed_dir)
    if trade_path is None or processed_pat is None:
        raise ValueError("trade_dir and processed_dir must be provided.")
    if trade_path.is_file():
        ensure_input_file(trade_path, "trade_dir")
    else:
        ensure_input_dir(trade_path, "trade_dir")
    if processed_pat.is_file():
        ensure_input_file(processed_pat, "processed_dir")
    else:
        ensure_input_dir(processed_pat, "processed_dir")

    if start_date is not None:
        start_date = parse_datetime(start_date, tz=self.timezone)
    if end_date is not None:
        end_date = parse_datetime(end_date, tz=self.timezone)

    trade_files_by_date: dict[datetime, pl.DataFrame] = {}

    if trade_path.is_file():
        trade_files_by_date = self._parse_trade_file(trade_path, start_date, end_date)
    else:
        for file_path in sorted(trade_path.glob("SPP-*.csv")):
            parts = file_path.stem.split("-")
            date_str = parts[1]
            file_date = parse_datetime(date_str, tz=self.timezone)

            if start_date and file_date < start_date:
                continue
            if end_date and file_date > end_date:
                continue

            trade_files_by_date[file_date] = self._build_trade_df(pl.read_csv(file_path), date_str)

    # Load processed nodal parquet files
    processed_files_by_date: dict[datetime, pl.DataFrame] = {}

    if processed_pat.is_file():
        file_date = extract_date_from_hive_path(str(processed_pat))
        if file_date is None:
            file_date = parse_datetime(processed_pat.stem, tz=self.timezone)
        else:
            file_date = file_date.replace(tzinfo=self.timezone)
        if (start_date is None or file_date >= start_date) and (end_date is None or file_date <= end_date):
            processed_files_by_date[file_date] = pl.read_parquet(processed_pat)
    else:
        parquet_files = sorted(processed_pat.rglob("data.parquet"))
        nodal_files = [fp for fp in parquet_files if "dataset=nodal" in str(fp)]

        for file_path in nodal_files:
            file_date = extract_date_from_hive_path(str(file_path))
            if file_date is None:
                continue
            file_date = file_date.replace(tzinfo=self.timezone)
            if start_date and file_date < start_date:
                continue
            if end_date and file_date > end_date:
                continue
            processed_files_by_date[file_date] = pl.read_parquet(file_path)

    if not trade_files_by_date:
        raise ValueError(f"No trade files found in {trade_path} for the specified date range")
    if not processed_files_by_date:
        raise ValueError(f"No processed files found in {processed_pat} for the specified date range")

    all_results: list[pl.DataFrame] = []

    for date in sorted(trade_files_by_date.keys()):
        if date not in processed_files_by_date:
            logger.warning("No processed data found for %s, skipping", date.strftime("%Y%m%d"))
            continue
        result_df = self.clear_and_calculate_gain(trade_files_by_date[date], processed_files_by_date[date])
        all_results.append(result_df)

    if not all_results:
        raise ValueError("No matching dates found between trade and processed data")

    return pl.concat(all_results)

SPPProcessedDataType

Bases: StrEnum

Defines the granularity of the processed dataset.
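
A minimal sketch of the enum's shape based on the granularity levels listed in the implementations table; the string values are assumptions (the nodal value matches the dataset=nodal hive partition used in process_trade).

from enum import StrEnum

class SPPProcessedDataType(StrEnum):
    NODAL = "nodal"
    SYSTEM = "system"
    ZONAL = "zonal"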