Skip to content

ERCOT

Electric Reliability Council of Texas implementation. ERCOT's source files are local-time-based and use a DSTFlag to disambiguate repeated hours during daylight saving transitions.

Key Characteristics

  • Timezone model: Source files provide local delivery date/hour with a DSTFlag. The client resolves the local instant, converts to interval_start_utc, then derives interval_start_local from UTC. The final local timestamp preserves the intended wall-clock hour while the UTC key makes repeated hours unambiguous.
  • Raw data types: DA Settlement Point Prices (hourly), RT Settlement Point Prices (15-min), Ancillary Service Prices, Wind/Solar production (hourly), Load forecast/actual by weather zone.
  • Processed granularity: NODAL, SYSTEM, WEATHER_ZONE, LOAD_ZONE.
  • Settlement point types: ERCOT defines multiple settlement point categories via the SettlementPointType enum -- hubs (HU, SH, AH), load zones (LZ, LZEW, LZ_DC, LZ_DCEW), and resource nodes (RN, LCCRN, PCCRN, PUN).

Processing Bindings

ERCOT uses five join modes:

Join Mode Semantics
NODE_LEFT Left join on node-level keys
TIME_LEFT Left join on time columns, broadcast to all nodes
TIME_FULL Full outer join on time
WEATHER_ZONE_FULL Full outer join including the weather zone dimension
LOAD_ZONE_FULL Full outer join including the load zone dimension

Classes

ERCOT

ERCOT(verbose: bool = False)

Bases: ISOBase[ERCOTRawDataType, ERCOTProcessedDataType]

Source code in src/progridpy/iso/ercot/client.py
def __init__(self, verbose: bool = False) -> None:
    super().__init__(iso=ISO.ERCOT, home_url="https://www.ercot.com/", timezone="America/Chicago")
    self.verbose = verbose
    self.raw_dir = Path("data/ercot/raw")
    self.processed_dir = Path("data/ercot/processed")
    self._ercot_api: ErcotAPI | None = None

Functions

clear_and_calculate_gain
clear_and_calculate_gain(trade_df: DataFrame, processed_df: DataFrame, min_offer_price: float = -500, max_bid_price: float = 2000) -> DataFrame

Clear trades and calculate financial gains based on market prices.

da_spp is the clearing price. No separate clearing column needed. Supply offers clear when da_spp >= offer_price. Demand bids clear when da_spp <= offer_price.

Source code in src/progridpy/iso/ercot/client.py
def clear_and_calculate_gain(
    self,
    trade_df: pl.DataFrame,
    processed_df: pl.DataFrame,
    min_offer_price: float = -500,
    max_bid_price: float = 2000,
) -> pl.DataFrame:
    """Clear trades and calculate financial gains based on market prices.

    da_spp is the clearing price. No separate clearing column needed.
    Supply offers clear when da_spp >= offer_price.
    Demand bids clear when da_spp <= offer_price.
    """
    merged = trade_df.join(processed_df, on=["interval_start_utc", "node"], how="inner", suffix="_proc")

    if "interval_start_local_proc" in merged.columns:
        merged = merged.drop("interval_start_local_proc")

    # Default offer price when not present
    if "offer_price" not in merged.columns:
        merged = merged.with_columns(
            pl.when(pl.col("is_supply"))
            .then(pl.lit(min_offer_price))
            .otherwise(pl.lit(max_bid_price))
            .alias("offer_price")
        )

    # Determine clearing: da_spp acts as clearing price
    supply_clears = pl.col("is_supply") & (pl.col("da_spp") >= pl.col("offer_price"))
    demand_clears = (~pl.col("is_supply")) & (pl.col("da_spp") <= pl.col("offer_price"))
    merged = merged.with_columns((supply_clears | demand_clears).alias("cleared"))

    # Gain only for cleared trades
    supply_gain = (pl.col("da_spp") - pl.col("rt_spp")) * pl.col("volume")
    demand_gain = (pl.col("rt_spp") - pl.col("da_spp")) * pl.col("volume")

    merged = merged.with_columns(
        pl.when(pl.col("cleared") & pl.col("is_supply"))
        .then(supply_gain)
        .when(pl.col("cleared") & (~pl.col("is_supply")))
        .then(demand_gain)
        .otherwise(pl.lit(0.0))
        .alias("gain"),
    )

    return merged.select(
        "interval_start_utc",
        "interval_start_local",
        "node",
        "is_supply",
        "da_spp",
        "rt_spp",
        "offer_price",
        "volume",
        "cleared",
        "gain",
    )
process_trade
process_trade(trade_dir: str | Path, processed_dir: str | Path, start_date: str | datetime | None = None, end_date: str | datetime | None = None) -> DataFrame

Process trade files and calculate gains based on DA/RT spread.

Source code in src/progridpy/iso/ercot/client.py
def process_trade(
    self,
    trade_dir: str | Path,
    processed_dir: str | Path,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
) -> pl.DataFrame:
    """Process trade files and calculate gains based on DA/RT spread."""
    trade_path = to_path(trade_dir)
    processed_pat = to_path(processed_dir)
    if trade_path is None or processed_pat is None:
        raise ValueError("trade_dir and processed_dir must be provided.")
    if trade_path.is_file():
        ensure_input_file(trade_path, "trade_dir")
    else:
        ensure_input_dir(trade_path, "trade_dir")
    if processed_pat.is_file():
        ensure_input_file(processed_pat, "processed_dir")
    else:
        ensure_input_dir(processed_pat, "processed_dir")

    if start_date is not None:
        start_date = parse_datetime(start_date, tz=self.timezone)
    if end_date is not None:
        end_date = parse_datetime(end_date, tz=self.timezone)

    trade_files_by_date: dict[datetime, pl.DataFrame] = {}

    if trade_path.is_file():
        trade_files_by_date = self._parse_trade_file(trade_path, start_date, end_date)
    else:
        for file_path in sorted(trade_path.glob("ERCOT-*.csv")):
            parts = file_path.stem.split("-")
            date_str = parts[1]
            file_date = parse_datetime(date_str, tz=self.timezone)

            if start_date and file_date < start_date:
                continue
            if end_date and file_date > end_date:
                continue

            trade_files_by_date[file_date] = self._build_trade_df(pl.read_csv(file_path), date_str)

    # Load processed nodal parquet files
    processed_files_by_date: dict[datetime, pl.DataFrame] = {}

    if processed_pat.is_file():
        file_date = extract_date_from_hive_path(str(processed_pat))
        if file_date is None:
            file_date = parse_datetime(processed_pat.stem, tz=self.timezone)
        else:
            file_date = file_date.replace(tzinfo=self.timezone)
        if (start_date is None or file_date >= start_date) and (end_date is None or file_date <= end_date):
            processed_files_by_date[file_date] = pl.read_parquet(processed_pat)
    else:
        parquet_files = sorted(processed_pat.rglob("data.parquet"))
        nodal_files = [fp for fp in parquet_files if "dataset=nodal" in str(fp)]

        for file_path in nodal_files:
            file_date = extract_date_from_hive_path(str(file_path))
            if file_date is None:
                continue
            file_date = file_date.replace(tzinfo=self.timezone)
            if start_date and file_date < start_date:
                continue
            if end_date and file_date > end_date:
                continue
            processed_files_by_date[file_date] = pl.read_parquet(file_path)

    if not trade_files_by_date:
        raise ValueError(f"No trade files found in {trade_path} for the specified date range")
    if not processed_files_by_date:
        raise ValueError(f"No processed files found in {processed_pat} for the specified date range")

    all_results: list[pl.DataFrame] = []

    for date in sorted(trade_files_by_date.keys()):
        if date not in processed_files_by_date:
            logger.warning("No processed data found for %s, skipping", date.strftime("%Y%m%d"))
            continue
        result_df = self.clear_and_calculate_gain(trade_files_by_date[date], processed_files_by_date[date])
        all_results.append(result_df)

    if not all_results:
        raise ValueError("No matching dates found between trade and processed data")

    return pl.concat(all_results)