Skip to content

AWS Module

The progridpy.aws package provides S3 transfer infrastructure for uploading and downloading both raw and processed ISO data.

Submodules

Module Contents
s3 — provides S3Handler (concurrent transfers), S3TransferConfig, S3ObjectRef, and the extract_date_from_hive_path helper.

Usage Pattern

S3Handler is used as a context manager to manage the boto3 session lifecycle:

from progridpy.aws import S3Handler, S3TransferConfig

config = S3TransferConfig(max_concurrency=20, region="us-west-2")
with S3Handler(config=config) as s3:
    s3.upload_objects(bucket, refs)
    s3.download_objects(bucket, refs)

Classes

S3Handler

S3Handler(config: S3TransferConfig | None = None, verbose: bool = False)
Source code in src/progridpy/aws/s3.py
def __init__(self, config: S3TransferConfig | None = None, verbose: bool = False) -> None:
    """Set up the handler without touching AWS.

    Args:
        config: Transfer tuning options; a default S3TransferConfig is
            used when none is supplied.
        verbose: Emit extra diagnostic output when True.
    """
    self.verbose = verbose
    self.config = config or S3TransferConfig()
    # Populated lazily — presumably when the context manager is entered —
    # so constructing the handler performs no network I/O. TODO(review): confirm.
    self.session: boto3.Session | None = None
    self.s3_client = None
    self._transfer_config = None

Functions

download_objects
download_objects(bucket: str, objects: list[S3ObjectRef], overwrite: bool = False, description: str | None = None) -> tuple[list[Path], list[Path], list[Path]]

Download S3 objects to local paths.

Returns: (downloaded, skipped, failed) paths.

Source code in src/progridpy/aws/s3.py
def download_objects(
    self,
    bucket: str,
    objects: list[S3ObjectRef],
    overwrite: bool = False,
    description: str | None = None,
) -> tuple[list[Path], list[Path], list[Path]]:
    """Fetch the given S3 objects onto the local filesystem.

    Objects whose local path already exists are skipped unless
    ``overwrite`` is set. Transfers run concurrently under a shared
    progress bar.

    Args:
        bucket: Source S3 bucket name.
        objects: References pairing S3 keys with local destination paths.
        overwrite: Re-download even when the local file already exists.
        description: Progress-bar label; defaults to "Downloading".

    Returns:
        (downloaded, skipped, failed) lists of local paths.
    """
    if not objects:
        return [], [], []

    downloaded: list[Path] = []
    skipped: list[Path] = []
    failed: list[Path] = []

    # Split into files we already have locally vs. ones still needed.
    pending: list[S3ObjectRef] = []
    for ref in objects:
        if overwrite or not ref.local_path.exists():
            pending.append(ref)
        else:
            skipped.append(ref.local_path)

    if not pending:
        return downloaded, skipped, failed

    # Concurrent HEAD requests supply the byte total for the progress bar.
    sizes = self._get_object_sizes(bucket, pending)
    total = sum(sizes.get(ref.key, 0) for ref in pending)

    bar = self._create_progress_bar()
    bar_lock = threading.Lock()

    with bar:
        task_id = bar.add_task(description or "Downloading", total=total)

        with ThreadPoolExecutor(max_workers=self.config.max_concurrency) as pool:
            in_flight: dict = {}
            for ref in pending:
                # Ensure the destination directory exists before a worker
                # thread tries to write into it.
                ref.local_path.parent.mkdir(parents=True, exist_ok=True)
                fut = pool.submit(self._download_one, bucket, ref, task_id, bar, bar_lock)
                in_flight[fut] = ref

            for fut in as_completed(in_flight):
                ref = in_flight[fut]
                try:
                    result = fut.result()
                except Exception:
                    failed.append(ref.local_path)
                else:
                    # A None result signals a handled per-object failure.
                    if result is None:
                        failed.append(ref.local_path)
                    else:
                        downloaded.append(result)

    return downloaded, skipped, failed
upload_objects
upload_objects(bucket: str, objects: list[S3ObjectRef], overwrite: bool = False, description: str | None = None) -> tuple[list[str], list[str], list[Path]]

Upload local files to S3.

Returns: (uploaded_keys, skipped_keys, failed_local_paths).

Source code in src/progridpy/aws/s3.py
def upload_objects(
    self,
    bucket: str,
    objects: list[S3ObjectRef],
    overwrite: bool = False,
    description: str | None = None,
) -> tuple[list[str], list[str], list[Path]]:
    """Send local files to S3, skipping keys already present in the bucket.

    Unless ``overwrite`` is set, existing keys are looked up once up front
    and skipped. Uploads run concurrently under a shared progress bar;
    references whose local file is missing are reported as failures.

    Args:
        bucket: Destination S3 bucket name.
        objects: References pairing local files with target S3 keys.
        overwrite: Upload even when the key already exists in the bucket.
        description: Progress-bar label; defaults to "Uploading".

    Returns:
        (uploaded_keys, skipped_keys, failed_local_paths).
    """
    if not objects:
        return [], [], []

    uploaded: list[str] = []
    skipped: list[str] = []
    failed: list[Path] = []

    # Single bulk existence check; with overwrite nothing counts as existing.
    existing = self._check_existing_keys(bucket, objects) if not overwrite else set()
    pending: list[S3ObjectRef] = []
    for ref in objects:
        if ref.key not in existing:
            pending.append(ref)
        else:
            skipped.append(ref.key)

    if not pending:
        return uploaded, skipped, failed

    # Byte total for the progress bar; missing local files contribute nothing.
    total = sum(ref.local_path.stat().st_size for ref in pending if ref.local_path.exists())

    bar = self._create_progress_bar()
    bar_lock = threading.Lock()

    with bar:
        task_id = bar.add_task(description or "Uploading", total=total)

        with ThreadPoolExecutor(max_workers=self.config.max_concurrency) as pool:
            in_flight: dict = {}
            for ref in pending:
                if not ref.local_path.exists():
                    # Source file vanished (or never existed) locally.
                    failed.append(ref.local_path)
                    continue
                fut = pool.submit(self._upload_one, bucket, ref, task_id, bar, bar_lock)
                in_flight[fut] = ref

            for fut in as_completed(in_flight):
                ref = in_flight[fut]
                try:
                    result = fut.result()
                except Exception:
                    failed.append(ref.local_path)
                else:
                    # A None result signals a handled per-object failure.
                    if result is None:
                        failed.append(ref.local_path)
                    else:
                        uploaded.append(result)

    return uploaded, skipped, failed