ERCOT
Electric Reliability Council of Texas implementation. ERCOT's source files are local-time-based and use a DSTFlag to disambiguate repeated hours during daylight saving transitions.
Key Characteristics
- Timezone model: Source files provide local delivery date/hour with a DSTFlag. The client resolves the local instant, converts to interval_start_utc, then derives interval_start_local from UTC. The final local timestamp preserves the intended wall-clock hour while the UTC key makes repeated hours unambiguous.
- Raw data types: DA Settlement Point Prices (hourly), RT Settlement Point Prices (15-min), Ancillary Service Prices, Wind/Solar production (hourly), Load forecast/actual by weather zone.
- Processed granularity: NODAL, SYSTEM, WEATHER_ZONE, LOAD_ZONE.
- Settlement point types: ERCOT defines multiple settlement point categories via the
SettlementPointType enum -- hubs (HU, SH, AH), load zones (LZ, LZEW, LZ_DC, LZ_DCEW), and resource nodes (RN, LCCRN, PCCRN, PUN).
Processing Bindings
ERCOT uses five join modes:
| Join Mode | Semantics |
| --- | --- |
| NODE_LEFT | Left join on node-level keys |
| TIME_LEFT | Left join on time columns, broadcast to all nodes |
| TIME_FULL | Full outer join on time |
| WEATHER_ZONE_FULL | Full outer join including the weather zone dimension |
| LOAD_ZONE_FULL | Full outer join including the load zone dimension |
Classes
ERCOT
ERCOT(verbose: bool = False)
Bases: ISOBase[ERCOTRawDataType, ERCOTProcessedDataType]
Source code in src/progridpy/iso/ercot/client.py
def __init__(self, verbose: bool = False) -> None:
    """Create an ERCOT client using the default local data layout.

    Args:
        verbose: When True, enables extra progress output.
    """
    super().__init__(
        iso=ISO.ERCOT,
        home_url="https://www.ercot.com/",
        timezone="America/Chicago",
    )
    self.verbose = verbose
    # Default on-disk locations for raw downloads and processed output.
    self.raw_dir = Path("data/ercot/raw")
    self.processed_dir = Path("data/ercot/processed")
    # API handle is created lazily; None until first needed.
    self._ercot_api: ErcotAPI | None = None
Functions
clear_and_calculate_gain
clear_and_calculate_gain(trade_df: DataFrame, processed_df: DataFrame, min_offer_price: float = -500, max_bid_price: float = 2000) -> DataFrame
Clear trades and calculate financial gains based on market prices.
da_spp is the clearing price. No separate clearing column needed.
Supply offers clear when da_spp >= offer_price.
Demand bids clear when da_spp <= offer_price.
Source code in src/progridpy/iso/ercot/client.py
def clear_and_calculate_gain(
    self,
    trade_df: pl.DataFrame,
    processed_df: pl.DataFrame,
    min_offer_price: float = -500,
    max_bid_price: float = 2000,
) -> pl.DataFrame:
    """Clear trades and calculate financial gains based on market prices.

    da_spp is the clearing price. No separate clearing column needed.
    Supply offers clear when da_spp >= offer_price.
    Demand bids clear when da_spp <= offer_price.
    """
    df = trade_df.join(
        processed_df,
        on=["interval_start_utc", "node"],
        how="inner",
        suffix="_proc",
    )
    # The trade side already carries its own local timestamp; drop the
    # duplicate the join may have brought in from the processed side.
    if "interval_start_local_proc" in df.columns:
        df = df.drop("interval_start_local_proc")
    # When the trade file carries no offer price, default supply offers to
    # the floor and demand bids to the cap (both then always clear).
    if "offer_price" not in df.columns:
        default_price = (
            pl.when(pl.col("is_supply"))
            .then(pl.lit(min_offer_price))
            .otherwise(pl.lit(max_bid_price))
        )
        df = df.with_columns(default_price.alias("offer_price"))
    is_supply = pl.col("is_supply")
    da_spp = pl.col("da_spp")
    offer = pl.col("offer_price")
    # da_spp acts as the clearing price for both sides of the market.
    cleared_expr = (is_supply & (da_spp >= offer)) | (~is_supply & (da_spp <= offer))
    df = df.with_columns(cleared_expr.alias("cleared"))
    # Gain is the DA/RT spread times volume, signed by trade direction;
    # trades that did not clear earn zero.
    spread = pl.col("da_spp") - pl.col("rt_spp")
    gain_expr = (
        pl.when(pl.col("cleared") & is_supply)
        .then(spread * pl.col("volume"))
        .when(pl.col("cleared") & ~is_supply)
        .then(-spread * pl.col("volume"))
        .otherwise(pl.lit(0.0))
    )
    df = df.with_columns(gain_expr.alias("gain"))
    return df.select(
        "interval_start_utc",
        "interval_start_local",
        "node",
        "is_supply",
        "da_spp",
        "rt_spp",
        "offer_price",
        "volume",
        "cleared",
        "gain",
    )
|
process_trade
process_trade(trade_dir: str | Path, processed_dir: str | Path, start_date: str | datetime | None = None, end_date: str | datetime | None = None) -> DataFrame
Process trade files and calculate gains based on DA/RT spread.
Source code in src/progridpy/iso/ercot/client.py
def process_trade(
    self,
    trade_dir: str | Path,
    processed_dir: str | Path,
    start_date: str | datetime | None = None,
    end_date: str | datetime | None = None,
) -> pl.DataFrame:
    """Process trade files and calculate gains based on DA/RT spread.

    Args:
        trade_dir: A single trade CSV or a directory of ERCOT-*.csv files.
        processed_dir: A single processed parquet file or a hive-style
            directory containing dataset=nodal partitions.
        start_date: Optional inclusive lower bound on the trading date.
        end_date: Optional inclusive upper bound on the trading date.

    Returns:
        Per-date results of clear_and_calculate_gain, concatenated.

    Raises:
        ValueError: If a path is missing, no files fall in the date range,
            or no dates overlap between trade and processed data.
    """
    trades = to_path(trade_dir)
    processed = to_path(processed_dir)
    if trades is None or processed is None:
        raise ValueError("trade_dir and processed_dir must be provided.")
    # Each input may be a single file or a directory; validate accordingly.
    for path, label in ((trades, "trade_dir"), (processed, "processed_dir")):
        if path.is_file():
            ensure_input_file(path, label)
        else:
            ensure_input_dir(path, label)
    if start_date is not None:
        start_date = parse_datetime(start_date, tz=self.timezone)
    if end_date is not None:
        end_date = parse_datetime(end_date, tz=self.timezone)

    def in_range(day: datetime) -> bool:
        # Inclusive on both ends; an unset bound never filters.
        if start_date and day < start_date:
            return False
        return not (end_date and day > end_date)

    # Collect trade frames keyed by trading date.
    trades_by_date: dict[datetime, pl.DataFrame] = {}
    if trades.is_file():
        trades_by_date = self._parse_trade_file(trades, start_date, end_date)
    else:
        for fp in sorted(trades.glob("ERCOT-*.csv")):
            # Filenames look like ERCOT-<date>.csv; the date is the second
            # hyphen-separated token of the stem.
            date_str = fp.stem.split("-")[1]
            day = parse_datetime(date_str, tz=self.timezone)
            if in_range(day):
                trades_by_date[day] = self._build_trade_df(pl.read_csv(fp), date_str)

    # Collect processed nodal parquet frames keyed by date.
    processed_by_date: dict[datetime, pl.DataFrame] = {}
    if processed.is_file():
        day = extract_date_from_hive_path(str(processed))
        if day is None:
            # Fall back to treating the bare filename as a date.
            day = parse_datetime(processed.stem, tz=self.timezone)
        else:
            day = day.replace(tzinfo=self.timezone)
        if in_range(day):
            processed_by_date[day] = pl.read_parquet(processed)
    else:
        for fp in sorted(processed.rglob("data.parquet")):
            # Only nodal-dataset partitions participate in trade clearing.
            if "dataset=nodal" not in str(fp):
                continue
            day = extract_date_from_hive_path(str(fp))
            if day is None:
                continue
            day = day.replace(tzinfo=self.timezone)
            if in_range(day):
                processed_by_date[day] = pl.read_parquet(fp)

    if not trades_by_date:
        raise ValueError(f"No trade files found in {trades} for the specified date range")
    if not processed_by_date:
        raise ValueError(f"No processed files found in {processed} for the specified date range")

    results: list[pl.DataFrame] = []
    for day in sorted(trades_by_date):
        if day not in processed_by_date:
            # A trade date without matching market data is skipped, not fatal.
            logger.warning("No processed data found for %s, skipping", day.strftime("%Y%m%d"))
            continue
        results.append(self.clear_and_calculate_gain(trades_by_date[day], processed_by_date[day]))
    if not results:
        raise ValueError("No matching dates found between trade and processed data")
    return pl.concat(results)
|