Skip to content

FileDownloader

Multi-threaded HTTP file downloader with chunked parallel downloading, retry logic, and Rich progress bars. Used internally by ISO clients to fetch raw data files from ISO websites.

Design

The downloader splits a file into Chunk objects based on HTTP Content-Range support. Each chunk is downloaded in a separate thread. If a chunk is truncated on arrival, it is retried up to max_retries times. Files that do not support range requests fall back to a single-stream download.

Configuration

Parameter Default Description
splits 16 Number of parallel chunks
max_connections 8 Maximum concurrent threads
min_split_size 8 MB Minimum bytes per chunk
timeout 30 s Per-request timeout
max_retries 3 Retry attempts per chunk

Classes

FileDownloader

FileDownloader(splits: int = DEFAULT_SPLITS, max_connections: int = DEFAULT_MAX_CONNECTIONS, min_split_size: int = DEFAULT_MIN_SPLIT_SIZE, timeout: int = DEFAULT_TIMEOUT, max_retries: int = DEFAULT_MAX_RETRIES, retry_delay: int = DEFAULT_RETRY_DELAY, overwrite: bool = False, verbose: bool = False)

A multi-threaded file downloader with resume capability.

Args: splits: Number of chunks to split the download into for parallel downloading max_connections: Maximum number of concurrent connections min_split_size: Minimum size in bytes for each chunk (default 8MB) timeout: Request timeout in seconds max_retries: Maximum number of retry attempts for failed chunks retry_delay: Initial delay in seconds between retries (increases with each attempt) overwrite: If True, existing files will be overwritten. If False, existing files will be skipped. verbose: If True, show logging statements. If False, suppress non-error logs.

Source code in src/progridpy/utils/downloader.py
def __init__(
    self,
    splits: int = DEFAULT_SPLITS,
    max_connections: int = DEFAULT_MAX_CONNECTIONS,
    min_split_size: int = DEFAULT_MIN_SPLIT_SIZE,
    timeout: int = DEFAULT_TIMEOUT,
    max_retries: int = DEFAULT_MAX_RETRIES,
    retry_delay: int = DEFAULT_RETRY_DELAY,
    overwrite: bool = False,
    verbose: bool = False,
):
    """Store downloader configuration and set up shared, lazily-initialized resources."""
    # --- download tuning knobs ---
    self.splits = splits
    # Clamp to [1, 64] so a caller cannot request a pathological thread count.
    self.max_connections = min(64, max(1, max_connections))
    self.min_split_size = min_split_size
    self.timeout = timeout
    # --- retry policy for failed/truncated chunks ---
    self.max_retries = max_retries
    self.retry_delay = retry_delay
    # --- behavioral flags ---
    self.overwrite = overwrite
    self.verbose = verbose
    # Session/executor/semaphore start as None; presumably populated by a
    # setup step or context manager elsewhere (download() raises if unset).
    self.session: requests.Session | None = None
    self.executor: ThreadPoolExecutor | None = None
    self._semaphore: threading.Semaphore | None = None
    # Shared Rich progress bar; all updates are guarded by _progress_lock.
    self.progress = self._create_progress_bar()
    self._progress_lock = threading.Lock()

Functions

download
download(url: str, output_file: Path | str | None = None, description: str = 'Downloading', existing_task_id: TaskID | None = None, show_progress: bool = True, skip_head: bool = False) -> Path

Download a file from a URL.

Args: url: The URL to download from. output_file: Optional path where the file should be saved. If None, the file will be saved in the current directory with a filename determined from the server response or URL. description: Description to show in the progress bar. existing_task_id: If provided, update this existing progress task instead of creating a new one. show_progress: Whether to show progress bar for this download. skip_head: If True, skip the HEAD metadata request and use a single-stream download; requires output_file to be provided.

Returns: Path: The path where the file was saved.

Raises: DownloadError: If the download fails after all retry attempts.

Source code in src/progridpy/utils/downloader.py
def download(
    self,
    url: str,
    output_file: Path | str | None = None,
    description: str = "Downloading",
    existing_task_id: TaskID | None = None,
    show_progress: bool = True,
    skip_head: bool = False,
) -> Path:
    """Download a file from a URL.

    Args:
        url: The URL to download from.
        output_file: Optional path where the file should be saved. If None,
                    the file will be saved in the current directory with a
                    filename determined from the server response or URL.
        description: Description to show in the progress bar.
        existing_task_id: If provided, update this existing progress task instead of creating a new one.
        show_progress: Whether to show progress bar for this download.
        skip_head: If True, skip the HEAD metadata request and fall back to a
                   single-stream download. Requires output_file to be provided.

    Returns:
        Path: The path where the file was saved.

    Raises:
        DownloadError: If the download fails after all retry attempts.
    """
    if self.session is None:
        raise RuntimeError("Session not initialized.")
    if self.executor is None:
        raise RuntimeError("Executor not initialized.")

    if output_file is not None:
        output_file = ensure_output_file(to_path(output_file), "output_file")
    if skip_head and output_file is None:
        raise ValueError("skip_head=True requires output_file to be provided.")
    if skip_head:
        head_response = None
        total_size = 0
        accept_ranges = "none"
    else:
        # Get headers to determine filename if output_file not provided
        try:
            head_response = self.session.head(url, allow_redirects=True, timeout=self.timeout)
            head_response.raise_for_status()
        except RequestException as e:
            raise DownloadError(f"Failed to fetch metadata for {url}: {e!s}") from e

    # Determine output file if not provided (only reachable when skip_head is
    # False, so head_response is guaranteed to be set here).
    if output_file is None:
        output_file = self._determine_output_path(url, head_response.headers)
        if self.verbose:
            self.progress.console.print(f"[blue]No output file specified. Saving to: {output_file}[/blue]")

    # Handle overwrite functionality
    if output_file.exists():
        if not self.overwrite:
            if self.verbose:
                self.progress.console.print(
                    f"[blue]File {output_file} already exists and overwrite=False. Skipping download.[/blue]"
                )
            return output_file
        if self.verbose:
            self.progress.console.print(
                f"[blue]File {output_file} already exists and overwrite=True. Removing existing file.[/blue]"
            )
        output_file.unlink()
        # Also clean up any existing chunks directory
        chunk_dir = self._get_chunk_dir(output_file)
        if chunk_dir.exists():
            self._cleanup_chunks(chunk_dir)

    task_for_this_download: TaskID | None = None
    created_task_here = False

    if not skip_head:
        total_size = int(head_response.headers.get("content-length", "0"))
        accept_ranges = head_response.headers.get("accept-ranges", "none").lower()

    # NOTE: TaskID is an int and Rich task ids start at 0, so these checks must
    # use `is not None` — a plain truthiness test would treat task 0 as absent.
    if existing_task_id is not None:
        task_for_this_download = existing_task_id
        # Caller manages this task's lifecycle (description, completion)
    elif show_progress:
        task_for_this_download = self.progress.add_task(description, total=total_size)
        created_task_here = True

    # If task_for_this_download is still None, no progress will be shown for this file.

    if skip_head:
        # No metadata — go straight to single-thread download
        return self._single_thread_download(url, output_file, 0, task_for_this_download)

    if total_size == 0:
        if self.verbose:
            self.progress.console.print(
                f"[blue]Content length for {url} is 0. Creating an empty file at {output_file}.[/blue]"
            )
        output_file.touch(exist_ok=True)
        if task_for_this_download is not None and created_task_here:
            with self._progress_lock:
                self.progress.update(task_for_this_download, completed=0, total=0)
        return output_file

    if accept_ranges != "bytes" or self.splits <= 1:
        if accept_ranges != "bytes" and self.verbose:
            self.progress.console.print(
                f"[yellow]Server may not support range requests (Accept-Ranges: {accept_ranges}). "
                f"Using single connection for {url}[/yellow]"
            )
        return self._single_thread_download(url, output_file, total_size, task_for_this_download)

    # Decide how many chunks to use: respect min_split_size unless the whole
    # file is smaller than one minimum-sized chunk.
    actual_splits = self.splits
    chunk_size = total_size // actual_splits
    if (
        chunk_size < self.min_split_size and total_size > self.min_split_size
    ):  # ensure min_split_size is respected unless file is smaller
        chunk_size = self.min_split_size
        actual_splits = (total_size + chunk_size - 1) // chunk_size
    elif total_size <= self.min_split_size:  # If file is smaller than min_split_size, use 1 chunk
        actual_splits = 1
        chunk_size = total_size

    current_file_chunks: list[Chunk] = []
    start_byte = 0
    for _ in range(actual_splits):
        end_byte = min(start_byte + chunk_size - 1, total_size - 1)
        if start_byte >= total_size:
            break  # All bytes assigned
        current_file_chunks.append(Chunk(start=start_byte, end=end_byte))
        start_byte = end_byte + 1
        if end_byte == total_size - 1:
            break  # Reached end of file
    # Cover any remainder bytes not reached by integer-divided chunks
    if start_byte < total_size:
        current_file_chunks.append(Chunk(start=start_byte, end=total_size - 1))

    # Resume support: reuse any fully-persisted chunks from a previous attempt.
    chunk_dir = self._get_chunk_dir(output_file)
    if chunk_dir.exists():
        for chunk_item in current_file_chunks:
            loaded_data = self._load_chunk(output_file, chunk_item.start, chunk_item.end)
            if loaded_data and len(loaded_data) == chunk_item.size():
                chunk_item.data = loaded_data
                if task_for_this_download is not None:
                    with self._progress_lock:
                        self.progress.update(task_for_this_download, advance=chunk_item.size())
            else:
                chunk_item.reset()  # Ensure data is None if not fully loaded

    futures = {}
    for chunk_item in current_file_chunks:
        if chunk_item.data is None:  # Only download if not already loaded
            future = self.executor.submit(
                self._download_chunk,
                url,
                chunk_item,
                output_file,
                task_for_this_download,
            )
            futures[future] = chunk_item

    all_chunks_successful = True
    try:
        for future in as_completed(futures):
            if not future.result():  # _download_chunk returns bool
                all_chunks_successful = False  # A chunk failed despite retries
                # Error is raised by _download_chunk if all retries fail, so this path might be rare.
                # Consider how to handle partial success if one chunk fails after others succeed.
    except Exception as e:  # Covers exceptions from future.result() itself
        # Cancel remaining futures if one fails critically
        for f in futures:
            if not f.done():
                f.cancel()
        raise DownloadError(f"Failed during multi-part download for {url}: {e}") from e

    if not all_chunks_successful and any(
        c.data is None for c in current_file_chunks
    ):  # Or if some chunks in current_file_chunks still have chunk.data as None
        raise DownloadError(f"Multi-part download for {url} resulted in missing chunks.")

    self._combine_chunks(output_file, current_file_chunks)

    if chunk_dir.exists():
        self._cleanup_chunks(chunk_dir)

    if task_for_this_download is not None and created_task_here:
        with self._progress_lock:
            # Ensure final progress is accurate, especially if total_size was small or chunks were adjusted
            self.progress.update(task_for_this_download, completed=total_size, total=total_size)

    return output_file
download_batch
download_batch(urls: list[str], output_files: list[Path | str] | None = None, description: str = 'Download', max_concurrent_files: int = 4, fetch_metadata: bool = True) -> tuple[list[Path], list[tuple[int, str]], tuple[int, int, int]]

Download multiple files concurrently.

Args: urls: List of URLs to download output_files: Optional list of output files. If None, files will be saved in the current directory with names determined from the server. description: Description for the batch progress bar max_concurrent_files: Maximum number of files to download concurrently fetch_metadata: If True (default), fetch file sizes upfront via HEAD requests for accurate byte-based progress tracking. If False, skip metadata fetch and use file count for progress tracking instead.

Returns: Tuple of (successful_downloads, failed_urls, counts) where: - successful_downloads is a list of Path objects for successfully downloaded files - failed_urls is a list of (index, url) tuples for failures - counts is a tuple of (downloaded_count, skipped_count, failed_count)

Source code in src/progridpy/utils/downloader.py
def download_batch(
    self,
    urls: list[str],
    output_files: list[Path | str] | None = None,
    description: str = "Download",
    max_concurrent_files: int = 4,
    fetch_metadata: bool = True,
) -> tuple[list[Path], list[tuple[int, str]], tuple[int, int, int]]:
    """Download multiple files concurrently.

    Args:
        urls: List of URLs to download
        output_files: Optional list of output files. If None, files will be saved
                     in the current directory with names determined from the server.
        description: Description for the batch progress bar
        max_concurrent_files: Maximum number of files to download concurrently
        fetch_metadata: If True (default), fetch file sizes upfront via HEAD requests
                       for accurate byte-based progress tracking. If False, skip
                       metadata fetch and use file count for progress tracking instead.

    Returns:
        Tuple of (successful_downloads, failed_urls, counts) where:
        - successful_downloads is a list of Path objects for successfully downloaded files
        - failed_urls is a list of (index, url) tuples for failures
        - counts is a tuple of (downloaded_count, skipped_count, failed_count)
    """
    if output_files is not None and len(urls) != len(output_files):
        raise ValueError("Number of URLs must match number of output files.")
    if output_files is not None:
        normalized_files: list[Path] = []
        for i, output_file in enumerate(output_files):
            path = to_path(output_file)
            if path is None:
                raise ValueError(f"output_files[{i}] must be a path, not None.")
            normalized_files.append(ensure_output_file(path, f"output_files[{i}]"))
        output_files = normalized_files

    if not urls:  # Handle empty input early
        if self.verbose:
            self.progress.console.print("[blue]No URLs provided for batch download.[/blue]")
        return [], [], (0, 0, 0)

    results: list[Path | Exception | None] = [None] * len(urls)
    num_total_files = len(urls)

    # Get file sizes upfront for accurate progress tracking (optional)
    if fetch_metadata:
        if self.verbose:
            self.progress.console.print("[blue]Getting file sizes...[/blue]")
        file_sizes, total_size = self._get_file_sizes(urls)
    else:
        # Skip metadata fetch - use file count for progress tracking
        file_sizes = [0] * len(urls)
        total_size = 0

    # Track files that already existed (only when overwrite=False)
    pre_existing_files: set[int] = set()
    if not self.overwrite and output_files is not None:
        for i, output_file_item in enumerate(output_files):
            if output_file_item.exists():
                pre_existing_files.add(i)
                results[i] = output_file_item  # Mark as successful (skipped)

    if self.executor is None:
        raise RuntimeError("Executor not initialized.")

    # Calculate initial progress for pre-existing files
    skipped_count = len(pre_existing_files)
    skipped_bytes = (
        sum(file_sizes[i] for i in pre_existing_files if i < len(file_sizes) and file_sizes[i] > 0)
        if file_sizes and pre_existing_files
        else 0
    )

    # When any size is unknown (<= 0), fall back to file count for progress tracking
    unknown_size_count = sum(1 for size in file_sizes if size <= 0)
    use_file_count_progress = total_size == 0 or unknown_size_count > 0

    # Use different columns based on whether we have size info
    if use_file_count_progress:
        # No size info - show file count progress only (no byte columns)
        batch_columns = (
            TextColumn("{task.description}", style="bold blue"),
            BarColumn(bar_width=None, complete_style="green", finished_style="bright_green"),
            TextColumn("{task.fields[files_status]}", justify="right", style="bold magenta"),
            TextColumn("•", style="dim"),
            TimeElapsedColumn(),
        )
    else:
        # Full progress with byte info
        batch_columns = (
            TextColumn("{task.description}", style="bold blue"),
            BarColumn(bar_width=None, complete_style="green", finished_style="bright_green"),
            TextColumn("{task.fields[files_status]}", justify="right", style="bold magenta"),
            TextColumn("•", style="dim"),
            DownloadColumn(),
            TextColumn("•", style="dim"),
            TransferSpeedColumn(),
            TextColumn("•", style="dim"),
            TimeRemainingColumn(),
            TextColumn("•", style="dim"),
            TimeElapsedColumn(),
        )

    self.progress.columns = batch_columns
    batch_master_task_id = self.progress.add_task(
        description=description,
        total=num_total_files if use_file_count_progress else total_size,
        completed=skipped_count if use_file_count_progress else skipped_bytes,
        files_status=f"{skipped_count}/{num_total_files} files",
    )

    # Initialize counters
    total_available = skipped_count  # For progress bar (includes pre-existing)
    total_bytes_downloaded = skipped_bytes

    with ThreadPoolExecutor(max_workers=max_concurrent_files) as batch_file_executor:
        future_to_index = {}
        if output_files is None:
            # When output_files is None, pass None for each URL
            for i, url_item in enumerate(urls):
                future = batch_file_executor.submit(
                    self.download,
                    url_item,
                    None,  # Let download() determine the output file
                    description=Path(url_item).name,
                    show_progress=False,
                    skip_head=False,
                )
                future_to_index[future] = i
        else:
            # When output_files is provided, use them
            for i, (url_item, output_file_item) in enumerate(zip(urls, output_files, strict=False)):
                # Skip submitting download task if file already exists and overwrite=False
                if i in pre_existing_files:
                    continue

                # Progress display mode and transport strategy are intentionally decoupled:
                # in file-count mode, only unknown-size files skip the per-file HEAD request.
                file_size_known = i < len(file_sizes) and file_sizes[i] > 0
                skip_head_for_file = use_file_count_progress and not file_size_known
                future = batch_file_executor.submit(
                    self.download,
                    url_item,
                    output_file_item,
                    description=Path(url_item).name,
                    show_progress=False,
                    skip_head=skip_head_for_file,
                )
                future_to_index[future] = i

        failed_urls = []
        url_to_size = dict(zip(urls, file_sizes, strict=False))
        newly_downloaded = 0  # Only files downloaded in this session

        for future in as_completed(future_to_index):
            original_index = future_to_index[future]
            file_size = 0
            try:
                result_path = future.result()
                results[original_index] = result_path
                file_size = url_to_size.get(urls[original_index], 0)
                total_bytes_downloaded += file_size
                total_available += 1
                newly_downloaded += 1
            except Exception as e:
                results[original_index] = e
                failed_urls.append((original_index, urls[original_index]))

            with self._progress_lock:
                self.progress.update(
                    batch_master_task_id,
                    advance=1 if use_file_count_progress else file_size,
                    completed=total_available if use_file_count_progress else total_bytes_downloaded,
                    files_status=f"{total_available}/{num_total_files} files",
                )
            # Yield briefly so the progress renderer can catch up — done AFTER
            # releasing _progress_lock so per-chunk updates from other download
            # threads are not blocked while we sleep.
            time.sleep(0.01)

    with self._progress_lock:
        self.progress.update(
            batch_master_task_id,
            completed=total_available if use_file_count_progress else total_bytes_downloaded,
            total=num_total_files if use_file_count_progress else max(total_bytes_downloaded, 1),
            files_status=f"{total_available}/{num_total_files} files",
        )
    # Final short pause (outside the lock) to let the bar render its end state.
    time.sleep(0.1)

    # Filter results to only include successful downloads (Path objects)
    successful_downloads = [result for result in results if isinstance(result, Path)]

    # Log batch summary
    failed_count = len(failed_urls)

    if self.verbose:
        if failed_urls or skipped_count > 0:
            self.progress.console.print(
                f"[yellow]Batch download completed: {newly_downloaded} downloaded, "
                f"{skipped_count} skipped, {failed_count} failed (total: {num_total_files})[/yellow]"
            )
            for idx, failed_url in failed_urls:
                self.progress.console.print(f"[yellow]  Failed URL [{idx}]: {failed_url}[/yellow]")
        self.progress.console.print(
            f"[green]Batch download complete: {newly_downloaded}/{num_total_files} files downloaded[/green]"
        )

    return successful_downloads, failed_urls, (newly_downloaded, skipped_count, failed_count)