Storage Package API Reference

BatchResult dataclass

BatchResult(results: List[FileResult])

Results of a batch operation with per-file status.

Attributes:

Name Type Description
results List[FileResult]

List of FileResult for each file.

ok_results property
ok_results: List[FileResult]

Get all successful operations (OK, OVERWRITTEN, or SKIPPED).

skipped_results property
skipped_results: List[FileResult]

Get operations where no action was taken.

Includes:
  • SKIPPED: Intentionally skipped (e.g., file already exists locally)
  • ALREADY_EXISTS: Conflict (tried to create but already exists)

Note: SKIPPED is considered success (.ok=True), ALREADY_EXISTS is conflict (.ok=False).

conflict_results property
conflict_results: List[FileResult]

Get operations that conflicted (tried to create but already exists).

failed_results property
failed_results: List[FileResult]

Get all failed operations (NOT_FOUND or ERROR).

all_ok property
all_ok: bool

Check if all operations succeeded.
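In practice these properties partition a batch for triage. The sketch below uses minimal stand-in definitions of Status, FileResult, and BatchResult that mirror the documented shape (the enum member values are assumptions; use the package's own classes in real code):

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional


class Status(str, Enum):
    # Member values assumed from the lowercase statuses used in this reference.
    OK = "ok"
    OVERWRITTEN = "overwritten"
    SKIPPED = "skipped"
    ALREADY_EXISTS = "already_exists"
    NOT_FOUND = "not_found"
    ERROR = "error"


@dataclass
class FileResult:
    local_path: str
    remote_path: str
    status: Status
    error_type: Optional[str] = None
    error_message: Optional[str] = None

    @property
    def ok(self) -> bool:
        # Matches the documented rule: OK, OVERWRITTEN, and SKIPPED count as success.
        return self.status in (Status.OK, Status.OVERWRITTEN, Status.SKIPPED)


@dataclass
class BatchResult:
    results: List[FileResult]

    @property
    def failed_results(self) -> List[FileResult]:
        return [r for r in self.results if r.status in (Status.NOT_FOUND, Status.ERROR)]

    @property
    def all_ok(self) -> bool:
        return all(r.ok for r in self.results)


batch = BatchResult(results=[
    FileResult("a.txt", "data/a.txt", Status.OK),
    FileResult("b.txt", "data/b.txt", Status.SKIPPED),
    FileResult("c.txt", "data/c.txt", Status.ERROR, "PermissionDenied", "403"),
])

if not batch.all_ok:
    for r in batch.failed_results:
        print(f"{r.remote_path}: {r.error_type} - {r.error_message}")
```

Note that a SKIPPED result keeps `all_ok` True, so only NOT_FOUND and ERROR surface in `failed_results`.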

FileResult dataclass

FileResult(
    local_path: str,
    remote_path: str,
    status: Status,
    error_type: str | None = None,
    error_message: str | None = None,
)

Result of a single file operation with detailed status.

Attributes:

Name Type Description
local_path str

Local file path (source for uploads, destination for downloads).

remote_path str

Remote storage path.

status Status

Operation status.

error_type str | None

Type of error if status is ERROR (e.g., "PermissionDenied").

error_message str | None

Detailed error message if status is not OK.

ok property
ok: bool

Check if operation succeeded (OK, OVERWRITTEN, or SKIPPED).

Status

Bases: str, Enum

Status values for storage and registry operations.

Inherits from str to allow direct string comparison and serialization.
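Because each member is itself a str, status values compare directly against plain strings and serialize without custom encoders. A small sketch (member values assumed from the lowercase statuses used throughout this reference):

```python
import json
from enum import Enum


class Status(str, Enum):
    # Assumed values; the package defines the authoritative members.
    OK = "ok"
    NOT_FOUND = "not_found"
    ERROR = "error"


# Direct string comparison works because each member *is* a str.
assert Status.OK == "ok"
assert "not_found" == Status.NOT_FOUND

# json.dumps emits the underlying string value with no extra work.
print(json.dumps({"status": Status.ERROR}))  # {"status": "error"}
```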

StorageHandler

StorageHandler(*args, **kwargs)

Bases: MindtraceABC, ABC

Abstract interface all storage providers must implement.

upload abstractmethod
upload(
    local_path: str,
    remote_path: str,
    metadata: Optional[Dict[str, str]] = None,
    fail_if_exists: bool = False,
) -> FileResult

Upload a file from local_path to remote_path in storage.

Parameters:

local_path: Path to the local file to upload.
remote_path: Path in the storage backend to upload to.
metadata: Optional metadata to associate with the file.
fail_if_exists: If True, return "already_exists" status if file exists.

Returns:

FileResult with status:
  • "ok": Upload succeeded
  • "already_exists": File existed and fail_if_exists=True
  • "error": Other error occurred

download abstractmethod
download(
    remote_path: str, local_path: str, skip_if_exists: bool = False
) -> FileResult

Download a file from remote_path in storage to local_path.

Parameters:

remote_path: Path in the storage backend to download from.
local_path: Local path to save the downloaded file.
skip_if_exists: If True, skip download if local_path exists.

Returns:

FileResult with status:
  • "ok": Download succeeded
  • "skipped": Local file existed and skip_if_exists=True
  • "not_found": Remote file doesn't exist
  • "error": Other error occurred

delete abstractmethod
delete(remote_path: str) -> FileResult

Delete a file at remote_path in storage.

Parameters:

Name Type Description Default
remote_path str

Path in the storage backend to delete.

required

Returns:

Type Description
FileResult

FileResult with status:

  • "ok": Delete succeeded
  • "not_found": Remote file didn't exist
  • "error": Other error occurred
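The upload/download/delete contract above can be exercised with a toy in-memory handler. This is an illustrative sketch of the documented status semantics only, not the package's implementation; the stand-in FileResult uses plain strings in place of the Status enum:

```python
import os
import tempfile
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class FileResult:
    local_path: str
    remote_path: str
    status: str  # plain strings stand in for the Status enum
    error_message: Optional[str] = None


class InMemoryHandler:
    """Toy backend following the documented upload/download/delete contract."""

    def __init__(self) -> None:
        self._objects: Dict[str, bytes] = {}

    def upload(self, local_path: str, remote_path: str,
               fail_if_exists: bool = False) -> FileResult:
        if fail_if_exists and remote_path in self._objects:
            return FileResult(local_path, remote_path, "already_exists")
        with open(local_path, "rb") as f:
            self._objects[remote_path] = f.read()
        return FileResult(local_path, remote_path, "ok")

    def download(self, remote_path: str, local_path: str,
                 skip_if_exists: bool = False) -> FileResult:
        if skip_if_exists and os.path.exists(local_path):
            return FileResult(local_path, remote_path, "skipped")
        if remote_path not in self._objects:
            return FileResult(local_path, remote_path, "not_found")
        with open(local_path, "wb") as f:
            f.write(self._objects[remote_path])
        return FileResult(local_path, remote_path, "ok")

    def delete(self, remote_path: str) -> FileResult:
        if remote_path not in self._objects:
            return FileResult("", remote_path, "not_found")
        del self._objects[remote_path]
        return FileResult("", remote_path, "ok")


handler = InMemoryHandler()
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "hello.txt")
    with open(src, "w") as f:
        f.write("hello")
    assert handler.upload(src, "docs/hello.txt").status == "ok"
    # A second create-only upload conflicts:
    assert handler.upload(src, "docs/hello.txt", fail_if_exists=True).status == "already_exists"
    dst = os.path.join(tmp, "copy.txt")
    assert handler.download("docs/hello.txt", dst).status == "ok"
    assert handler.delete("docs/hello.txt").status == "ok"
    # Deleting again reports not_found rather than raising:
    assert handler.delete("docs/hello.txt").status == "not_found"
```

The key property of the contract is that every outcome, including "file was missing", comes back as a FileResult status rather than an exception.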
upload_string abstractmethod
upload_string(
    content: str | bytes,
    remote_path: str,
    content_type: str = "application/json",
    fail_if_exists: bool = False,
    if_generation_match: int | None = None,
) -> StringResult

Upload string/bytes content directly to storage without temp files.

Parameters:

Name Type Description Default
content str | bytes

String or bytes content to upload.

required
remote_path str

Path in the storage backend to upload to.

required
content_type str

MIME type of the content.

'application/json'
fail_if_exists bool

If True, fail if the object already exists.

False
if_generation_match int | None

If set, only upload if the object's generation matches this value. Use 0 to only create new objects. Takes precedence over fail_if_exists.

None

Returns:

Type Description
StringResult

StringResult with status:

  • "ok": Upload succeeded
  • "already_exists": Object existed and fail_if_exists=True or generation mismatch
  • "error": Other error occurred
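The if_generation_match semantics (0 means "only create"; a positive value means "only update this exact version") can be modeled with a tiny versioned store. This is a hedged illustration of the documented behavior, not the backend implementation:

```python
from typing import Dict, Optional, Tuple


class VersionedStore:
    """Toy model of generation-conditioned writes (illustration only)."""

    def __init__(self) -> None:
        # path -> (generation, content); generation starts at 1 on create
        self._objects: Dict[str, Tuple[int, bytes]] = {}

    def upload_string(self, content: bytes, remote_path: str,
                      if_generation_match: Optional[int] = None) -> str:
        current = self._objects.get(remote_path)
        if if_generation_match is not None:
            # 0 means "only create": conflict if the object already exists.
            if if_generation_match == 0 and current is not None:
                return "already_exists"
            # A positive value must match the current generation exactly.
            if if_generation_match > 0 and (
                current is None or current[0] != if_generation_match
            ):
                return "already_exists"  # mismatch reported as conflict
        gen = 1 if current is None else current[0] + 1
        self._objects[remote_path] = (gen, content)
        return "ok"


store = VersionedStore()
assert store.upload_string(b"v1", "cfg.json", if_generation_match=0) == "ok"
# A second create-only write conflicts:
assert store.upload_string(b"v2", "cfg.json", if_generation_match=0) == "already_exists"
# An update conditioned on the current generation (1) succeeds:
assert store.upload_string(b"v2", "cfg.json", if_generation_match=1) == "ok"
```

This is the usual compare-and-swap pattern: read an object's generation, modify the content, then write back conditioned on the generation you read, so concurrent writers cannot silently overwrite each other.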
download_string abstractmethod
download_string(remote_path: str) -> StringResult

Download object content as bytes without temp files.

Parameters:

Name Type Description Default
remote_path str

Path in the storage backend to download from.

required

Returns:

Type Description
StringResult

StringResult with:

  • status: "ok", "not_found", or "error"
  • content: Downloaded bytes if status is "ok"
upload_batch
upload_batch(
    files: List[Tuple[str, str]],
    metadata: Optional[Dict[str, str]] = None,
    max_workers: int = 4,
    fail_if_exists: bool = False,
) -> BatchResult

Upload multiple files concurrently.

Parameters:

Name Type Description Default
files List[Tuple[str, str]]

List of (local_path, remote_path) tuples to upload.

required
metadata Optional[Dict[str, str]]

Optional metadata to associate with each file.

None
max_workers int

Number of parallel upload workers.

4
fail_if_exists bool

If True, report ALREADY_EXISTS status if file exists.

False

Returns:

Type Description
BatchResult

BatchResult with per-file results. Use batch_result.all_ok to check success and batch_result.failed_results to inspect failures.

download_batch
download_batch(
    files: List[Tuple[str, str]],
    max_workers: int = 4,
    skip_if_exists: bool = False,
) -> BatchResult

Download multiple files concurrently.

Parameters:

Name Type Description Default
files List[Tuple[str, str]]

List of (remote_path, local_path) tuples to download.

required
max_workers int

Number of parallel download workers.

4
skip_if_exists bool

If True, skip files that already exist locally.

False

Returns:

Type Description
BatchResult

BatchResult with per-file results. Use batch_result.all_ok to check success and batch_result.failed_results to inspect failures.

download_string_batch
download_string_batch(
    remote_paths: List[str], max_workers: int = 4
) -> List[StringResult]

Download multiple objects as in-memory bytes concurrently.

Parameters:

Name Type Description Default
remote_paths List[str]

List of remote paths to download.

required
max_workers int

Number of parallel download workers.

4

Returns:

Type Description
List[StringResult]

List of StringResult in the same order as remote_paths.
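Because the result order matches remote_paths, results can be zipped back to their paths even though downloads run concurrently. A self-contained sketch with a stand-in StringResult and a fake in-memory bucket (both are assumptions for illustration):

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class StringResult:
    remote_path: str
    status: str
    content: Optional[bytes] = None


# Fake bucket contents for the sketch.
FAKE_BUCKET = {"a.json": b"{}", "b.json": b"[1]"}


def download_string(remote_path: str) -> StringResult:
    if remote_path not in FAKE_BUCKET:
        return StringResult(remote_path, "not_found")
    return StringResult(remote_path, "ok", FAKE_BUCKET[remote_path])


def download_string_batch(remote_paths: List[str],
                          max_workers: int = 4) -> List[StringResult]:
    # executor.map preserves input order even though work runs concurrently.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(download_string, remote_paths))


paths = ["a.json", "missing.json", "b.json"]
results = download_string_batch(paths)
by_path = dict(zip(paths, results))  # safe: order matches remote_paths
assert by_path["missing.json"].status == "not_found"
assert by_path["b.json"].content == b"[1]"
```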

delete_batch
delete_batch(paths: List[str], max_workers: int = 4) -> BatchResult

Delete multiple files concurrently.

Parameters:

Name Type Description Default
paths List[str]

List of remote paths to delete.

required
max_workers int

Number of parallel delete workers.

4

Returns:

Type Description
BatchResult

BatchResult with per-file status:

  • "ok": Delete succeeded
  • "not_found": Remote file didn't exist
  • "error": Other error occurred
upload_folder
upload_folder(
    local_folder: str,
    remote_prefix: str = "",
    include_patterns: Optional[List[str]] = None,
    exclude_patterns: Optional[List[str]] = None,
    metadata: Optional[Dict[str, str]] = None,
    max_workers: int = 4,
    fail_if_exists: bool = False,
) -> BatchResult

Upload all files in a local folder recursively.

Parameters:

Name Type Description Default
local_folder str

Path to the local folder to upload.

required
remote_prefix str

Prefix to prepend to all remote paths.

''
include_patterns Optional[List[str]]

List of glob patterns to include.

None
exclude_patterns Optional[List[str]]

List of glob patterns to exclude.

None
metadata Optional[Dict[str, str]]

Optional metadata to associate with each file.

None
max_workers int

Number of parallel upload workers.

4
fail_if_exists bool

If True, report ALREADY_EXISTS status if file exists.

False

Returns:

Type Description
BatchResult

BatchResult with per-file results.
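The exact include/exclude pattern semantics belong to the implementation; the sketch below shows one typical interpretation of glob filtering (include patterns whitelist, exclude patterns then veto), using the standard fnmatch module:

```python
import fnmatch
from typing import List, Optional


def select_files(paths: List[str],
                 include_patterns: Optional[List[str]] = None,
                 exclude_patterns: Optional[List[str]] = None) -> List[str]:
    """One plausible reading of include/exclude glob filtering."""
    selected = []
    for p in paths:
        # If include patterns are given, a path must match at least one.
        if include_patterns and not any(fnmatch.fnmatch(p, g) for g in include_patterns):
            continue
        # Any exclude match vetoes the path.
        if exclude_patterns and any(fnmatch.fnmatch(p, g) for g in exclude_patterns):
            continue
        selected.append(p)
    return selected


files = ["model/weights.bin", "model/config.json", "logs/run.log"]
print(select_files(files, include_patterns=["model/*"], exclude_patterns=["*.bin"]))
```

Under these patterns only model/config.json survives: logs/run.log fails the include filter and model/weights.bin is vetoed by the exclude filter.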

download_folder
download_folder(
    remote_prefix: str,
    local_folder: str,
    max_workers: int = 4,
    skip_if_exists: bool = False,
) -> BatchResult

Download all objects with a given prefix to a local folder.

Parameters:

Name Type Description Default
remote_prefix str

Prefix of remote objects to download.

required
local_folder str

Local folder to download files into.

required
max_workers int

Number of parallel download workers.

4
skip_if_exists bool

If True, skip files that already exist locally.

False

Returns:

Type Description
BatchResult

BatchResult with per-file results.

list_objects abstractmethod
list_objects(
    *, prefix: str = "", max_results: Optional[int] = None
) -> List[str]

List objects in storage with an optional prefix and limit.

Parameters:

prefix: Only list objects with this prefix.
max_results: Maximum number of results to return.

Returns:

List of object paths.

exists abstractmethod
exists(remote_path: str) -> bool

Check if a remote object exists in storage.

Parameters:

remote_path: Path in the storage backend to check.

Returns:

True if the object exists, False otherwise.

get_presigned_url abstractmethod
get_presigned_url(
    remote_path: str, *, expiration_minutes: int = 60, method: str = "GET"
) -> str

Get a presigned URL for a remote object.

Parameters:

remote_path: Path in the storage backend.
expiration_minutes: Minutes until the URL expires.
method: HTTP method for the URL (e.g., 'GET', 'PUT').

Returns:

A presigned URL string.

get_object_metadata abstractmethod
get_object_metadata(remote_path: str) -> Dict[str, Any]

Get metadata for a remote object.

Parameters:

remote_path: Path in the storage backend.

Returns:

Dictionary of metadata for the object.

StringResult dataclass

StringResult(
    remote_path: str,
    status: Status,
    content: bytes | None = None,
    error_type: str | None = None,
    error_message: str | None = None,
)

Result of a string upload/download operation.

Attributes:

Name Type Description
remote_path str

Remote storage path.

status Status

Operation status.

content bytes | None

Downloaded content (for download operations).

error_type str | None

Type of error if status is ERROR.

error_message str | None

Detailed error message if status is not OK.

ok property
ok: bool

Check if operation succeeded (OK, OVERWRITTEN, or SKIPPED).

GCSStorageHandler

GCSStorageHandler(
    bucket_name: str,
    *,
    project_id: Optional[str] = None,
    credentials_path: Optional[str] = None,
    ensure_bucket: bool = True,
    create_if_missing: bool = False,
    location: str = "US",
    storage_class: str = "STANDARD"
)

Bases: StorageHandler

A thin wrapper around google-cloud-storage APIs.

Initialize a GCSStorageHandler.

Parameters:

bucket_name: Name of the GCS bucket.
project_id: Optional GCP project ID.
credentials_path: Optional path to a service account JSON file.
ensure_bucket: If True, raise NotFound if the bucket does not exist and create_if_missing is False.
create_if_missing: If True, create the bucket if it does not exist.
location: Location for bucket creation (if needed).
storage_class: Storage class for bucket creation (if needed).

Raises:

google.api_core.exceptions.NotFound: If ensure_bucket is True, the bucket does not exist, and create_if_missing is False.

upload
upload(
    local_path: str,
    remote_path: str,
    metadata: Optional[Dict[str, str]] = None,
    fail_if_exists: bool = False,
) -> FileResult

Upload a file to GCS.

Parameters:

local_path: Path to the local file to upload.
remote_path: Path in the bucket to upload to.
metadata: Optional metadata to associate with the blob.
fail_if_exists: If True, return "already_exists" status if blob exists.

Returns:

FileResult with status "ok", "already_exists", or "error".

Note: remote_path in the result is the blob name (not the full gs:// URI) for use with delete().

download
download(
    remote_path: str, local_path: str, skip_if_exists: bool = False
) -> FileResult

Download a file from GCS to a local path.

Parameters:

remote_path: Path in the bucket to download from.
local_path: Local path to save the file.
skip_if_exists: If True, skip download if local_path exists.

Returns:

FileResult with status "ok", "skipped", "not_found", or "error".

delete
delete(remote_path: str) -> FileResult

Delete a file from GCS.

Parameters:

Name Type Description Default
remote_path str

Path in the bucket to delete.

required

Returns:

Type Description
FileResult

FileResult with status "ok", "not_found", or "error".

upload_string
upload_string(
    content: str | bytes,
    remote_path: str,
    content_type: str = "application/json",
    fail_if_exists: bool = False,
    if_generation_match: int | None = None,
) -> StringResult

Upload string/bytes content directly to GCS without temp files.

Parameters:

Name Type Description Default
content str | bytes

String or bytes content to upload.

required
remote_path str

Path in the bucket to upload to.

required
content_type str

MIME type of the content.

'application/json'
fail_if_exists bool

If True, fail if the blob already exists.

False
if_generation_match int | None

If set, only upload if the blob's generation matches this value. Use 0 to only create new blobs. Takes precedence over fail_if_exists.

None

Returns:

Name Type Description
StringResult

StringResult with status "ok", "already_exists", or "error".

Note

remote_path in result is the blob name (not full gs:// URI) for use with delete().

download_string
download_string(remote_path: str) -> StringResult

Download blob content as bytes without temp files.

Parameters:

Name Type Description Default
remote_path str

Path in the bucket to download from.

required

Returns:

Type Description
StringResult

StringResult with:

  • status: "ok", "not_found", or "error"
  • content: Downloaded bytes if status is "ok"
list_objects
list_objects(
    *, prefix: str = "", max_results: Optional[int] = None
) -> List[str]

List objects in the bucket with an optional prefix and limit.

Parameters:

prefix: Only list objects with this prefix.
max_results: Maximum number of results to return.

Returns:

List of blob names (paths) in the bucket.

exists
exists(remote_path: str) -> bool

Check if a blob exists in the bucket.

Parameters:

remote_path: Path in the bucket to check.

Returns:

True if the blob exists, False otherwise.

get_presigned_url
get_presigned_url(
    remote_path: str, *, expiration_minutes: int = 60, method: str = "GET"
) -> str

Get a presigned URL for a blob in the bucket.

Parameters:

remote_path: Path in the bucket.
expiration_minutes: Minutes until the URL expires.
method: HTTP method for the URL (e.g., 'GET', 'PUT').

Returns:

A presigned URL string.

get_object_metadata
get_object_metadata(remote_path: str) -> Dict[str, Any]

Get metadata for a blob in the bucket.

Parameters:

remote_path: Path in the bucket.

Returns:

Dictionary of metadata for the blob, including name, size, content_type, timestamps, and custom metadata.

S3StorageHandler

S3StorageHandler(
    bucket_name: str,
    *,
    endpoint: str,
    access_key: str,
    secret_key: str,
    secure: bool = True,
    ensure_bucket: bool = True,
    create_if_missing: bool = True,
    region: Optional[str] = None
)

Bases: StorageHandler

A thin wrapper around boto3 S3 APIs for S3-compatible storage.

Works with AWS S3, Minio, DigitalOcean Spaces, and other S3-compatible services. Uses boto3 with IfNoneMatch='*' for atomic conditional writes.

Initialize an S3StorageHandler.

Parameters:

Name Type Description Default
bucket_name str

Name of the S3 bucket.

required
endpoint str

S3-compatible server endpoint (e.g., "localhost:9000", "s3.amazonaws.com").

required
access_key str

Access key for authentication.

required
secret_key str

Secret key for authentication.

required
secure bool

Whether to use HTTPS (default True).

True
ensure_bucket bool

If True, check bucket exists on init.

True
create_if_missing bool

If True, create the bucket if it does not exist.

True
region Optional[str]

Optional region for bucket creation.

None
upload
upload(
    local_path: str,
    remote_path: str,
    metadata: Optional[Dict[str, str]] = None,
    fail_if_exists: bool = False,
) -> FileResult

Upload a file to S3.

Parameters:

Name Type Description Default
local_path str

Path to the local file to upload.

required
remote_path str

Path in the bucket to upload to (key only, no s3:// prefix).

required
metadata Optional[Dict[str, str]]

Optional metadata to associate with the object.

None
fail_if_exists bool

If True, return ALREADY_EXISTS status if object exists. Uses S3 IfNoneMatch='*' for atomic create-only semantics.

False

Returns:

Name Type Description
FileResult

FileResult with status OK, ALREADY_EXISTS, or ERROR.

Note

remote_path in result is the key (not full s3:// URI) for use with delete().

download
download(
    remote_path: str, local_path: str, skip_if_exists: bool = False
) -> FileResult

Download a file from S3 to a local path.

Parameters:

Name Type Description Default
remote_path str

Path in the bucket to download from.

required
local_path str

Local path to save the file.

required
skip_if_exists bool

If True, skip download if local_path exists.

False

Returns:

Type Description
FileResult

FileResult with status OK, SKIPPED, NOT_FOUND, or ERROR.

delete
delete(remote_path: str) -> FileResult

Delete a file from S3.

Parameters:

Name Type Description Default
remote_path str

Path in the bucket to delete.

required

Returns:

Type Description
FileResult

FileResult with status OK, NOT_FOUND, or ERROR.

upload_string
upload_string(
    content: str | bytes,
    remote_path: str,
    content_type: str = "application/json",
    fail_if_exists: bool = False,
    if_generation_match: int | None = None,
) -> StringResult

Upload string/bytes content directly to S3 without temp files.

Parameters:

Name Type Description Default
content str | bytes

String or bytes content to upload.

required
remote_path str

Path in the bucket to upload to (key only, no s3:// prefix).

required
content_type str

MIME type of the content.

'application/json'
fail_if_exists bool

If True, fail if the object already exists.

False
if_generation_match int | None

If 0, uses IfNoneMatch='*' for atomic create-only. This matches GCS semantics where generation=0 means "only if not exists".

None

Returns:

Name Type Description
StringResult

StringResult with status OK, ALREADY_EXISTS, or ERROR.

Note

remote_path in result is the key (not full s3:// URI) for use with delete().

download_string
download_string(remote_path: str) -> StringResult

Download object content as bytes without temp files.

Parameters:

Name Type Description Default
remote_path str

Path in the bucket to download from.

required

Returns:

Type Description
StringResult

StringResult with status OK, NOT_FOUND, or ERROR, and content if OK.

list_objects
list_objects(
    *, prefix: str = "", max_results: Optional[int] = None
) -> List[str]

List objects in the bucket with an optional prefix and limit.

Parameters:

Name Type Description Default
prefix str

Only list objects with this prefix.

''
max_results Optional[int]

Maximum number of results to return.

None

Returns:

Type Description
List[str]

List of object names (paths) in the bucket.

exists
exists(remote_path: str) -> bool

Check if an object exists in the bucket.

Parameters:

Name Type Description Default
remote_path str

Path in the bucket to check.

required

Returns:

Type Description
bool

True if the object exists, False otherwise.

get_presigned_url
get_presigned_url(
    remote_path: str, *, expiration_minutes: int = 60, method: str = "GET"
) -> str

Get a presigned URL for an object in the bucket.

Parameters:

Name Type Description Default
remote_path str

Path in the bucket.

required
expiration_minutes int

Minutes until the URL expires.

60
method str

HTTP method for the URL (e.g., 'GET', 'PUT').

'GET'

Returns:

Type Description
str

A presigned URL string.

get_object_metadata
get_object_metadata(remote_path: str) -> Dict[str, Any]

Get metadata for an object in the bucket.

Parameters:

Name Type Description Default
remote_path str

Path in the bucket.

required

Returns:

Type Description
Dict[str, Any]

Dictionary of metadata for the object.

base

Status

Bases: str, Enum

Status values for storage and registry operations.

Inherits from str to allow direct string comparison and serialization.

FileResult dataclass
FileResult(
    local_path: str,
    remote_path: str,
    status: Status,
    error_type: str | None = None,
    error_message: str | None = None,
)

Result of a single file operation with detailed status.

Attributes:

Name Type Description
local_path str

Local file path (source for uploads, destination for downloads).

remote_path str

Remote storage path.

status Status

Operation status.

error_type str | None

Type of error if status is ERROR (e.g., "PermissionDenied").

error_message str | None

Detailed error message if status is not OK.

ok property
ok: bool

Check if operation succeeded (OK, OVERWRITTEN, or SKIPPED).

StringResult dataclass
StringResult(
    remote_path: str,
    status: Status,
    content: bytes | None = None,
    error_type: str | None = None,
    error_message: str | None = None,
)

Result of a string upload/download operation.

Attributes:

Name Type Description
remote_path str

Remote storage path.

status Status

Operation status.

content bytes | None

Downloaded content (for download operations).

error_type str | None

Type of error if status is ERROR.

error_message str | None

Detailed error message if status is not OK.

ok property
ok: bool

Check if operation succeeded (OK, OVERWRITTEN, or SKIPPED).

BatchResult dataclass
BatchResult(results: List[FileResult])

Results of a batch operation with per-file status.

Attributes:

Name Type Description
results List[FileResult]

List of FileResult for each file.

ok_results property
ok_results: List[FileResult]

Get all successful operations (OK, OVERWRITTEN, or SKIPPED).

skipped_results property
skipped_results: List[FileResult]

Get operations where no action was taken.

Includes: - SKIPPED: Intentionally skipped (e.g., file already exists locally) - ALREADY_EXISTS: Conflict (tried to create but already exists)

Note: SKIPPED is considered success (.ok=True), ALREADY_EXISTS is conflict (.ok=False).

conflict_results property
conflict_results: List[FileResult]

Get operations that conflicted (tried to create but already exists).

failed_results property
failed_results: List[FileResult]

Get all failed operations (NOT_FOUND or ERROR).

all_ok property
all_ok: bool

Check if all operations succeeded.

StorageHandler
StorageHandler(*args, **kwargs)

Bases: MindtraceABC, ABC

Abstract interface all storage providers must implement.

upload abstractmethod
upload(
    local_path: str,
    remote_path: str,
    metadata: Optional[Dict[str, str]] = None,
    fail_if_exists: bool = False,
) -> FileResult

Upload a file from local_path to remote_path in storage. Args: local_path: Path to the local file to upload. remote_path: Path in the storage backend to upload to. metadata: Optional metadata to associate with the file. fail_if_exists: If True, return "already_exists" status if file exists. Returns: FileResult with status: - "ok": Upload succeeded - "already_exists": File existed and fail_if_exists=True - "error": Other error occurred

download abstractmethod
download(
    remote_path: str, local_path: str, skip_if_exists: bool = False
) -> FileResult

Download a file from remote_path in storage to local_path. Args: remote_path: Path in the storage backend to download from. local_path: Local path to save the downloaded file. skip_if_exists: If True, skip download if local_path exists. Returns: FileResult with status: - "ok": Download succeeded - "skipped": Local file existed and skip_if_exists=True - "not_found": Remote file doesn't exist - "error": Other error occurred

delete abstractmethod
delete(remote_path: str) -> FileResult

Delete a file at remote_path in storage.

Parameters:

Name Type Description Default
remote_path str

Path in the storage backend to delete.

required

Returns:

Type Description
FileResult

FileResult with status:

FileResult
  • "ok": Delete succeeded
FileResult
  • "not_found": Remote file didn't exist
FileResult
  • "error": Other error occurred
upload_string abstractmethod
upload_string(
    content: str | bytes,
    remote_path: str,
    content_type: str = "application/json",
    fail_if_exists: bool = False,
    if_generation_match: int | None = None,
) -> StringResult

Upload string/bytes content directly to storage without temp files.

Parameters:

Name Type Description Default
content str | bytes

String or bytes content to upload.

required
remote_path str

Path in the storage backend to upload to.

required
content_type str

MIME type of the content.

'application/json'
fail_if_exists bool

If True, fail if the object already exists.

False
if_generation_match int | None

If set, only upload if the object's generation matches this value. Use 0 to only create new objects. Takes precedence over fail_if_exists.

None

Returns:

Type Description
StringResult

StringResult with status:

StringResult
  • "ok": Upload succeeded
StringResult
  • "already_exists": Object existed and fail_if_exists=True or generation mismatch
StringResult
  • "error": Other error occurred
download_string abstractmethod
download_string(remote_path: str) -> StringResult

Download object content as bytes without temp files.

Parameters:

Name Type Description Default
remote_path str

Path in the storage backend to download from.

required

Returns:

Type Description
StringResult

StringResult with:

StringResult
  • status: "ok", "not_found", or "error"
StringResult
  • content: Downloaded bytes if status is "ok"
upload_batch
upload_batch(
    files: List[Tuple[str, str]],
    metadata: Optional[Dict[str, str]] = None,
    max_workers: int = 4,
    fail_if_exists: bool = False,
) -> BatchResult

Upload multiple files concurrently.

Parameters:

Name Type Description Default
files List[Tuple[str, str]]

List of (local_path, remote_path) tuples to upload.

required
metadata Optional[Dict[str, str]]

Optional metadata to associate with each file.

None
max_workers int

Number of parallel upload workers.

4
fail_if_exists bool

If True, report ALREADY_EXISTS status if file exists.

False

Returns:

Type Description
BatchResult

BatchResult with per-file results. Use batch_result.all_ok to check success,

BatchResult

batch_result.failed_results to inspect failures.

download_batch
download_batch(
    files: List[Tuple[str, str]],
    max_workers: int = 4,
    skip_if_exists: bool = False,
) -> BatchResult

Download multiple files concurrently.

Parameters:

Name Type Description Default
files List[Tuple[str, str]]

List of (remote_path, local_path) tuples to download.

required
max_workers int

Number of parallel download workers.

4
skip_if_exists bool

If True, skip files that already exist locally.

False

Returns:

Type Description
BatchResult

BatchResult with per-file results. Use batch_result.all_ok to check success,

BatchResult

batch_result.failed_results to inspect failures.

download_string_batch
download_string_batch(
    remote_paths: List[str], max_workers: int = 4
) -> List[StringResult]

Download multiple objects as in-memory bytes concurrently.

Parameters:

Name Type Description Default
remote_paths List[str]

List of remote paths to download.

required
max_workers int

Number of parallel download workers.

4

Returns:

Type Description
List[StringResult]

List of StringResult in the same order as remote_paths.

delete_batch
delete_batch(paths: List[str], max_workers: int = 4) -> BatchResult

Delete multiple files concurrently.

Parameters:

Name Type Description Default
paths List[str]

List of remote paths to delete.

required
max_workers int

Number of parallel delete workers.

4

Returns:

Type Description
BatchResult

BatchResult with per-file status:

BatchResult
  • "ok": Delete succeeded
BatchResult
  • "not_found": Remote file didn't exist
BatchResult
  • "error": Other error occurred
upload_folder
upload_folder(
    local_folder: str,
    remote_prefix: str = "",
    include_patterns: Optional[List[str]] = None,
    exclude_patterns: Optional[List[str]] = None,
    metadata: Optional[Dict[str, str]] = None,
    max_workers: int = 4,
    fail_if_exists: bool = False,
) -> BatchResult

Upload all files in a local folder recursively.

Parameters:

Name Type Description Default
local_folder str

Path to the local folder to upload.

required
remote_prefix str

Prefix to prepend to all remote paths.

''
include_patterns Optional[List[str]]

List of glob patterns to include.

None
exclude_patterns Optional[List[str]]

List of glob patterns to exclude.

None
metadata Optional[Dict[str, str]]

Optional metadata to associate with each file.

None
max_workers int

Number of parallel upload workers.

4
fail_if_exists bool

If True, report ALREADY_EXISTS status if file exists.

False

Returns:

Type Description
BatchResult

BatchResult with per-file results.

download_folder
download_folder(
    remote_prefix: str,
    local_folder: str,
    max_workers: int = 4,
    skip_if_exists: bool = False,
) -> BatchResult

Download all objects with a given prefix to a local folder.

Parameters:

Name Type Description Default
remote_prefix str

Prefix of remote objects to download.

required
local_folder str

Local folder to download files into.

required
max_workers int

Number of parallel download workers.

4
skip_if_exists bool

If True, skip files that already exist locally.

False

Returns:

Type Description
BatchResult

BatchResult with per-file results.

list_objects abstractmethod
list_objects(
    *, prefix: str = "", max_results: Optional[int] = None
) -> List[str]

List objects in storage with an optional prefix and limit. Args: prefix: Only list objects with this prefix. max_results: Maximum number of results to return. Returns: List of object paths.

exists abstractmethod
exists(remote_path: str) -> bool

Check if a remote object exists in storage. Args: remote_path: Path in the storage backend to check. Returns: True if the object exists, False otherwise.

get_presigned_url abstractmethod
get_presigned_url(
    remote_path: str, *, expiration_minutes: int = 60, method: str = "GET"
) -> str

Get a presigned URL for a remote object. Args: remote_path: Path in the storage backend. expiration_minutes: Minutes until the URL expires. method: HTTP method for the URL (e.g., 'GET', 'PUT'). Returns: A presigned URL string.

get_object_metadata abstractmethod
get_object_metadata(remote_path: str) -> Dict[str, Any]

Get metadata for a remote object. Args: remote_path: Path in the storage backend. Returns: Dictionary of metadata for the object.

gcs

GCSStorageHandler
GCSStorageHandler(
    bucket_name: str,
    *,
    project_id: Optional[str] = None,
    credentials_path: Optional[str] = None,
    ensure_bucket: bool = True,
    create_if_missing: bool = False,
    location: str = "US",
    storage_class: str = "STANDARD"
)

Bases: StorageHandler

A thin wrapper around google-cloud-storage APIs.

Initialize a GCSStorageHandler. Args: bucket_name: Name of the GCS bucket. project_id: Optional GCP project ID. credentials_path: Optional path to a service account JSON file. ensure_bucket: If True, raise NotFound if bucket does not exist and create_if_missing is False. create_if_missing: If True, create the bucket if it does not exist. location: Location for bucket creation (if needed). storage_class: Storage class for bucket creation (if needed). Raises: google.api_core.exceptions.NotFound: If ensure_bucket is True and the bucket does not exist and create_if_missing is False.

upload
upload(
    local_path: str,
    remote_path: str,
    metadata: Optional[Dict[str, str]] = None,
    fail_if_exists: bool = False,
) -> FileResult

Upload a file to GCS.

Args:
    local_path: Path to the local file to upload.
    remote_path: Path in the bucket to upload to.
    metadata: Optional metadata to associate with the blob.
    fail_if_exists: If True, return "already_exists" status if the blob exists.

Returns:
    FileResult with status "ok", "already_exists", or "error".

Note:
    remote_path in the result is the blob name (not the full gs:// URI), for use with delete().

download
download(
    remote_path: str, local_path: str, skip_if_exists: bool = False
) -> FileResult

Download a file from GCS to a local path.

Args:
    remote_path: Path in the bucket to download from.
    local_path: Local path to save the file.
    skip_if_exists: If True, skip the download if local_path exists.

Returns:
    FileResult with status "ok", "skipped", "not_found", or "error".

delete
delete(remote_path: str) -> FileResult

Delete a file from GCS.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `remote_path` | `str` | Path in the bucket to delete. | required |

Returns:

| Type | Description |
| --- | --- |
| `FileResult` | FileResult with status "ok", "not_found", or "error". |

upload_string
upload_string(
    content: str | bytes,
    remote_path: str,
    content_type: str = "application/json",
    fail_if_exists: bool = False,
    if_generation_match: int | None = None,
) -> StringResult

Upload string/bytes content directly to GCS without temp files.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `content` | `str \| bytes` | String or bytes content to upload. | required |
| `remote_path` | `str` | Path in the bucket to upload to. | required |
| `content_type` | `str` | MIME type of the content. | `'application/json'` |
| `fail_if_exists` | `bool` | If True, fail if the blob already exists. | `False` |
| `if_generation_match` | `int \| None` | If set, only upload if the blob's generation matches this value. Use 0 to only create new blobs. Takes precedence over fail_if_exists. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `StringResult` | StringResult with status "ok", "already_exists", or "error". |

Note:
    remote_path in the result is the blob name (not the full gs:// URI), for use with delete().
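The if_generation_match parameter supports optimistic concurrency: pass 0 for atomic create-only writes, or pass a previously observed generation to update only if no one else wrote in between. A minimal in-memory sketch of those semantics (illustrative class and status strings, not the package's real implementation):

```python
from typing import Dict, Optional, Tuple


class GenerationStore:
    """In-memory model of generation-matched writes (illustrative only)."""

    def __init__(self) -> None:
        # path -> (content, generation); generation 0 means "does not exist yet"
        self._data: Dict[str, Tuple[bytes, int]] = {}

    def upload_string(self, content: bytes, remote_path: str,
                      if_generation_match: Optional[int] = None) -> str:
        current = self._data.get(remote_path)
        generation = current[1] if current else 0
        if if_generation_match is not None and if_generation_match != generation:
            return "already_exists"  # precondition failed
        self._data[remote_path] = (content, generation + 1)
        return "ok"


store = GenerationStore()
assert store.upload_string(b"v1", "cfg.json", if_generation_match=0) == "ok"
# A second create-only write (generation 0) loses the race:
assert store.upload_string(b"v2", "cfg.json", if_generation_match=0) == "already_exists"
# Writing with the observed generation (1) succeeds:
assert store.upload_string(b"v2", "cfg.json", if_generation_match=1) == "ok"
```

In real GCS the server evaluates the precondition atomically, so two racing writers cannot both succeed.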

download_string
download_string(remote_path: str) -> StringResult

Download blob content as bytes without temp files.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `remote_path` | `str` | Path in the bucket to download from. | required |

Returns:

| Type | Description |
| --- | --- |
| `StringResult` | StringResult with status "ok", "not_found", or "error"; `content` holds the downloaded bytes when status is "ok". |

list_objects
list_objects(
    *, prefix: str = "", max_results: Optional[int] = None
) -> List[str]

List objects in the bucket with an optional prefix and limit.

Args:
    prefix: Only list objects with this prefix.
    max_results: Maximum number of results to return.

Returns:
    List of blob names (paths) in the bucket.

exists
exists(remote_path: str) -> bool

Check if a blob exists in the bucket.

Args:
    remote_path: Path in the bucket to check.

Returns:
    True if the blob exists, False otherwise.

get_presigned_url
get_presigned_url(
    remote_path: str, *, expiration_minutes: int = 60, method: str = "GET"
) -> str

Get a presigned URL for a blob in the bucket.

Args:
    remote_path: Path in the bucket.
    expiration_minutes: Minutes until the URL expires.
    method: HTTP method for the URL (e.g., 'GET', 'PUT').

Returns:
    A presigned URL string.

get_object_metadata
get_object_metadata(remote_path: str) -> Dict[str, Any]

Get metadata for a blob in the bucket.

Args:
    remote_path: Path in the bucket.

Returns:
    Dictionary of metadata for the blob, including name, size, content_type, timestamps, and custom metadata.
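A quick sketch of consuming such a metadata dictionary. The exact key names below (`name`, `size`, `content_type`, `updated`, `metadata`) are assumptions based on the fields the docstring lists; verify them against the real handler's return value:

```python
from datetime import datetime, timezone

# Hypothetical metadata dict in the shape the docs describe; key names
# are illustrative and may differ in the real handler.
meta = {
    "name": "models/model.bin",
    "size": 1_048_576,
    "content_type": "application/octet-stream",
    "updated": datetime(2024, 5, 1, tzinfo=timezone.utc),
    "metadata": {"trained_by": "pipeline-7"},  # custom metadata
}

# Render a one-line summary from the documented fields.
summary = f"{meta['name']} ({meta['size'] / 1024:.0f} KiB, {meta['content_type']})"
print(summary)  # → models/model.bin (1024 KiB, application/octet-stream)
```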

s3

S3StorageHandler
S3StorageHandler(
    bucket_name: str,
    *,
    endpoint: str,
    access_key: str,
    secret_key: str,
    secure: bool = True,
    ensure_bucket: bool = True,
    create_if_missing: bool = True,
    region: Optional[str] = None
)

Bases: StorageHandler

A thin wrapper around boto3 S3 APIs for S3-compatible storage.

Works with AWS S3, Minio, DigitalOcean Spaces, and other S3-compatible services. Uses boto3 with IfNoneMatch='*' for atomic conditional writes.

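The IfNoneMatch='*' precondition is what makes create-only writes safe under concurrency: the server rejects the PUT if any object already exists at the key, and the check and write happen atomically server-side. A minimal in-memory model of that behavior (illustrative names and return strings, not the handler's real API):

```python
from typing import Dict, Optional


class ConditionalStore:
    """In-memory model of S3 conditional PUT with IfNoneMatch='*' (illustrative)."""

    def __init__(self) -> None:
        self._objects: Dict[str, bytes] = {}

    def put_object(self, key: str, body: bytes,
                   if_none_match: Optional[str] = None) -> str:
        # With IfNoneMatch='*', the write is rejected if the key already
        # exists, so two racing writers cannot both succeed.
        if if_none_match == "*" and key in self._objects:
            return "PreconditionFailed"
        self._objects[key] = body
        return "OK"


store = ConditionalStore()
print(store.put_object("locks/job-1", b"owner-a", if_none_match="*"))  # first writer wins
print(store.put_object("locks/job-1", b"owner-b", if_none_match="*"))  # second is rejected
```

This is the mechanism behind fail_if_exists and if_generation_match=0 in the S3 handler's upload methods below.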
Initialize an S3StorageHandler.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `bucket_name` | `str` | Name of the S3 bucket. | required |
| `endpoint` | `str` | S3-compatible server endpoint (e.g., "localhost:9000", "s3.amazonaws.com"). | required |
| `access_key` | `str` | Access key for authentication. | required |
| `secret_key` | `str` | Secret key for authentication. | required |
| `secure` | `bool` | Whether to use HTTPS. | `True` |
| `ensure_bucket` | `bool` | If True, check that the bucket exists on init. | `True` |
| `create_if_missing` | `bool` | If True, create the bucket if it does not exist. | `True` |
| `region` | `Optional[str]` | Optional region for bucket creation. | `None` |

upload
upload(
    local_path: str,
    remote_path: str,
    metadata: Optional[Dict[str, str]] = None,
    fail_if_exists: bool = False,
) -> FileResult

Upload a file to S3.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `local_path` | `str` | Path to the local file to upload. | required |
| `remote_path` | `str` | Path in the bucket to upload to (key only, no s3:// prefix). | required |
| `metadata` | `Optional[Dict[str, str]]` | Optional metadata to associate with the object. | `None` |
| `fail_if_exists` | `bool` | If True, return ALREADY_EXISTS status if the object exists. Uses S3 IfNoneMatch='*' for atomic create-only semantics. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `FileResult` | FileResult with status OK, ALREADY_EXISTS, or ERROR. |

Note:
    remote_path in the result is the key (not the full s3:// URI), for use with delete().

download
download(
    remote_path: str, local_path: str, skip_if_exists: bool = False
) -> FileResult

Download a file from S3 to a local path.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `remote_path` | `str` | Path in the bucket to download from. | required |
| `local_path` | `str` | Local path to save the file. | required |
| `skip_if_exists` | `bool` | If True, skip the download if local_path exists. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `FileResult` | FileResult with status OK, SKIPPED, NOT_FOUND, or ERROR. |

delete
delete(remote_path: str) -> FileResult

Delete a file from S3.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `remote_path` | `str` | Path in the bucket to delete. | required |

Returns:

| Type | Description |
| --- | --- |
| `FileResult` | FileResult with status OK, NOT_FOUND, or ERROR. |

upload_string
upload_string(
    content: str | bytes,
    remote_path: str,
    content_type: str = "application/json",
    fail_if_exists: bool = False,
    if_generation_match: int | None = None,
) -> StringResult

Upload string/bytes content directly to S3 without temp files.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `content` | `str \| bytes` | String or bytes content to upload. | required |
| `remote_path` | `str` | Path in the bucket to upload to (key only, no s3:// prefix). | required |
| `content_type` | `str` | MIME type of the content. | `'application/json'` |
| `fail_if_exists` | `bool` | If True, fail if the object already exists. | `False` |
| `if_generation_match` | `int \| None` | If 0, uses IfNoneMatch='*' for atomic create-only. This matches GCS semantics, where generation=0 means "only if not exists". | `None` |

Returns:

| Type | Description |
| --- | --- |
| `StringResult` | StringResult with status OK, ALREADY_EXISTS, or ERROR. |

Note:
    remote_path in the result is the key (not the full s3:// URI), for use with delete().

download_string
download_string(remote_path: str) -> StringResult

Download object content as bytes without temp files.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `remote_path` | `str` | Path in the bucket to download from. | required |

Returns:

| Type | Description |
| --- | --- |
| `StringResult` | StringResult with status OK, NOT_FOUND, or ERROR, and content if OK. |

list_objects
list_objects(
    *, prefix: str = "", max_results: Optional[int] = None
) -> List[str]

List objects in the bucket with an optional prefix and limit.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prefix` | `str` | Only list objects with this prefix. | `''` |
| `max_results` | `Optional[int]` | Maximum number of results to return. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `List[str]` | List of object names (paths) in the bucket. |

exists
exists(remote_path: str) -> bool

Check if an object exists in the bucket.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `remote_path` | `str` | Path in the bucket to check. | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the object exists, False otherwise. |

get_presigned_url
get_presigned_url(
    remote_path: str, *, expiration_minutes: int = 60, method: str = "GET"
) -> str

Get a presigned URL for an object in the bucket.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `remote_path` | `str` | Path in the bucket. | required |
| `expiration_minutes` | `int` | Minutes until the URL expires. | `60` |
| `method` | `str` | HTTP method for the URL (e.g., 'GET', 'PUT'). | `'GET'` |

Returns:

| Type | Description |
| --- | --- |
| `str` | A presigned URL string. |

get_object_metadata
get_object_metadata(remote_path: str) -> Dict[str, Any]

Get metadata for an object in the bucket.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `remote_path` | `str` | Path in the bucket. | required |

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, Any]` | Dictionary of metadata for the object. |