Skip to content

MISO

Midcontinent Independent System Operator implementation. MISO uses a fixed-EST market-time model distinct from SPP's UTC-canonical approach.

Key Characteristics

  • Timezone model: MISO operates on fixed EST market time with a constant 24 trading hours per day. Processed datasets are keyed on interval_start_local only (no separate UTC column).
  • Raw data types: DA ExAnte/ExPost LMP, RT Preliminary/Final LMP, RT Rolling LMP (API-based), Dead Nodes, Load (zonal and regional), Wind/Solar forecasts, Cleared Bids, and Enverus renewable forecast variants.
  • Processed granularity: NODAL (per-CPNODE LMPs + broadcast load), SYSTEM (system-wide load, 24 rows/day), REGIONAL (wind/solar by MISO region).
  • Dead nodes: Node-hours reported as dead must have their P&L voided. Dead-node files are published at 10:20 AM EPT for OD-2.
  • Market window: The Day-Ahead market operates from 6:00 AM to 10:30 AM Eastern Prevailing Time, unchanged by daylight saving.

Processing Bindings

MISO uses four join modes, including a dead-node-aware variant:

| Join Mode | Semantics |
| --- | --- |
| NODE_LEFT | Left join on (interval_start_local, node) |
| NODE_LEFT_DEAD | Same as NODE_LEFT with an additional dead_node flag column |
| TIME_LEFT | Left join on interval_start_local, broadcast to all nodes |
| TIME_FULL | Full outer join on interval_start_local |

Classes

MISO

MISO()

Bases: ISOBase[MISORawDataType, MISOProcessedDataType]

Source code in src/progridpy/iso/miso/client.py
def __init__(self) -> None:
    """Set up the MISO client: base-class identity, default data folders, Enverus client."""
    super().__init__(
        iso=ISO.MISO,
        home_url=HOME_URL,
        timezone="EST",
    )

    # Default storage locations, given as relative paths.
    self.raw_dir = Path("data/miso/raw")
    self.processed_dir = Path("data/miso/processed")

    # NOTE(review): presumably used to fetch the Enverus renewable forecast
    # variants -- confirm against the download code.
    self.enverus_client = EnverusClient()

Functions

clear_and_calculate_gain
clear_and_calculate_gain(trade_df: DataFrame, processed_df: DataFrame, min_offer_price: float = -500, max_bid_price: float = 2000) -> DataFrame

Clear trades and calculate financial gains based on market prices.

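Supply offers clear when clearing_lmp >= offer_price. Demand bids clear when clearing_lmp <= offer_price (for demand rows, offer_price holds the bid price). If trade_df has no offer_price column, supply rows default to min_offer_price and demand rows to max_bid_price. Dead nodes are excluded from clearing.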

Source code in src/progridpy/iso/miso/client.py
def clear_and_calculate_gain(
    self,
    trade_df: pl.DataFrame,
    processed_df: pl.DataFrame,
    min_offer_price: float = -500,
    max_bid_price: float = 2000,
) -> pl.DataFrame:
    """Match trades to market prices, decide which ones clear, and compute their gain.

    A supply row clears when ``clearing_lmp >= offer_price``; a demand row
    clears when ``clearing_lmp <= offer_price``. Rows flagged ``dead_node``
    never clear. Rows that do not clear get a gain of ``0.0``.

    Args:
        trade_df: Trades with ``interval_start_local``, ``node``, ``is_supply``
            and ``volume`` columns, and optionally ``offer_price``.
        processed_df: Market data with ``interval_start_local``, ``node``,
            ``dead_node``, ``clearing_lmp``, ``da_lmp``, ``rt_lmp_prelim`` and
            ``rt_lmp_final`` columns.
        min_offer_price: Offer price assigned to supply rows when the joined
            frame has no ``offer_price`` column.
        max_bid_price: Price assigned to demand rows in that same situation.

    Returns:
        One row per trade that matched a processed row, with the columns
        listed in the final ``select``.
    """
    is_supply = pl.col("is_supply")
    lmp = pl.col("clearing_lmp")
    offer = pl.col("offer_price")

    # Coerce the dead-node flag to a proper boolean; values that cannot be cast
    # and missing values both end up as "not dead".
    dead_flag = pl.col("dead_node").cast(pl.Boolean, strict=False).fill_null(False)

    # Inner join: trades without a processed row for the same interval/node are dropped.
    joined = trade_df.join(
        processed_df.with_columns(dead_flag),
        on=["interval_start_local", "node"],
        how="inner",
    )

    # Without explicit prices, fall back to the supplied default offer/bid prices.
    if "offer_price" not in joined.columns:
        fallback_price = pl.when(is_supply).then(pl.lit(min_offer_price)).otherwise(pl.lit(max_bid_price))
        joined = joined.with_columns(fallback_price.alias("offer_price"))

    tradable = ~pl.col("dead_node")
    offer_hit = tradable & is_supply & (lmp >= offer)
    bid_hit = tradable & (~is_supply) & (lmp <= offer)

    # Real-time settlement price: final LMP when available, otherwise preliminary.
    realtime = pl.coalesce(
        pl.col("rt_lmp_final").cast(pl.Float64, strict=False),
        pl.col("rt_lmp_prelim").cast(pl.Float64, strict=False),
    )

    # The cleared flag and the RT price are independent of each other, so they
    # are derived in a single pass.
    joined = joined.with_columns(
        (offer_hit | bid_hit).alias("cleared"),
        realtime.alias("rt_price"),
    )

    # Supply earns DA minus RT, demand earns RT minus DA, each scaled by volume.
    cleared = pl.col("cleared")
    gain = (
        pl.when(cleared & is_supply)
        .then((pl.col("da_lmp") - pl.col("rt_price")) * pl.col("volume"))
        .when(cleared & (~is_supply))
        .then((pl.col("rt_price") - pl.col("da_lmp")) * pl.col("volume"))
        .otherwise(pl.lit(0.0))
    )
    joined = joined.with_columns(gain.alias("gain"))

    output_columns = [
        "interval_start_local",
        "node",
        "dead_node",
        "is_supply",
        "clearing_lmp",
        "da_lmp",
        "rt_lmp_prelim",
        "rt_lmp_final",
        "offer_price",
        "volume",
        "cleared",
        "gain",
    ]
    return joined.select(output_columns)
process_trade
process_trade(trade_dir: str | Path, processed_dir: str | Path, start_date: str | datetime | None = None, end_date: str | datetime | None = None) -> DataFrame

Process trade files and calculate gains based on market clearing.

Source code in src/progridpy/iso/miso/client.py
def process_trade(
    self,
    trade_dir: str | Path,
    processed_dir: str | Path,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
) -> pl.DataFrame:
    """Load trades and processed market data per day, clear them, and return the gains.

    Args:
        trade_dir: A single trade file (delegated to ``_parse_trade_file``) or a
            directory scanned for ``MISO-*.csv`` files. For directory input the
            date is the second ``-``-separated token of the file stem, and each
            CSV needs ``type``, ``he``, ``mwh``, ``price`` and ``node`` columns
            (header case is ignored).
        processed_dir: A single parquet file, or a directory searched
            recursively for ``data.parquet`` files whose path contains
            ``dataset=nodal``.
        start_date: Optional inclusive lower bound on the file date.
        end_date: Optional inclusive upper bound on the file date.

    Returns:
        The per-day results of ``clear_and_calculate_gain`` concatenated in
        ascending date order.

    Raises:
        ValueError: If either path is missing, an ``he`` value is outside
            1-24, no trade or processed files fall in the date range, or no
            date is present in both inputs.
    """
    trade_dir = to_path(trade_dir)
    processed_dir = to_path(processed_dir)
    if trade_dir is None or processed_dir is None:
        raise ValueError("trade_dir and processed_dir must be provided.")

    # Each input may be a single file or a directory; validate accordingly.
    for location, label in ((trade_dir, "trade_dir"), (processed_dir, "processed_dir")):
        if location.is_file():
            ensure_input_file(location, label)
        else:
            ensure_input_dir(location, label)

    if start_date is not None:
        start_date = parse_datetime(start_date, tz=self.timezone)
    if end_date is not None:
        end_date = parse_datetime(end_date, tz=self.timezone)

    # ---- Trades -----------------------------------------------------------
    trades_by_day: dict[datetime, pl.DataFrame] = {}

    if trade_dir.is_file():
        trades_by_day = self._parse_trade_file(trade_dir, start_date, end_date)
    else:
        for csv_path in sorted(trade_dir.glob("MISO-*.csv")):
            # File stems look like "MISO-<yyyymmdd>[-...]"; the date is the second token.
            # NOTE(review): two files sharing a date token overwrite each other
            # below -- confirm one file per day is guaranteed.
            day_token = csv_path.stem.split("-")[1]
            day = parse_datetime(day_token, tz=self.timezone)

            if (start_date and day < start_date) or (end_date and day > end_date):
                continue

            trades = pl.read_csv(csv_path)
            trades = trades.rename({name: name.lower() for name in trades.columns})
            trades = trades.with_columns(pl.col("type").str.to_lowercase())

            if not trades["he"].is_between(1, 24).all():
                invalid = trades.filter(~pl.col("he").is_between(1, 24))["he"].unique().to_list()
                raise ValueError(f"HE values must be between 1 and 24, found: {invalid}")

            # Hour-ending N starts N-1 hours after midnight of the file date.
            midnight = pl.lit(day_token).str.strptime(pl.Date, "%Y%m%d").cast(pl.Datetime("us"))
            hour_offset = pl.duration(hours=pl.col("he").cast(pl.Int32) - 1)

            trades_by_day[day] = (
                trades.with_columns(
                    (midnight + hour_offset).alias("interval_start_local"),
                    (pl.col("type") == "offer").alias("is_supply"),
                )
                .rename({"mwh": "volume", "price": "offer_price"})
                .select("interval_start_local", "node", "volume", "is_supply", "offer_price")
            )

    # ---- Processed market data --------------------------------------------
    processed_by_day: dict[datetime, pl.DataFrame] = {}

    if processed_dir.is_file():
        day = extract_date_from_hive_path(str(processed_dir))
        if day is None:
            # No date in the path: fall back to parsing the file stem.
            day = parse_datetime(processed_dir.stem, tz=self.timezone)
        else:
            # NOTE(review): assumes self.timezone is a tzinfo object here -- confirm.
            day = day.replace(tzinfo=self.timezone)
        if (start_date is None or day >= start_date) and (end_date is None or day <= end_date):
            processed_by_day[day] = pl.read_parquet(processed_dir)
    else:
        for parquet_path in sorted(processed_dir.rglob("data.parquet")):
            # Only the nodal dataset is used for clearing.
            if "dataset=nodal" not in str(parquet_path):
                continue
            day = extract_date_from_hive_path(str(parquet_path))
            if day is None:
                continue
            day = day.replace(tzinfo=self.timezone)
            if (start_date and day < start_date) or (end_date and day > end_date):
                continue
            processed_by_day[day] = pl.read_parquet(parquet_path)

    if not trades_by_day:
        raise ValueError(f"No trade files found in {trade_dir} for the specified date range")
    if not processed_by_day:
        raise ValueError(f"No processed files found in {processed_dir} for the specified date range")

    # ---- Clear each day that has both inputs -------------------------------
    daily_results: list[pl.DataFrame] = []
    for day in sorted(trades_by_day):
        if day in processed_by_day:
            daily_results.append(self.clear_and_calculate_gain(trades_by_day[day], processed_by_day[day]))
        else:
            logger.warning(f"No processed data found for {day.strftime('%Y%m%d')}, skipping")

    if not daily_results:
        raise ValueError("No matching dates found between trade and processed data")

    return pl.concat(daily_results)

MISOProcessedDataRegistry

Bases: DataRegistry[MISOProcessedDataType, MISODataDefinition]

Registry for MISO processed data types and their definitions.

MISORawDataRegistry

Bases: DataRegistry[MISORawDataType, MISODataDefinition]

Registry for MISO raw data types and their definitions.