SPP

Southwest Power Pool implementation. SPP is the reference implementation for the registry-driven processing pattern -- all other ISOs follow its structural conventions.

Key Characteristics

  • Timezone model: Source files use GMT/UTC timestamps. interval_start_utc is computed first; interval_start_local is derived from that UTC instant. DST ambiguity is resolved by distinct UTC timestamps.
  • Raw data types: Day-Ahead LMP, Real-Time LMP (prelim, final, rolling), LMP Forecast, Load (actual/forecast), Resource (actual/forecast by reserve zone).
  • Processed granularity: NODAL (per-settlement-location), SYSTEM (system-wide load, 24 rows/day), ZONAL (resource by reserve zone, 24 x 5 rows/day).
  • Virtual trading: Offers and bids are placed on Settlement Locations (not PNODEs). Minimum volume is 0.001 MWh. Offer prices range from -$500 to $2,000; bid prices from -$9,999 to $2,000.
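
The timezone model above can be illustrated with a minimal sketch that uses only the standard-library `zoneinfo` module (this is not the library's own code). On the 2024 fall-back day, two consecutive UTC hours map to the same local wall-clock time, and only the UTC value tells them apart:

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

CENTRAL = ZoneInfo("America/Chicago")

# Two consecutive UTC hours that straddle the 2024 fall-back transition.
first_utc = datetime(2024, 11, 3, 6, 0, tzinfo=timezone.utc)
second_utc = first_utc + timedelta(hours=1)

# Local time is derived from the UTC instant, never the other way round.
first_local = first_utc.astimezone(CENTRAL)
second_local = second_utc.astimezone(CENTRAL)

print(first_local.isoformat())   # 2024-11-03T01:00:00-05:00
print(second_local.isoformat())  # 2024-11-03T01:00:00-06:00
```

Both rows show 01:00 on the local clock, so a join keyed on local time alone would collide, while interval_start_utc stays unique.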

Processing Bindings

SPP uses five join modes to assemble processed datasets:

Join Mode        Semantics
NODE_LEFT        Left join on (interval_start_utc, interval_start_local, node)
TIME_LEFT        Left join on time columns, broadcast to all nodes
TIME_SUM_LEFT    Aggregate reserve-zone rows to time-level sums, then left join
TIME_FULL        Full outer join on time (keeps timestamps from either side)
ZONE_FULL        Full outer join on (interval_start_utc, interval_start_local, reserve_zone)
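
To make the TIME_SUM_LEFT mode concrete, here is a rough sketch in plain Python. It is an illustration of the semantics only, not the library's implementation; the column names `load_mw` and `wind_mw` and the sample rows are invented for the example.

```python
from collections import defaultdict

# Hypothetical rows; "load_mw" and "wind_mw" are illustrative column names only.
left_rows = [
    {"interval_start_utc": "2024-01-01T06:00Z", "load_mw": 30000.0},
    {"interval_start_utc": "2024-01-01T07:00Z", "load_mw": 31000.0},
]
zone_rows = [
    {"interval_start_utc": "2024-01-01T06:00Z", "reserve_zone": 1, "wind_mw": 100.0},
    {"interval_start_utc": "2024-01-01T06:00Z", "reserve_zone": 2, "wind_mw": 250.0},
]

# Step 1: collapse reserve-zone rows to one sum per timestamp.
sums: dict[str, float] = defaultdict(float)
for row in zone_rows:
    sums[row["interval_start_utc"]] += row["wind_mw"]

# Step 2: left join, keeping every left-hand row; unmatched timestamps get None.
joined = [{**row, "wind_mw": sums.get(row["interval_start_utc"])} for row in left_rows]
```

The 06:00 row receives the summed value 350.0, while the 07:00 row is kept with `None` because no reserve-zone rows exist for it. That retention of unmatched left-hand rows is what distinguishes the LEFT modes from the FULL modes.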

Classes

SPP

SPP()

Bases: ISOBase[SPPRawDataType, SPPProcessedDataType]

Source code in src/progridpy/iso/spp/client.py
def __init__(self) -> None:
    super().__init__(iso=ISO.SPP, home_url=MARKETPLACE_URL, timezone="America/Chicago")
    self.raw_dir = Path("data/spp/raw")
    self.processed_dir = Path("data/spp/processed")
    self._file_browser_folder_cache: dict[tuple[str, str], list[_FileBrowserEntry]] = {}

Functions

download_raw_data
download_raw_data(start_date: str | datetime | None = None, end_date: str | datetime | None = None, data_types: SPPRawDataType | list[SPPRawDataType] | None = None, download_src: str | FileLocation = ISO, output_dir: str | Path | None = None, overwrite: bool = False, verbose: bool = False) -> list[Path]

Download raw data from either the SPP website or the S3 bucket. The start and end dates are inclusive.

Arguments:

  • start_date (str | datetime | None): the start date to fetch reports for, defaults to today. String formats supported: YYYYMMDD, YYYY/MM/DD, YYYY-MM-DD.
  • end_date (str | datetime | None): the end date to fetch reports for, defaults to today. String formats supported: YYYYMMDD, YYYY/MM/DD, YYYY-MM-DD.
  • data_types (SPPRawDataType | list[SPPRawDataType] | None): the data types to fetch, defaults to all.
  • download_src (str | FileLocation): the source to download from, defaults to the ISO website (FileLocation.ISO). String values supported: "s3", "iso".
  • output_dir (str | Path | None): the path to save the data to, defaults to data/spp/raw.
  • overwrite (bool): whether to overwrite existing files, defaults to False.
  • verbose (bool): whether to print verbose output, defaults to False.

Returns:

  • list[Path]: A list of paths to the downloaded files.
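
The three accepted date-string formats can be sketched with the standard library alone. The helper `parse_date_string` below is hypothetical and only stands in for the library's `parse_datetime`, whose implementation is not shown on this page; it ignores timezone handling and the "defaults to today" behaviour.

```python
from datetime import datetime

# The three string formats listed in the argument description.
SUPPORTED_FORMATS = ("%Y%m%d", "%Y/%m/%d", "%Y-%m-%d")


def parse_date_string(value: str) -> datetime:
    """Try each documented format in turn (hypothetical stand-in for parse_datetime)."""
    for fmt in SUPPORTED_FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError(f"Unsupported date format: {value!r}")


# All three spellings resolve to the same calendar date.
parsed = [parse_date_string(s) for s in ("20240115", "2024/01/15", "2024-01-15")]
```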

Source code in src/progridpy/iso/spp/client.py
def download_raw_data(
    self,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
    data_types: SPPRawDataType | list[SPPRawDataType] | None = None,
    download_src: str | FileLocation = FileLocation.ISO,
    output_dir: str | Path | None = None,
    overwrite: bool = False,
    verbose: bool = False,
) -> list[Path]:
    """
    Download raw data from either SPP website or S3 bucket. The start and end dates are inclusive.

    Arguments:
        start_date (str | datetime | None): the start date to fetch reports for, defaults to today.
            String formats supported: YYYYMMDD, YYYY/MM/DD, YYYY-MM-DD
        end_date (str | datetime | None): the end date to fetch reports for, defaults to today.
            String formats supported: YYYYMMDD, YYYY/MM/DD, YYYY-MM-DD
        data_types (SPPRawDataType | list[SPPRawDataType]): the data types to fetch, defaults to all.
        download_src (str | FileLocation): the source to download from, defaults to the ISO website.
            String values supported: "s3", "iso"
        output_dir (str | Path): the path to save the data to, defaults to data/spp/raw.
        overwrite (bool): whether to overwrite existing files, defaults to False.
        verbose (bool): whether to print verbose output, defaults to False.
    Returns:
        list[Path]: A list of paths to the downloaded files.
    """
    start_date = parse_datetime(start_date, tz=self.timezone)
    end_date = parse_datetime(end_date, tz=self.timezone)

    # Convert string parameters to appropriate types
    if isinstance(download_src, str):
        download_src = parse_file_location(download_src)

    # Convert single data type to list to get consistent behavior
    if isinstance(data_types, SPPRawDataType):
        data_types = [data_types]

    # Set defaults
    if data_types is None:
        data_types = SPPRawDataType.get_all()

    output_dir = to_path(output_dir) or self.raw_dir
    output_dir = ensure_output_dir(output_dir, "output_dir")

    match download_src:
        case FileLocation.S3:
            return self._download_raw_data_from_s3(start_date, end_date, data_types, output_dir, overwrite, verbose)
        case FileLocation.ISO:
            return self._download_from_iso(start_date, end_date, data_types, output_dir, overwrite, verbose)
        case _:
            raise ValueError(f"Unsupported download location: {download_src}")
upload_raw_data
upload_raw_data(start_date: str | datetime | None = None, end_date: str | datetime | None = None, data_types: SPPRawDataType | list[SPPRawDataType] | None = None, input_dir: str | Path | None = None, overwrite: bool = False, verbose: bool = False) -> list[str]

Upload raw data files to S3.

Scans input_dir/{dir_name}/ for files matching the date range, then uploads them with deterministic S3 keys.

Source code in src/progridpy/iso/spp/client.py
def upload_raw_data(
    self,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
    data_types: SPPRawDataType | list[SPPRawDataType] | None = None,
    input_dir: str | Path | None = None,
    overwrite: bool = False,
    verbose: bool = False,
) -> list[str]:
    """Upload raw data files to S3.

    Scans input_dir/{dir_name}/ for files matching the date range, then uploads them
    with deterministic S3 keys.
    """
    start_date = parse_datetime(start_date, tz=self.timezone)
    end_date = parse_datetime(end_date, tz=self.timezone)

    if isinstance(data_types, SPPRawDataType):
        data_types = [data_types]
    if data_types is None:
        data_types = SPPRawDataType.get_all()

    input_dir = ensure_input_dir(to_path(input_dir) or self.raw_dir, "input_dir")

    console = Console(record=True)
    console.print(
        "🚀 [bold]Uploading SPP Raw Data to S3[/bold]: ",
        f"[bold cyan]📅{start_date.strftime('%Y%m%d')}[/bold cyan] - "
        f"[bold cyan]📅{end_date.strftime('%Y%m%d')}[/bold cyan]",
    )

    all_uploaded: list[str] = []
    stats_by_data_type: dict[SPPRawDataType, list[int]] = {}

    with S3Handler(verbose=verbose) as handler:
        for data_type in data_types:
            data_def = SPPRawDataRegistry.get_definition(data_type)
            ext = data_def.file_extension
            type_dir = input_dir / data_def.dir_name

            if not type_dir.exists():
                stats_by_data_type[data_type] = [0, 0, 0]
                continue

            # Scan local files and filter by date range
            refs: list[S3ObjectRef] = []
            for local_file in sorted(type_dir.glob(f"*.{ext}")):
                try:
                    date_str = local_file.stem.split("_")[0]
                    file_date = datetime.strptime(date_str, "%Y%m%d").replace(tzinfo=self.timezone)
                except (ValueError, IndexError):
                    continue
                if not (start_date <= file_date <= end_date):
                    continue
                key = self._raw_s3_key(data_def.dir_name, local_file.name)
                refs.append(S3ObjectRef(key=key, local_path=local_file))

            uploaded, skipped, failed = handler.upload_objects(
                S3_BUCKETS.raw,
                refs,
                overwrite=overwrite,
                description=f"[{data_def.title}]",
            )
            all_uploaded.extend(uploaded)
            stats_by_data_type[data_type] = [len(uploaded), len(skipped), len(failed)]

    display_data_transfer_summary(
        stats_by_data_type, SPPRawDataRegistry, input_dir, transfer_type="upload", console=console
    )
    return all_uploaded
download_processed_data
download_processed_data(start_date: str | datetime | None = None, end_date: str | datetime | None = None, data_types: SPPProcessedDataType | list[SPPProcessedDataType] | None = None, output_dir: str | Path | None = None, overwrite: bool = False, verbose: bool = False) -> list[Path]

Download processed data from S3 using deterministic hive-partitioned keys.

Source code in src/progridpy/iso/spp/client.py
def download_processed_data(
    self,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
    data_types: SPPProcessedDataType | list[SPPProcessedDataType] | None = None,
    output_dir: str | Path | None = None,
    overwrite: bool = False,
    verbose: bool = False,
) -> list[Path]:
    """Download processed data from S3 using deterministic hive-partitioned keys."""
    start_date = parse_datetime(start_date, tz=self.timezone)
    end_date = parse_datetime(end_date, tz=self.timezone)

    if isinstance(data_types, SPPProcessedDataType):
        data_types = [data_types]
    if data_types is None:
        data_types = SPPProcessedDataType.get_all()

    output_dir = ensure_output_dir(to_path(output_dir) or self.processed_dir, "output_dir")

    console = Console(record=True)
    console.print(
        "🚀 [bold]Downloading SPP Processed Data from S3[/bold]: ",
        f"[bold cyan]📅{start_date.strftime('%Y%m%d')}[/bold cyan] - "
        f"[bold cyan]📅{end_date.strftime('%Y%m%d')}[/bold cyan]",
    )

    all_downloaded: list[Path] = []
    stats_by_data_type: dict[SPPProcessedDataType, list[int]] = {}

    with S3Handler(verbose=verbose) as handler:
        for data_type in data_types:
            refs = self._build_processed_s3_refs(data_type, start_date, end_date, output_dir)
            downloaded, skipped, failed = handler.download_objects(
                S3_BUCKETS.processed,
                refs,
                overwrite=overwrite,
                description=f"[{SPPProcessedDataRegistry.get_title(data_type)}]",
            )
            all_downloaded.extend(downloaded)
            stats_by_data_type[data_type] = [len(downloaded), len(skipped), len(failed)]

    display_data_transfer_summary(
        stats_by_data_type, SPPProcessedDataRegistry, output_dir, transfer_type="download", console=console
    )
    console.print(f"💾 Files saved to: [bold cyan]{output_dir.absolute()}[/bold cyan]")
    return all_downloaded
upload_processed_data
upload_processed_data(start_date: str | datetime | None = None, end_date: str | datetime | None = None, data_types: SPPProcessedDataType | list[SPPProcessedDataType] | None = None, input_dir: str | Path | None = None, overwrite: bool = False, verbose: bool = False) -> list[str]

Upload processed parquet files to S3.

Scans input_dir for hive-partitioned data.parquet files, filters by date range, then uploads with deterministic S3 keys.

Source code in src/progridpy/iso/spp/client.py
def upload_processed_data(
    self,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
    data_types: SPPProcessedDataType | list[SPPProcessedDataType] | None = None,
    input_dir: str | Path | None = None,
    overwrite: bool = False,
    verbose: bool = False,
) -> list[str]:
    """Upload processed parquet files to S3.

    Scans input_dir for hive-partitioned data.parquet files, filters by date range,
    then uploads with deterministic S3 keys.
    """
    start_date = parse_datetime(start_date, tz=self.timezone)
    end_date = parse_datetime(end_date, tz=self.timezone)

    if isinstance(data_types, SPPProcessedDataType):
        data_types = [data_types]
    if data_types is None:
        data_types = SPPProcessedDataType.get_all()

    input_dir = ensure_input_dir(to_path(input_dir) or self.processed_dir, "input_dir")

    console = Console(record=True)
    console.print(
        "🚀 [bold]Uploading SPP Processed Data to S3[/bold]: ",
        f"[bold cyan]📅{start_date.strftime('%Y%m%d')}[/bold cyan] - "
        f"[bold cyan]📅{end_date.strftime('%Y%m%d')}[/bold cyan]",
    )

    all_uploaded: list[str] = []
    stats_by_data_type: dict[SPPProcessedDataType, list[int]] = {}

    with S3Handler(verbose=verbose) as handler:
        for data_type in data_types:
            data_def = SPPProcessedDataRegistry.get_definition(data_type)
            type_dir = input_dir / data_def.dir_name

            if not type_dir.exists():
                stats_by_data_type[data_type] = [0, 0, 0]
                continue

            # Scan for data.parquet files in hive structure
            refs: list[S3ObjectRef] = []
            for parquet_file in sorted(type_dir.rglob("data.parquet")):
                rel_path = str(parquet_file.relative_to(input_dir))
                file_date = extract_date_from_hive_path(rel_path)
                if file_date is None:
                    continue
                file_date = file_date.replace(tzinfo=self.timezone)
                if not (start_date <= file_date <= end_date):
                    continue
                key = self._processed_s3_key(data_def.dir_name, file_date)
                refs.append(S3ObjectRef(key=key, local_path=parquet_file))

            uploaded, skipped, failed = handler.upload_objects(
                S3_BUCKETS.processed,
                refs,
                overwrite=overwrite,
                description=f"[{data_def.title}]",
            )
            all_uploaded.extend(uploaded)
            stats_by_data_type[data_type] = [len(uploaded), len(skipped), len(failed)]

    display_data_transfer_summary(
        stats_by_data_type, SPPProcessedDataRegistry, input_dir, transfer_type="upload", console=console
    )
    return all_uploaded
process_raw_data
process_raw_data(start_date: str | datetime | None = None, end_date: str | datetime | None = None, data_types: SPPProcessedDataType | list[SPPProcessedDataType] | None = None, input_dir: str | Path | None = None, output_dir: str | Path | None = None, file_format: Literal['parquet', 'csv'] = 'parquet', overwrite: bool = False, verbose: bool = False) -> list[Path]

Process the downloaded raw data into a standardized format.

Args:

  • start_date: Start date to filter data (if None, process from oldest available).
  • end_date: End date to filter data (if None, process to latest available).
  • data_types: Data type(s) to process (defaults to all types).
  • input_dir: Directory containing raw data files (defaults to self.raw_dir).
  • output_dir: Directory to save processed files (defaults to self.processed_dir).
  • file_format: Output file format, either "parquet" or "csv" (defaults to "parquet").
  • overwrite: Whether to overwrite existing processed files.
  • verbose: Whether to print verbose output.

Returns:

  • list[Path]: List of paths to successfully processed files.

Source code in src/progridpy/iso/spp/client.py
def process_raw_data(
    self,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
    data_types: SPPProcessedDataType | list[SPPProcessedDataType] | None = None,
    input_dir: str | Path | None = None,
    output_dir: str | Path | None = None,
    file_format: Literal["parquet", "csv"] = "parquet",
    overwrite: bool = False,
    verbose: bool = False,
) -> list[Path]:
    """Process the downloaded raw data into a standardized format.

    Args:
        start_date: Start date to filter data (if None, process from oldest available)
        end_date: End date to filter data (if None, process to latest available)
        data_types: Data type(s) to process (defaults to all types)
        input_dir: Directory containing raw data files (defaults to self.raw_dir)
        output_dir: Directory to save processed files (defaults to self.processed_dir)
        file_format: Output file format, either "parquet" or "csv" (defaults to "parquet")
        overwrite: Whether to overwrite existing processed files
        verbose: Whether to print verbose output

    Returns:
        list[Path]: List of paths to successfully processed files
    """
    # Parse and validate parameters
    start_date = parse_datetime(start_date, tz=self.timezone) if start_date else None
    end_date = parse_datetime(end_date, tz=self.timezone) if end_date else None

    # Convert single data type to list
    if isinstance(data_types, SPPProcessedDataType):
        data_types = [data_types]

    # Set defaults
    if data_types is None:
        data_types = SPPProcessedDataType.get_all()
    input_dir = to_path(input_dir) or self.raw_dir
    output_dir = to_path(output_dir) or self.processed_dir
    input_dir = ensure_input_dir(input_dir, "input_dir")
    output_dir = ensure_output_dir(output_dir, "output_dir")

    self._validate_processing_registry()

    # Process each data type using match/case dispatcher
    all_processed_files = []
    stats_by_data_type = defaultdict(lambda: [0, 0, 0])  # [processed, skipped, failed]
    console = Console(record=True)

    for data_type in data_types:
        match data_type:
            case SPPProcessedDataType.NODAL:
                processed_files, stats = self._process_nodal_files(
                    start_date=start_date,
                    end_date=end_date,
                    input_dir=input_dir,
                    output_dir=output_dir,
                    overwrite=overwrite,
                    verbose=verbose,
                    file_format=file_format,
                )
            case SPPProcessedDataType.SYSTEM:
                processed_files, stats = self._process_system_files(
                    start_date=start_date,
                    end_date=end_date,
                    input_dir=input_dir,
                    output_dir=output_dir,
                    overwrite=overwrite,
                    verbose=verbose,
                    file_format=file_format,
                )
            case SPPProcessedDataType.ZONAL:
                processed_files, stats = self._process_zonal_files(
                    start_date=start_date,
                    end_date=end_date,
                    input_dir=input_dir,
                    output_dir=output_dir,
                    overwrite=overwrite,
                    verbose=verbose,
                    file_format=file_format,
                )
            case _:
                raise ValueError(f"Unsupported processed data type: {data_type}")

        # Collect results
        all_processed_files.extend(processed_files)
        stats_by_data_type[data_type] = stats

    # Display summary table
    display_data_transfer_summary(
        stats_by_data_type, SPPProcessedDataRegistry, output_dir, transfer_type="process", console=console
    )
    console.print(f"💾 Files saved to: [bold cyan]{output_dir.absolute()}[/bold cyan]")

    return all_processed_files
download_rt_lmp_rolling
download_rt_lmp_rolling(output_dir: str | Path | None = None, overwrite: bool = False) -> Path | None

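Download today's real-time rolling LMP data by merging 5-minute interval files.

Returns the path of the merged CSV file. Returns None if the output file already exists and overwrite is False, if no interval files are found or downloaded, or if merging fails.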

Source code in src/progridpy/iso/spp/client.py
def download_rt_lmp_rolling(
    self,
    output_dir: str | Path | None = None,
    overwrite: bool = False,
) -> Path | None:
    """Download today's real-time rolling LMP data by merging 5-minute interval files."""
    base_output_dir = to_path(output_dir) or self.raw_dir
    base_output_dir = ensure_output_dir(base_output_dir, "output_dir")
    output_dir = base_output_dir / SPPRawDataRegistry.get_dir_name(SPPRawDataType.REAL_TIME_ROLLING_LMP)
    output_dir = ensure_output_dir(output_dir, "output_dir")

    date = parse_datetime(None, tz=self.timezone)

    suffix = SPPRawDataRegistry.get_file_suffix(SPPRawDataType.REAL_TIME_ROLLING_LMP)
    extension = SPPRawDataRegistry.get_file_extension(SPPRawDataType.REAL_TIME_ROLLING_LMP)
    output_file = output_dir / f"{date.strftime('%Y%m%d')}_{suffix}.{extension}"

    if output_file.exists() and not overwrite:
        return None

    urls = self._construct_url(
        SPPRawDataType.REAL_TIME_PRELIM_LMP,
        date=date,
        resolve_latest_rc=True,
    )
    if not urls:
        logger.warning(f"No RT prelim intervals found for {date.strftime('%Y%m%d')}")
        return None

    with tempfile.TemporaryDirectory() as tmpdirname:
        temp_dir = Path(tmpdirname)
        output_files = [temp_dir / url.split("=")[-1].split("%2F")[-1] for url in urls]

        with FileDownloader(max_connections=32) as downloader:
            file_paths, _, _ = downloader.download_batch(
                urls=urls,
                output_files=output_files,
                max_concurrent_files=32,
                description=f"[{SPPRawDataRegistry.get_title(SPPRawDataType.REAL_TIME_ROLLING_LMP)}]",
                fetch_metadata=False,
            )

        interval_files = [p for p in file_paths if p.exists()]

        if not interval_files:
            logger.warning(f"No interval files downloaded for {date.strftime('%Y%m%d')}")
            return None

        try:
            merged_df = self._merge_rt_lmp_intervals(interval_files)
            if merged_df is not None:
                merged_df = self._canonicalize_raw_df(
                    merged_df,
                    SPPRawDataType.REAL_TIME_ROLLING_LMP,
                    file_name=output_file.name,
                    stage="download",
                )
                data_def = SPPRawDataRegistry.get_definition(SPPRawDataType.REAL_TIME_ROLLING_LMP)
                expected_cols = list(data_def.raw_expected_columns)
                available_cols = [col for col in expected_cols if col in merged_df.columns]
                merged_df.select(available_cols).write_csv(output_file)
                return output_file
            return None
        except Exception as e:
            logger.error(f"Error merging RT rolling LMP for {date.strftime('%Y%m%d')}: {e!s}")
            return None
download_archival_data
download_archival_data(year: int, data_type: SPPRawDataType, output_dir: str | Path, overwrite: bool = False) -> Path

Download a yearly zip archive from SPP's file-browser API.

Args:

  • year: The year to download (e.g. 2023).
  • data_type: The raw data type to download the archive for.
  • output_dir: Base directory where the zip will be saved under spp/{zip_dir}/{year}.zip.
  • overwrite: Whether to overwrite an existing zip file.

Returns: Path to the downloaded zip file.

Raises: ValueError: If the data type is not available as an archive.
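
The archive request and its destination follow a fixed pattern that can be sketched with the standard library. In the source the query string is written literally as `?path=%2F{year}%2F{year}.zip`; the sketch below produces the same encoding with `urllib.parse.quote`. The helper names and the `da-lmp` zip directory are hypothetical, and the base URL is omitted because it is not shown on this page.

```python
from pathlib import PurePosixPath
from urllib.parse import quote


def archive_query(year: int) -> str:
    """Return the percent-encoded query string for one yearly archive."""
    # safe="" makes quote() encode "/" as %2F, matching the literal in the source.
    return "path=" + quote(f"/{year}/{year}.zip", safe="")


def archive_target(output_dir: str, zip_dir: str, year: int) -> PurePosixPath:
    """Return where the zip lands: {output_dir}/spp/{zip_dir}/{year}.zip."""
    return PurePosixPath(output_dir) / "spp" / zip_dir / f"{year}.zip"


print(archive_query(2023))                     # path=%2F2023%2F2023.zip
print(archive_target("data", "da-lmp", 2023))  # data/spp/da-lmp/2023.zip
```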

Source code in src/progridpy/iso/spp/client.py
def download_archival_data(
    self,
    year: int,
    data_type: SPPRawDataType,
    output_dir: str | Path,
    overwrite: bool = False,
) -> Path:
    """Download a yearly zip archive from SPP's file-browser API.

    Args:
        year: The year to download (e.g. 2023).
        data_type: The raw data type to download the archive for.
        output_dir: Base directory where the zip will be saved under spp/{zip_dir}/{year}.zip.
        overwrite: Whether to overwrite an existing zip file.

    Returns:
        Path to the downloaded zip file.

    Raises:
        ValueError: If the data type is not available as an archive.
    """
    data_def = SPPRawDataRegistry.get_definition(data_type)
    if not data_def.archive_zip_dir or not data_def.url_endpoint:
        supported_types = [
            t.value
            for t in SPPRawDataRegistry.list_available_types()
            if (
                SPPRawDataRegistry.get_definition(t).archive_zip_dir
                and SPPRawDataRegistry.get_definition(t).url_endpoint
            )
        ]
        raise ValueError(
            f"{data_type.value} is not available as a yearly archive. Supported types: {', '.join(supported_types)}"
        )

    url = f"{FILE_BROWSER_API_DOWNLOAD_URL}/{data_def.url_endpoint}?path=%2F{year}%2F{year}.zip"

    output_dir = ensure_output_dir(to_path(output_dir), "output_dir")
    output_file = output_dir / "spp" / data_def.archive_zip_dir / f"{year}.zip"
    output_file.parent.mkdir(parents=True, exist_ok=True)

    if output_file.exists() and not overwrite:
        logger.info(f"Skipping existing archive: {output_file}")
        return output_file

    with FileDownloader() as downloader:
        downloader.download(
            url=url,
            output_file=output_file,
            description=f"[{SPPRawDataRegistry.get_title(data_type)} {year}]",
        )

    return output_file
extract_archival_data
extract_archival_data(input_dir: str | Path, output_dir: str | Path, data_type: SPPRawDataType, overwrite: bool = False) -> list[Path]

Extract and filter archival zip files into the daily file structure.

Reads yearly zip archives from {input_dir}/spp/{zip_dir}/*.zip and writes individual daily CSV files to {output_dir}/{registry_dir_name}/ matching the same output naming as download_raw_data.

Args:

  • input_dir: Base directory containing spp/{zip_dir}/*.zip archives.
  • output_dir: Base directory for extracted daily CSV files.
  • data_type: The raw data type to extract.
  • overwrite: Whether to overwrite existing output files.

Returns: List of paths to written output files.

Raises: ValueError: If the data type is not available as an archive.

Source code in src/progridpy/iso/spp/client.py
def extract_archival_data(
    self,
    input_dir: str | Path,
    output_dir: str | Path,
    data_type: SPPRawDataType,
    overwrite: bool = False,
) -> list[Path]:
    """Extract and filter archival zip files into the daily file structure.

    Reads yearly zip archives from {input_dir}/spp/{zip_dir}/*.zip and writes
    individual daily CSV files to {output_dir}/{registry_dir_name}/ matching the
    same output naming as download_raw_data.

    Args:
        input_dir: Base directory containing spp/{zip_dir}/*.zip archives.
        output_dir: Base directory for extracted daily CSV files.
        data_type: The raw data type to extract.
        overwrite: Whether to overwrite existing output files.

    Returns:
        List of paths to written output files.

    Raises:
        ValueError: If the data type is not available as an archive.
    """
    data_def = SPPRawDataRegistry.get_definition(data_type)
    if not data_def.archive_zip_dir:
        supported_types = [
            t.value
            for t in SPPRawDataRegistry.list_available_types()
            if SPPRawDataRegistry.get_definition(t).archive_zip_dir
        ]
        raise ValueError(
            f"{data_type.value} is not available as a yearly archive. Supported types: {', '.join(supported_types)}"
        )

    input_dir = ensure_input_dir(to_path(input_dir), "input_dir")
    output_dir = ensure_output_dir(to_path(output_dir), "output_dir")

    zip_base = input_dir / "spp" / data_def.archive_zip_dir
    zip_files = sorted(zip_base.glob("*.zip"))
    if not zip_files:
        logger.warning(f"No zip files found under {zip_base}")
        return []

    out_base = output_dir / data_def.dir_name
    out_base.mkdir(parents=True, exist_ok=True)

    written_files: list[Path] = []

    for zip_path in zip_files:
        written_files.extend(self._extract_single_archive(zip_path, out_base, data_type, data_def, overwrite))

    return written_files
clear_and_calculate_gain
clear_and_calculate_gain(trade_df: DataFrame, processed_df: DataFrame, min_offer_price: float = -500, max_bid_price: float = 2000) -> DataFrame

Clear trades and calculate financial gains based on market prices.

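da_lmp is the clearing price. Supply offers clear when da_lmp >= offer_price. Demand bids clear when da_lmp <= offer_price (the bid price is read from the same offer_price column). If trade_df has no offer_price column, supply offers are priced at min_offer_price and demand bids at max_bid_price.

For cleared trades, the gain is (da_lmp - rt_lmp_final) x volume for supply and (rt_lmp_final - da_lmp) x volume for demand. Trades that do not clear have a gain of 0.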
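
The clearing rule can be checked by hand on single trades. The following plain-Python sketch mirrors the comparisons in the source below for one row at a time. It is not the library's vectorised implementation, and the prices and volumes are made up.

```python
def clear_and_gain(
    is_supply: bool, offer_price: float, da_lmp: float, rt_lmp: float, volume: float
) -> tuple[bool, float]:
    """Apply the clearing rule to a single trade and return (cleared, gain)."""
    if is_supply:
        cleared = da_lmp >= offer_price       # offer clears at or below the DA price
        gain = (da_lmp - rt_lmp) * volume     # sell in DA, buy back in RT
    else:
        cleared = da_lmp <= offer_price       # bid clears at or above the DA price
        gain = (rt_lmp - da_lmp) * volume     # buy in DA, sell back in RT
    return cleared, (gain if cleared else 0.0)


print(clear_and_gain(True, 20.0, 25.0, 22.0, 2.0))   # (True, 6.0)
print(clear_and_gain(False, 20.0, 25.0, 30.0, 2.0))  # (False, 0.0)
print(clear_and_gain(False, 30.0, 25.0, 30.0, 2.0))  # (True, 10.0)
```

The second trade shows that an uncleared bid earns nothing even though the DA/RT spread would have been favourable.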

Source code in src/progridpy/iso/spp/client.py
def clear_and_calculate_gain(
    self,
    trade_df: pl.DataFrame,
    processed_df: pl.DataFrame,
    min_offer_price: float = -500,
    max_bid_price: float = 2000,
) -> pl.DataFrame:
    """Clear trades and calculate financial gains based on market prices.

    da_lmp is the clearing price.
    Supply offers clear when da_lmp >= offer_price.
    Demand bids clear when da_lmp <= offer_price.
    """
    merged = trade_df.join(processed_df, on=["interval_start_utc", "node"], how="inner", suffix="_proc")

    if "interval_start_local_proc" in merged.columns:
        merged = merged.drop("interval_start_local_proc")

    if "offer_price" not in merged.columns:
        merged = merged.with_columns(
            pl.when(pl.col("is_supply"))
            .then(pl.lit(min_offer_price))
            .otherwise(pl.lit(max_bid_price))
            .alias("offer_price")
        )

    # Determine clearing: da_lmp acts as clearing price
    supply_clears = pl.col("is_supply") & (pl.col("da_lmp") >= pl.col("offer_price"))
    demand_clears = (~pl.col("is_supply")) & (pl.col("da_lmp") <= pl.col("offer_price"))
    merged = merged.with_columns((supply_clears | demand_clears).alias("cleared"))

    # Gain only for cleared trades
    supply_gain = (pl.col("da_lmp") - pl.col("rt_lmp_final")) * pl.col("volume")
    demand_gain = (pl.col("rt_lmp_final") - pl.col("da_lmp")) * pl.col("volume")

    merged = merged.with_columns(
        pl.when(pl.col("cleared") & pl.col("is_supply"))
        .then(supply_gain)
        .when(pl.col("cleared") & (~pl.col("is_supply")))
        .then(demand_gain)
        .otherwise(pl.lit(0.0))
        .alias("gain"),
    )

    return merged.select(
        "interval_start_utc",
        "interval_start_local",
        "node",
        "is_supply",
        "da_lmp",
        "rt_lmp_final",
        "offer_price",
        "volume",
        "cleared",
        "gain",
    )
process_trade
process_trade(trade_dir: str | Path, processed_dir: str | Path, start_date: str | datetime | None = None, end_date: str | datetime | None = None) -> DataFrame

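Process trade files and calculate gains based on DA/RT spread.

trade_dir and processed_dir may each be a single file or a directory. A trade directory is scanned for SPP-*.csv files; a processed directory is scanned recursively for nodal data.parquet files. Trade dates with no matching processed data are skipped with a warning, and a ValueError is raised if no trade files, no processed files, or no matching dates are found.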

Source code in src/progridpy/iso/spp/client.py
def process_trade(
    self,
    trade_dir: str | Path,
    processed_dir: str | Path,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
) -> pl.DataFrame:
    """Process trade files and calculate gains based on DA/RT spread."""
    trade_path = to_path(trade_dir)
    processed_pat = to_path(processed_dir)
    if trade_path is None or processed_pat is None:
        raise ValueError("trade_dir and processed_dir must be provided.")
    if trade_path.is_file():
        ensure_input_file(trade_path, "trade_dir")
    else:
        ensure_input_dir(trade_path, "trade_dir")
    if processed_pat.is_file():
        ensure_input_file(processed_pat, "processed_dir")
    else:
        ensure_input_dir(processed_pat, "processed_dir")

    if start_date is not None:
        start_date = parse_datetime(start_date, tz=self.timezone)
    if end_date is not None:
        end_date = parse_datetime(end_date, tz=self.timezone)

    trade_files_by_date: dict[datetime, pl.DataFrame] = {}

    if trade_path.is_file():
        trade_files_by_date = self._parse_trade_file(trade_path, start_date, end_date)
    else:
        for file_path in sorted(trade_path.glob("SPP-*.csv")):
            parts = file_path.stem.split("-")
            date_str = parts[1]
            file_date = parse_datetime(date_str, tz=self.timezone)

            if start_date and file_date < start_date:
                continue
            if end_date and file_date > end_date:
                continue

            trade_files_by_date[file_date] = self._build_trade_df(pl.read_csv(file_path), date_str)

    # Load processed nodal parquet files
    processed_files_by_date: dict[datetime, pl.DataFrame] = {}

    if processed_pat.is_file():
        file_date = extract_date_from_hive_path(str(processed_pat))
        if file_date is None:
            file_date = parse_datetime(processed_pat.stem, tz=self.timezone)
        else:
            file_date = file_date.replace(tzinfo=self.timezone)
        if (start_date is None or file_date >= start_date) and (end_date is None or file_date <= end_date):
            processed_files_by_date[file_date] = pl.read_parquet(processed_pat)
    else:
        parquet_files = sorted(processed_pat.rglob("data.parquet"))
        nodal_files = [fp for fp in parquet_files if "dataset=nodal" in str(fp)]

        for file_path in nodal_files:
            file_date = extract_date_from_hive_path(str(file_path))
            if file_date is None:
                continue
            file_date = file_date.replace(tzinfo=self.timezone)
            if start_date and file_date < start_date:
                continue
            if end_date and file_date > end_date:
                continue
            processed_files_by_date[file_date] = pl.read_parquet(file_path)

    if not trade_files_by_date:
        raise ValueError(f"No trade files found in {trade_path} for the specified date range")
    if not processed_files_by_date:
        raise ValueError(f"No processed files found in {processed_pat} for the specified date range")

    all_results: list[pl.DataFrame] = []

    for date in sorted(trade_files_by_date.keys()):
        if date not in processed_files_by_date:
            logger.warning("No processed data found for %s, skipping", date.strftime("%Y%m%d"))
            continue
        result_df = self.clear_and_calculate_gain(trade_files_by_date[date], processed_files_by_date[date])
        all_results.append(result_df)

    if not all_results:
        raise ValueError("No matching dates found between trade and processed data")

    return pl.concat(all_results)

SPPRawDataRegistry

Bases: DataRegistry[SPPRawDataType, SPPDataDefinition]

Registry for SPP raw data types and their definitions.

SPPProcessedDataType

Bases: StrEnum

Defines the granularity of the processed dataset.