Skip to content

S3 Handler

Concurrent S3 transfer handler built on boto3. Provides thread-pool-based parallel uploads and downloads with progress bars (Rich), multipart support, and server-side encryption.

Key Components

  • S3Handler -- Context-managed class that creates a boto3 session and S3 client on entry. Supports concurrent upload/download of S3ObjectRef lists, key-existence checks with configurable batch thresholds, and verbose logging via Loguru.
  • S3TransferConfig -- Frozen dataclass controlling region, pool size, concurrency, multipart thresholds, retry attempts, and encryption settings.
  • S3ObjectRef -- Frozen dataclass pairing an S3 key with a local Path.
  • extract_date_from_hive_path -- Parses year=YYYY/month=MM/day=DD segments from a hive-partitioned S3 key and returns a datetime.

Classes

S3Handler

S3Handler(config: S3TransferConfig | None = None, verbose: bool = False)
Source code in src/progridpy/aws/s3.py
def __init__(self, config: S3TransferConfig | None = None, verbose: bool = False) -> None:
    """Set up the handler with an optional transfer configuration.

    Args:
        config: Transfer settings; a default S3TransferConfig is built
            when None is given.
        verbose: Enables extra logging output.
    """
    self.config = S3TransferConfig() if config is None else config
    self.verbose = verbose
    # Session/client/transfer-config start unset; presumably created on
    # context-manager entry (per the class description) — confirm in __enter__.
    self.session: boto3.Session | None = None
    self.s3_client = None
    self._transfer_config = None

Functions

download_objects
download_objects(bucket: str, objects: list[S3ObjectRef], overwrite: bool = False, description: str | None = None) -> tuple[list[Path], list[Path], list[Path]]

Download S3 objects to local paths.

Returns: (downloaded, skipped, failed) paths.

Source code in src/progridpy/aws/s3.py
def download_objects(
    self,
    bucket: str,
    objects: list[S3ObjectRef],
    overwrite: bool = False,
    description: str | None = None,
) -> tuple[list[Path], list[Path], list[Path]]:
    """Download S3 objects to local paths.

    Objects whose local file already exists are skipped unless
    ``overwrite`` is set. Remaining objects are fetched concurrently on a
    thread pool; a shared progress bar is advanced by the workers under a
    lock.

    Returns:
        (downloaded, skipped, failed) paths.
    """
    if not objects:
        return [], [], []

    downloaded: list[Path] = []
    failed: list[Path] = []
    skipped: list[Path] = []

    # Split into work vs. already-present local files (unless overwriting).
    pending: list[S3ObjectRef] = []
    for item in objects:
        if overwrite or not item.local_path.exists():
            pending.append(item)
        else:
            skipped.append(item.local_path)

    if not pending:
        return downloaded, skipped, failed

    # Concurrent HEAD requests supply per-key sizes so the progress bar
    # can show a byte total; unknown sizes count as zero.
    key_sizes = self._get_object_sizes(bucket, pending)
    total = sum(key_sizes.get(item.key, 0) for item in pending)

    bar = self._create_progress_bar()
    lock = threading.Lock()

    with bar:
        task = bar.add_task(description or "Downloading", total=total)

        with ThreadPoolExecutor(max_workers=self.config.max_concurrency) as pool:
            future_map = {}
            for item in pending:
                # Ensure the destination directory exists before a worker writes into it.
                item.local_path.parent.mkdir(parents=True, exist_ok=True)
                fut = pool.submit(self._download_one, bucket, item, task, bar, lock)
                future_map[fut] = item

            # Collect results as workers finish; any exception or a None
            # result marks that object's local path as failed.
            for fut in as_completed(future_map):
                item = future_map[fut]
                try:
                    outcome = fut.result()
                except Exception:
                    failed.append(item.local_path)
                else:
                    if outcome is None:
                        failed.append(item.local_path)
                    else:
                        downloaded.append(outcome)

    return downloaded, skipped, failed
upload_objects
upload_objects(bucket: str, objects: list[S3ObjectRef], overwrite: bool = False, description: str | None = None) -> tuple[list[str], list[str], list[Path]]

Upload local files to S3.

Returns: (uploaded_keys, skipped_keys, failed_local_paths).

Source code in src/progridpy/aws/s3.py
def upload_objects(
    self,
    bucket: str,
    objects: list[S3ObjectRef],
    overwrite: bool = False,
    description: str | None = None,
) -> tuple[list[str], list[str], list[Path]]:
    """Upload local files to S3.

    Keys already present in the bucket are skipped unless ``overwrite``
    is set; local files that do not exist are reported as failures.
    Remaining files are uploaded concurrently on a thread pool with a
    shared progress bar advanced under a lock.

    Returns:
        (uploaded_keys, skipped_keys, failed_local_paths).
    """
    if not objects:
        return [], [], []

    uploaded: list[str] = []
    skipped: list[str] = []
    failed: list[Path] = []

    # When overwriting, nothing counts as pre-existing; otherwise do a
    # batched existence check against the bucket.
    existing_keys = set() if overwrite else self._check_existing_keys(bucket, objects)
    pending: list[S3ObjectRef] = []
    for item in objects:
        if item.key in existing_keys:
            skipped.append(item.key)
            continue
        pending.append(item)

    if not pending:
        return uploaded, skipped, failed

    # Byte total for the progress bar; missing files contribute nothing.
    total = sum(item.local_path.stat().st_size for item in pending if item.local_path.exists())

    bar = self._create_progress_bar()
    lock = threading.Lock()

    with bar:
        task = bar.add_task(description or "Uploading", total=total)

        with ThreadPoolExecutor(max_workers=self.config.max_concurrency) as pool:
            future_map = {}
            for item in pending:
                # A file absent at submit time is an immediate failure — never queued.
                if not item.local_path.exists():
                    failed.append(item.local_path)
                    continue
                fut = pool.submit(self._upload_one, bucket, item, task, bar, lock)
                future_map[fut] = item

            # Gather worker results; exceptions and None results both
            # mark the local path as failed.
            for fut in as_completed(future_map):
                item = future_map[fut]
                try:
                    key = fut.result()
                except Exception:
                    failed.append(item.local_path)
                else:
                    if key is None:
                        failed.append(item.local_path)
                    else:
                        uploaded.append(key)

    return uploaded, skipped, failed