Cloud Storage¶
ProgridPy uses S3Handler for all AWS S3 interactions, providing concurrent uploads and downloads with progress bars, automatic retries, and server-side encryption. The handler is designed to be used as a context manager.
S3Handler¶
Basic Usage¶
```python
from progridpy.aws.s3 import S3Handler, S3ObjectRef
from pathlib import Path

with S3Handler() as s3:
    downloaded, skipped, failed = s3.download_objects(
        bucket="my-bucket",
        objects=[
            S3ObjectRef(key="data/file1.parquet", local_path=Path("./file1.parquet")),
            S3ObjectRef(key="data/file2.parquet", local_path=Path("./file2.parquet")),
        ],
    )
```
`S3Handler` must be used as a context manager. The `__enter__` method creates a `boto3.Session` and S3 client; `__exit__` tears them down.
Constructor Parameters¶
| Parameter | Type | Description |
|---|---|---|
| `config` | `S3TransferConfig \| None` | Transfer configuration. Uses defaults if `None`. |
| `verbose` | `bool` | Enable verbose logging for debugging. |
S3TransferConfig¶
Fine-tune transfer behavior with a frozen dataclass:
```python
from progridpy.aws.s3 import S3TransferConfig

config = S3TransferConfig(
    region="us-west-2",                   # AWS region
    max_pool_connections=50,              # HTTP connection pool size
    max_concurrency=20,                   # Parallel transfer threads
    multipart_threshold=8 * 1024 * 1024,  # 8 MB -- switch to multipart above this
    multipart_chunksize=8 * 1024 * 1024,  # 8 MB per part
    retry_attempts=3,                     # Automatic retries on failure
    enable_encryption=True,               # Server-side encryption
    encryption_type="AES256",             # Encryption algorithm
    existence_check_threshold=256,        # HEAD vs LIST threshold for skip checks
)
```
| Field | Default | Description |
|---|---|---|
| `region` | `"us-west-2"` | AWS region for the S3 client |
| `max_pool_connections` | `50` | Maximum HTTP connections in the pool |
| `max_concurrency` | `20` | Maximum parallel transfer threads |
| `multipart_threshold` | 8 MB | File size above which multipart transfer is used |
| `multipart_chunksize` | 8 MB | Size of each multipart chunk |
| `retry_attempts` | `3` | Number of retry attempts with adaptive backoff |
| `enable_encryption` | `True` | Enable server-side encryption on uploads |
| `encryption_type` | `"AES256"` | Server-side encryption algorithm |
| `existence_check_threshold` | `256` | Below this count, use HEAD per object; above, use LIST prefix |
S3ObjectRef¶
A frozen dataclass that pairs an S3 key with a local file path:
```python
from progridpy.aws.s3 import S3ObjectRef
from pathlib import Path

ref = S3ObjectRef(
    key="iso=spp/dataset=nodal/year=2026/month=01/day=15/data.parquet",
    local_path=Path("./processed/year=2026/month=01/day=15/data.parquet"),
)
```
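Because the dataclass is frozen, instances are immutable and hashable, so they can safely be deduplicated in a set or used as dict keys. A rough stand-in (a sketch of the documented shape, not the library's actual definition) behaves like this:

```python
from dataclasses import dataclass, FrozenInstanceError
from pathlib import Path

@dataclass(frozen=True)
class S3ObjectRef:
    """Sketch of the frozen key/local-path pair described above."""
    key: str
    local_path: Path

ref = S3ObjectRef(key="data/file1.parquet", local_path=Path("./file1.parquet"))

try:
    ref.key = "other.parquet"  # attribute assignment is rejected on a frozen dataclass
except FrozenInstanceError:
    pass
```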
Downloading Objects¶
```python
def download_objects(
    self,
    bucket: str,
    objects: list[S3ObjectRef],
    overwrite: bool = False,
    description: str | None = None,
) -> tuple[list[Path], list[Path], list[Path]]:
```
Returns a tuple of (downloaded, skipped, failed) path lists.
```python
from progridpy.aws.s3 import S3Handler, S3ObjectRef
from pathlib import Path

refs = [
    S3ObjectRef(key=f"data/day={d}/data.parquet", local_path=Path(f"./data/day={d}/data.parquet"))
    for d in range(1, 32)
]

with S3Handler() as s3:
    downloaded, skipped, failed = s3.download_objects(
        bucket="progrid-datalake",
        objects=refs,
        overwrite=False,
        description="Downloading January data",
    )

print(f"Downloaded: {len(downloaded)}, Skipped: {len(skipped)}, Failed: {len(failed)}")
```
**Skip behavior:** When `overwrite=False` (the default), files that already exist locally are added to the `skipped` list without making any network requests.
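The local skip check amounts to partitioning on `Path.exists()` before any S3 call. A minimal sketch (the function name is illustrative, not part of the API; the real handler interleaves this with progress reporting):

```python
from pathlib import Path

def split_existing(paths: list[Path]) -> tuple[list[Path], list[Path]]:
    """Partition paths into (to_download, skipped) based on local existence."""
    to_download = [p for p in paths if not p.exists()]
    skipped = [p for p in paths if p.exists()]
    return to_download, skipped
```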
Uploading Objects¶
```python
def upload_objects(
    self,
    bucket: str,
    objects: list[S3ObjectRef],
    overwrite: bool = False,
    description: str | None = None,
) -> tuple[list[str], list[str], list[Path]]:
```
Returns a tuple of (uploaded_keys, skipped_keys, failed_local_paths).
```python
from progridpy.aws.s3 import S3Handler, S3ObjectRef
from pathlib import Path

refs = [
    S3ObjectRef(
        key="iso=miso/dataset=nodal/year=2026/month=01/day=15/data.parquet",
        local_path=Path("./processed/year=2026/month=01/day=15/data.parquet"),
    ),
]

with S3Handler() as s3:
    uploaded, skipped, failed = s3.upload_objects(
        bucket="progrid-datalake",
        objects=refs,
        description="Uploading MISO processed data",
    )
```
When `overwrite=False`, the handler checks whether each key already exists in S3 before uploading. For small batches (under `existence_check_threshold`), it uses individual HEAD requests; for larger batches, it uses LIST prefix queries for efficiency.
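The threshold decision can be sketched as a simple rule (the helper name here is illustrative, not part of the API):

```python
def existence_check_strategy(n_objects: int, threshold: int = 256) -> str:
    """Pick the cheaper way to find which keys already exist in S3.

    One HEAD request per object is cheap for small batches; for large
    batches, listing all keys under a shared prefix amortizes better.
    """
    return "head" if n_objects < threshold else "list"
```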
Uploads use server-side encryption by default (AES256). Disable with:
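```python
from progridpy.aws.s3 import S3Handler, S3TransferConfig

config = S3TransferConfig(enable_encryption=False)

with S3Handler(config=config) as s3:
    s3.upload_objects(bucket="my-bucket", objects=refs)
```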
Concurrent Transfers¶
Both `download_objects()` and `upload_objects()` execute transfers concurrently using a `ThreadPoolExecutor` with `max_concurrency` workers. A rich progress bar displays real-time transfer speed, percentage, and ETA.
To increase throughput for large batch operations:
```python
from progridpy.aws.s3 import S3Handler, S3TransferConfig

config = S3TransferConfig(
    max_pool_connections=100,
    max_concurrency=50,
)

with S3Handler(config=config) as s3:
    s3.download_objects(bucket="my-bucket", objects=large_object_list)
```
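The concurrency pattern described above — a thread pool with per-item success/failure collection — can be sketched in isolation (a hypothetical helper, not the handler's internals):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def transfer_all(items, transfer_one, max_concurrency=20):
    """Run transfer_one over items concurrently; collect successes and failures."""
    done, failed = [], []
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        futures = {pool.submit(transfer_one, item): item for item in items}
        for fut in as_completed(futures):
            item = futures[fut]
            try:
                fut.result()
                done.append(item)
            except Exception:
                failed.append(item)
    return done, failed
```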
Hive Path Date Extraction¶
Extract a datetime from a Hive-partitioned S3 key:
```python
from progridpy.aws.s3 import extract_date_from_hive_path

dt = extract_date_from_hive_path("iso=spp/dataset=nodal/year=2026/month=01/day=15/data.parquet")
# datetime(2026, 1, 15, 0, 0)

dt = extract_date_from_hive_path("some/other/path.csv")
# None -- returns None when no Hive partition pattern is found
```
The function matches the pattern `year=YYYY/month=MM/day=DD` anywhere in the path string.
AWS Credential Configuration¶
S3Handler reads credentials from the standard AWS credential chain. Set credentials via environment variables:
```bash
export AWS_PROFILE=your-profile   # Named profile from ~/.aws/credentials
export AWS_REGION=us-west-2       # Override the default region
export AWS_ACCESS_KEY_ID=...      # Direct credential injection
export AWS_SECRET_ACCESS_KEY=...
```
The handler respects both the `AWS_PROFILE` and `AWS_DEFAULT_PROFILE` environment variables for named-profile selection.