AWS Module
The progridpy.aws package provides S3 transfer infrastructure for uploading and downloading both raw and processed ISO data.
Submodules
| Module | Contents |
| ------ | -------- |
| s3 | S3Handler (concurrent transfers), S3TransferConfig, S3ObjectRef, and extract_date_from_hive_path |
Usage Pattern
S3Handler is used as a context manager to manage the boto3 session lifecycle:
from progridpy.aws import S3Handler, S3TransferConfig
config = S3TransferConfig(max_concurrency=20, region="us-west-2")
with S3Handler(config=config) as s3:
s3.upload_objects(bucket, refs)
s3.download_objects(bucket, refs)
Classes
S3Handler
S3Handler(config: S3TransferConfig | None = None, verbose: bool = False)
Source code in src/progridpy/aws/s3.py
| def __init__(self, config: S3TransferConfig | None = None, verbose: bool = False) -> None:
self.config = config or S3TransferConfig()
self.verbose = verbose
self.session: boto3.Session | None = None
self.s3_client = None
self._transfer_config = None
|
Functions
download_objects
download_objects(bucket: str, objects: list[S3ObjectRef], overwrite: bool = False, description: str | None = None) -> tuple[list[Path], list[Path], list[Path]]
Download S3 objects to local paths.
Returns:
(downloaded, skipped, failed) paths.
Source code in src/progridpy/aws/s3.py
| def download_objects(
self,
bucket: str,
objects: list[S3ObjectRef],
overwrite: bool = False,
description: str | None = None,
) -> tuple[list[Path], list[Path], list[Path]]:
"""Download S3 objects to local paths.
Returns:
(downloaded, skipped, failed) paths.
"""
if not objects:
return [], [], []
downloaded: list[Path] = []
skipped: list[Path] = []
failed: list[Path] = []
# Filter out already-existing local files when not overwriting
to_download: list[S3ObjectRef] = []
for obj in objects:
if not overwrite and obj.local_path.exists():
skipped.append(obj.local_path)
else:
to_download.append(obj)
if not to_download:
return downloaded, skipped, failed
# Get sizes via concurrent HEAD for progress bar
sizes = self._get_object_sizes(bucket, to_download)
total_size = sum(sizes.get(obj.key, 0) for obj in to_download)
desc = description or "Downloading"
progress = self._create_progress_bar()
progress_lock = threading.Lock()
with progress:
task_id = progress.add_task(desc, total=total_size)
with ThreadPoolExecutor(max_workers=self.config.max_concurrency) as executor:
futures = {}
for obj in to_download:
obj.local_path.parent.mkdir(parents=True, exist_ok=True)
future = executor.submit(self._download_one, bucket, obj, task_id, progress, progress_lock)
futures[future] = obj
for future in as_completed(futures):
obj = futures[future]
try:
result = future.result()
if result is not None:
downloaded.append(result)
else:
failed.append(obj.local_path)
except Exception:
failed.append(obj.local_path)
return downloaded, skipped, failed
|
upload_objects
upload_objects(bucket: str, objects: list[S3ObjectRef], overwrite: bool = False, description: str | None = None) -> tuple[list[str], list[str], list[Path]]
Upload local files to S3.
Returns:
(uploaded_keys, skipped_keys, failed_local_paths).
Source code in src/progridpy/aws/s3.py
| def upload_objects(
self,
bucket: str,
objects: list[S3ObjectRef],
overwrite: bool = False,
description: str | None = None,
) -> tuple[list[str], list[str], list[Path]]:
"""Upload local files to S3.
Returns:
(uploaded_keys, skipped_keys, failed_local_paths).
"""
if not objects:
return [], [], []
uploaded: list[str] = []
skipped: list[str] = []
failed: list[Path] = []
existing = set() if overwrite else self._check_existing_keys(bucket, objects)
to_upload: list[S3ObjectRef] = []
for obj in objects:
if obj.key in existing:
skipped.append(obj.key)
else:
to_upload.append(obj)
if not to_upload:
return uploaded, skipped, failed
total_size = sum(obj.local_path.stat().st_size for obj in to_upload if obj.local_path.exists())
desc = description or "Uploading"
progress = self._create_progress_bar()
progress_lock = threading.Lock()
with progress:
task_id = progress.add_task(desc, total=total_size)
with ThreadPoolExecutor(max_workers=self.config.max_concurrency) as executor:
futures = {}
for obj in to_upload:
if not obj.local_path.exists():
failed.append(obj.local_path)
continue
future = executor.submit(self._upload_one, bucket, obj, task_id, progress, progress_lock)
futures[future] = obj
for future in as_completed(futures):
obj = futures[future]
try:
result = future.result()
if result is not None:
uploaded.append(result)
else:
failed.append(obj.local_path)
except Exception:
failed.append(obj.local_path)
return uploaded, skipped, failed
|