Storage Configuration and Management

This section describes how to configure and manage data storage across the supported cloud providers through a unified interface to remote file systems. The system uses fsspec as its filesystem abstraction and provides dedicated configuration classes for Amazon S3, Google Cloud Storage (GCS), and Azure Blob File System (ABFS).

Core Storage Configuration

The Storage class provides a base for common storage settings applicable across all supported providers. These settings control fundamental aspects of data interaction, such as reliability and debugging.

Common Configuration Parameters:

  • retries (int, default: 3): The number of times to retry failed storage operations.
  • backoff (datetime.timedelta, default: 5 seconds): The initial delay before retrying an operation. This is used in a backoff strategy.
  • enable_debug (bool, default: False): Enables debug logging for storage operations.
  • attach_execution_metadata (bool, default: True): Controls whether execution-specific metadata is attached to storage operations.
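
To make the interplay of retries and backoff concrete, here is a minimal sketch of a retry loop. The text only says that backoff is the initial delay used in a backoff strategy, so the doubling of the delay after each failure, the helper name call_with_retries, and the injectable sleep argument are all illustrative assumptions rather than the library's implementation.

```python
import datetime
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    operation: Callable[[], T],
    retries: int = 3,
    backoff: datetime.timedelta = datetime.timedelta(seconds=5),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Hypothetical helper: run `operation`, retrying up to `retries` times.

    Assumption: the delay starts at `backoff` and doubles after each failure.
    """
    delay = backoff.total_seconds()
    for attempt in range(retries + 1):  # one initial try plus `retries` retries
        try:
            return operation()
        except Exception:
            if attempt == retries:
                raise  # retries exhausted: surface the last error
            sleep(delay)
            delay *= 2
    raise AssertionError("unreachable when retries >= 0")
```

Passing sleep as an argument keeps the sketch easy to exercise without real waiting; with the defaults above, a persistently failing operation would be attempted four times in total.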

Automatic Configuration from Environment Variables:

The Storage class and its derived classes can automatically configure themselves by reading environment variables. This simplifies deployment and ensures consistent settings across environments.

Parameter       Environment Variable
retries         UNION_STORAGE_RETRIES
backoff         UNION_STORAGE_BACKOFF_SECONDS
enable_debug    UNION_STORAGE_DEBUG

To automatically load these settings:

import os

# Assumes the package is importable as `flyte`; the `src/` prefix is only the repository layout.
from flyte.storage._config import Storage

# Example: Set environment variables
os.environ["UNION_STORAGE_RETRIES"] = "5"
os.environ["UNION_STORAGE_BACKOFF_SECONDS"] = "10"
os.environ["UNION_STORAGE_DEBUG"] = "true"

# Automatically configure storage
storage_config = Storage.auto()

print(f"Retries: {storage_config.retries}")
print(f"Backoff: {storage_config.backoff}")
print(f"Debug Enabled: {storage_config.enable_debug}")
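
For intuition, the environment-driven configuration could be parsed along the following lines. This is a sketch under stated assumptions, not the library's code: StorageSketch is a hypothetical stand-in for Storage, the backoff value is read as whole seconds, and the set of strings treated as true is a guess.

```python
import datetime
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass
class StorageSketch:
    """Hypothetical stand-in for Storage; defaults follow the documented ones."""

    retries: int = 3
    backoff: datetime.timedelta = datetime.timedelta(seconds=5)
    enable_debug: bool = False

    @classmethod
    def auto(cls, env: Mapping[str, str] = os.environ) -> "StorageSketch":
        """Build a config, overriding defaults only for variables that are set."""
        kwargs = {}
        if "UNION_STORAGE_RETRIES" in env:
            kwargs["retries"] = int(env["UNION_STORAGE_RETRIES"])
        if "UNION_STORAGE_BACKOFF_SECONDS" in env:
            seconds = int(env["UNION_STORAGE_BACKOFF_SECONDS"])
            kwargs["backoff"] = datetime.timedelta(seconds=seconds)
        if "UNION_STORAGE_DEBUG" in env:
            # Assumption: these spellings count as true; the real parser may differ.
            value = env["UNION_STORAGE_DEBUG"].strip().lower()
            kwargs["enable_debug"] = value in {"1", "true", "yes"}
        return cls(**kwargs)
```

Accepting the environment as a mapping makes the behavior easy to check in isolation, for example StorageSketch.auto({"UNION_STORAGE_RETRIES": "5"}) leaves backoff and enable_debug at their defaults.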

Provider-Specific Storage Configurations

The system provides dedicated configuration classes for major cloud storage providers, inheriting common settings from Storage and adding provider-specific parameters.

Amazon S3 Configuration

The S3 class manages configuration for Amazon S3. It supports specifying custom endpoints, access keys, and secret keys.

S3-Specific Parameters:

  • endpoint (str, optional): Custom S3 endpoint URL (e.g., for MinIO or local testing).
  • access_key_id (str, optional): AWS access key ID.
  • secret_access_key (str, optional): AWS secret access key.

Environment Variables for S3:

Parameter            Environment Variable
endpoint             FLYTE_AWS_ENDPOINT
access_key_id        FLYTE_AWS_ACCESS_KEY_ID
secret_access_key    FLYTE_AWS_SECRET_ACCESS_KEY

Sandbox Configuration:

For local development and testing, the S3.for_sandbox() method provides a pre-configured setup for a local S3-compatible service like MinIO.

from flyte.storage._config import S3

# Automatically configure S3 from environment variables
s3_config = S3.auto()

# Configure S3 for a local sandbox environment (e.g., MinIO)
sandbox_s3_config = S3.for_sandbox()
print(f"Sandbox S3 Endpoint: {sandbox_s3_config.endpoint}")

Google Cloud Storage (GCS) Configuration

The GCS class handles configuration specific to Google Cloud Storage.

GCS-Specific Parameters:

  • gsutil_parallelism (bool, default: False): Enables parallel operations using gsutil.

Environment Variables for GCS:

Parameter             Environment Variable
gsutil_parallelism    GCP_GSUTIL_PARALLELISM

from flyte.storage._config import GCS

# Automatically configure GCS from environment variables
gcs_config = GCS.auto()

Azure Blob File System (ABFS) Configuration

The ABFS class provides configuration for Azure Blob Storage.

ABFS-Specific Parameters:

  • account_name (str, optional): Azure Storage account name.
  • account_key (str, optional): Azure Storage account key.
  • tenant_id (str, optional): Azure Active Directory tenant ID.
  • client_id (str, optional): Azure Active Directory client ID.
  • client_secret (str, optional): Azure Active Directory client secret.

Environment Variables for ABFS:

Parameter        Environment Variable
account_name     AZURE_STORAGE_ACCOUNT_NAME
account_key      AZURE_STORAGE_ACCOUNT_KEY
tenant_id        AZURE_TENANT_ID
client_id        AZURE_CLIENT_ID
client_secret    AZURE_CLIENT_SECRET

from flyte.storage._config import ABFS

# Automatically configure ABFS from environment variables
abfs_config = ABFS.auto()
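
Because ABFS accepts two kinds of credentials (an account key, or a tenant/client/secret triple), it can help to check which set the environment actually provides before calling ABFS.auto(). The helper below is a hypothetical pre-flight check, not part of the library; the precedence it applies (account key first) and the "incomplete" label are assumptions, and the library may fall back to other credential sources.

```python
import os
from typing import Mapping, Optional


def abfs_auth_mode(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Hypothetical check: report which ABFS credential set the environment provides."""
    if not env.get("AZURE_STORAGE_ACCOUNT_NAME"):
        return None  # no storage account configured at all
    if env.get("AZURE_STORAGE_ACCOUNT_KEY"):
        return "account_key"  # assumption: the account key takes precedence
    principal = ("AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET")
    if all(env.get(name) for name in principal):
        return "service_principal"
    return "incomplete"  # account name present, but neither credential set is complete
```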

Integrating with fsspec

Each storage configuration class provides a get_fsspec_kwargs method. This method translates the configured settings into a dictionary of keyword arguments suitable for constructing an fsspec filesystem instance. This allows for seamless integration with fsspec-compatible libraries and tools.

The anonymous argument to get_fsspec_kwargs, when set to True, configures the fsspec filesystem for anonymous access, so no credentials are required where the provider supports it.

Example for S3:

import fsspec

from flyte.storage._config import S3

s3_config = S3.for_sandbox() # Or S3.auto() for production
fsspec_args = s3_config.get_fsspec_kwargs(anonymous=False)

# Example of how these kwargs would be used with fsspec
# fs = fsspec.filesystem("s3", **fsspec_args)
# print(f"fsspec S3 arguments: {fsspec_args}")

# Example with anonymous access
anonymous_fsspec_args = s3_config.get_fsspec_kwargs(anonymous=True)
# fs_anon = fsspec.filesystem("s3", **anonymous_fsspec_args)
# print(f"fsspec S3 anonymous arguments: {anonymous_fsspec_args}")

The get_fsspec_kwargs method also handles specific client options and retry configurations for robust remote interactions. For S3, it includes a detailed retry_config with exponential backoff.
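
The exact dictionary returned by get_fsspec_kwargs is not shown here and may have a different shape (the text mentions a retry_config key). As a rough analogue only, the sketch below shows how the same S3 settings could be translated for the s3fs backend, whose S3FileSystem accepts anon, key, secret, client_kwargs, and config_kwargs. The function name and the choice of botocore's "standard" retry mode are assumptions.

```python
from typing import Any, Dict, Optional


def s3_fsspec_kwargs_sketch(
    endpoint: Optional[str] = None,
    access_key_id: Optional[str] = None,
    secret_access_key: Optional[str] = None,
    retries: int = 3,
    anonymous: bool = False,
) -> Dict[str, Any]:
    """Hypothetical translation of S3 settings into s3fs-style keyword arguments."""
    kwargs: Dict[str, Any] = {
        # s3fs forwards config_kwargs to botocore's Config, which accepts a `retries` dict.
        "config_kwargs": {"retries": {"max_attempts": retries, "mode": "standard"}},
    }
    if endpoint:
        # A custom endpoint (for example a local MinIO) is passed to the boto client.
        kwargs["client_kwargs"] = {"endpoint_url": endpoint}
    if anonymous:
        kwargs["anon"] = True  # skip credentials entirely
    else:
        if access_key_id:
            kwargs["key"] = access_key_id
        if secret_access_key:
            kwargs["secret"] = secret_access_key
    return kwargs


# Placeholder values for illustration only.
example = s3_fsspec_kwargs_sketch(
    endpoint="http://localhost:9000",
    access_key_id="example-key",
    secret_access_key="example-secret",
)
```

Note that when anonymous is True the sketch deliberately drops any configured keys, mirroring the idea that anonymous access bypasses credentials.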

Flyte URI Resolution

The RemoteFSPathResolver provides a mechanism to map internal flyte:// URIs to their actual remote storage paths (e.g., s3://bucket/path, gs://bucket/path). This abstraction allows the system to refer to data using a consistent internal protocol, decoupling it from the underlying storage provider.

  • protocol: The standard flyte:// prefix used for internal URIs.
  • resolve_remote_path(flyte_uri: str): Retrieves the corresponding remote path for a given flyte:// URI if a mapping exists in the current session. Returns None if no mapping is found.
  • add_mapping(flyte_uri: str, remote_path: str): Establishes a new mapping between a flyte:// URI and its remote path. This method is thread-safe.

Usage Example:

from flyte.storage._remote_fs import RemoteFSPathResolver

flyte_data_uri = "flyte://my_project/my_workflow/output_data"
s3_remote_path = "s3://my-s3-bucket/project/workflow/output_data.parquet"

# Add a mapping
RemoteFSPathResolver.add_mapping(flyte_data_uri, s3_remote_path)

# Resolve the remote path
resolved_path = RemoteFSPathResolver.resolve_remote_path(flyte_data_uri)
print(f"Resolved path for '{flyte_data_uri}': {resolved_path}")

# Attempt to resolve an unknown URI
unknown_path = RemoteFSPathResolver.resolve_remote_path("flyte://unknown/path")
print(f"Resolved path for 'flyte://unknown/path': {unknown_path}")

This resolution mechanism is crucial for maintaining portability and allowing the system to operate independently of specific cloud provider URLs during execution.
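
The behavior described above (a session-scoped mapping, None for unknown URIs, thread-safe registration) can be sketched with a dictionary guarded by a lock. PathResolverSketch is a hypothetical, simplified analogue written for illustration; the internals of RemoteFSPathResolver may differ.

```python
import threading
from typing import Dict, Optional


class PathResolverSketch:
    """Hypothetical, simplified analogue of RemoteFSPathResolver."""

    protocol = "flyte://"
    _mappings: Dict[str, str] = {}
    _lock = threading.Lock()

    @classmethod
    def add_mapping(cls, flyte_uri: str, remote_path: str) -> None:
        """Register a mapping; the lock serializes concurrent writers."""
        with cls._lock:
            cls._mappings[flyte_uri] = remote_path

    @classmethod
    def resolve_remote_path(cls, flyte_uri: str) -> Optional[str]:
        """Return the mapped remote path, or None if no mapping exists."""
        with cls._lock:
            return cls._mappings.get(flyte_uri)


def register(index: int) -> None:
    # Each worker registers its own URI; the bucket name is a placeholder.
    PathResolverSketch.add_mapping(f"flyte://job/{index}", f"s3://example-bucket/job/{index}")


threads = [threading.Thread(target=register, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Keeping the mapping at class level matches the class-method usage shown earlier, at the cost of the state being shared by every caller in the process.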

Best Practices and Considerations

  • Environment Variables for Configuration: Prefer environment variables for storage settings, especially in production. This follows the Twelve-Factor App approach to configuration and keeps sensitive credentials out of code.
  • fsspec Compatibility: The get_fsspec_kwargs method is the primary integration point for interacting with remote storage using fsspec. Ensure any custom fsspec operations leverage these generated arguments for consistent behavior.
  • Security: When configuring credentials (access keys, client secrets), use secure methods like environment variables or secrets management systems. Avoid hardcoding them directly in your application code.
  • Retry Logic: The built-in retry mechanism helps improve the robustness of storage operations against transient network issues or service unavailability. Adjust retries and backoff as needed for your specific use case and network conditions.
  • Flyte URI Abstraction: Leverage the flyte:// URI scheme for internal data references to ensure your workflows remain portable across different storage backends. The RemoteFSPathResolver handles the translation to actual cloud paths.
  • Debugging: Set enable_debug to True to log storage operations in more detail when troubleshooting connectivity or access issues.