Storage Configuration and Management
This section describes how to configure and manage data storage within the system, supporting various cloud providers and offering a unified approach to remote file system interactions. The system leverages fsspec for flexible filesystem abstraction and provides specific configurations for Amazon S3, Google Cloud Storage (GCS), and Azure Blob File Storage (ABFS).
Core Storage Configuration
The Storage class provides a base for common storage settings applicable across all supported providers. These settings control fundamental aspects of data interaction, such as reliability and debugging.
Common Configuration Parameters:
- `retries` (int, default: 3): The number of times to retry failed storage operations.
- `backoff` (datetime.timedelta, default: 5 seconds): The initial delay before retrying an operation; this is used in the backoff strategy.
- `enable_debug` (bool, default: False): Enables debug logging for storage operations.
- `attach_execution_metadata` (bool, default: True): Controls whether execution-specific metadata is attached to storage operations.
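These fields can also be set programmatically. The following is a minimal sketch, assuming the `Storage` class accepts its documented fields as keyword arguments (construction details beyond the field names are not shown in this section):

```python
import datetime

from src.flyte.storage._config import Storage

# Minimal sketch: set the common storage settings directly in code.
# Assumes keyword-argument construction with the fields documented above.
storage_config = Storage(
    retries=5,                               # retry failed operations up to 5 times
    backoff=datetime.timedelta(seconds=10),  # initial delay before the first retry
    enable_debug=True,                       # emit debug logging for storage operations
    attach_execution_metadata=True,          # attach execution metadata (the default)
)
```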
Automatic Configuration from Environment Variables:
The Storage class and its derived classes can automatically configure themselves by reading environment variables. This simplifies deployment and ensures consistent settings across environments.
| Parameter | Environment Variable |
|---|---|
| `retries` | `UNION_STORAGE_RETRIES` |
| `backoff` | `UNION_STORAGE_BACKOFF_SECONDS` |
| `enable_debug` | `UNION_STORAGE_DEBUG` |
To automatically load these settings:
```python
import os

from src.flyte.storage._config import Storage

# Example: Set environment variables
os.environ["UNION_STORAGE_RETRIES"] = "5"
os.environ["UNION_STORAGE_BACKOFF_SECONDS"] = "10"
os.environ["UNION_STORAGE_DEBUG"] = "true"

# Automatically configure storage from the environment
storage_config = Storage.auto()

print(f"Retries: {storage_config.retries}")
print(f"Backoff: {storage_config.backoff}")
print(f"Debug Enabled: {storage_config.enable_debug}")
```
Provider-Specific Storage Configurations
The system provides dedicated configuration classes for major cloud storage providers, inheriting common settings from Storage and adding provider-specific parameters.
Amazon S3 Configuration
The S3 class manages configuration for Amazon S3. It supports specifying custom endpoints, access keys, and secret keys.
S3-Specific Parameters:
- `endpoint` (str, optional): Custom S3 endpoint URL (e.g., for MinIO or local testing).
- `access_key_id` (str, optional): AWS access key ID.
- `secret_access_key` (str, optional): AWS secret access key.
Environment Variables for S3:
| Parameter | Environment Variable |
|---|---|
| `endpoint` | `FLYTE_AWS_ENDPOINT` |
| `access_key_id` | `FLYTE_AWS_ACCESS_KEY_ID` |
| `secret_access_key` | `FLYTE_AWS_SECRET_ACCESS_KEY` |
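When credentials must be supplied in code rather than through the environment (for example, when targeting a custom S3-compatible endpoint), the same fields can presumably be passed at construction time. A hypothetical sketch with placeholder values:

```python
from src.flyte.storage._config import S3

# Sketch only: assumes S3 accepts its documented fields as keyword arguments.
# The endpoint and credentials below are illustrative placeholders.
custom_s3_config = S3(
    endpoint="http://localhost:9000",   # custom S3-compatible endpoint
    access_key_id="my-access-key",      # AWS access key ID
    secret_access_key="my-secret-key",  # AWS secret access key
)
```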
Sandbox Configuration:
For local development and testing, the S3.for_sandbox() method provides a pre-configured setup for a local S3-compatible service like MinIO.
```python
from src.flyte.storage._config import S3

# Automatically configure S3 from environment variables
s3_config = S3.auto()

# Configure S3 for a local sandbox environment (e.g., MinIO)
sandbox_s3_config = S3.for_sandbox()
print(f"Sandbox S3 Endpoint: {sandbox_s3_config.endpoint}")
```
Google Cloud Storage (GCS) Configuration
The GCS class handles configuration specific to Google Cloud Storage.
GCS-Specific Parameters:
- `gsutil_parallelism` (bool, default: False): Enables parallel operations using `gsutil`.
Environment Variables for GCS:
| Parameter | Environment Variable |
|---|---|
| `gsutil_parallelism` | `GCP_GSUTIL_PARALLELISM` |
```python
from src.flyte.storage._config import GCS

# Automatically configure GCS from environment variables
gcs_config = GCS.auto()
```
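To turn on gsutil parallelism without relying on the `GCP_GSUTIL_PARALLELISM` environment variable, the flag can presumably be set directly at construction time, as in this minimal sketch:

```python
from src.flyte.storage._config import GCS

# Sketch only: assumes GCS accepts its documented field as a keyword argument.
parallel_gcs_config = GCS(gsutil_parallelism=True)
```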
Azure Blob File Storage (ABFS) Configuration
The ABFS class provides configuration for Azure Blob Storage.
ABFS-Specific Parameters:
- `account_name` (str, optional): Azure Storage account name.
- `account_key` (str, optional): Azure Storage account key.
- `tenant_id` (str, optional): Azure Active Directory tenant ID.
- `client_id` (str, optional): Azure Active Directory client ID.
- `client_secret` (str, optional): Azure Active Directory client secret.
Environment Variables for ABFS:
| Parameter | Environment Variable |
|---|---|
| `account_name` | `AZURE_STORAGE_ACCOUNT_NAME` |
| `account_key` | `AZURE_STORAGE_ACCOUNT_KEY` |
| `tenant_id` | `AZURE_TENANT_ID` |
| `client_id` | `AZURE_CLIENT_ID` |
| `client_secret` | `AZURE_CLIENT_SECRET` |
```python
from src.flyte.storage._config import ABFS

# Automatically configure ABFS from environment variables
abfs_config = ABFS.auto()
```
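For service-principal authentication where credentials are obtained programmatically (for example, from a secrets manager), the fields can presumably be passed directly. A hypothetical sketch with placeholder values:

```python
from src.flyte.storage._config import ABFS

# Sketch only: assumes ABFS accepts its documented fields as keyword arguments.
# All values are placeholders; load real credentials from a secrets manager.
abfs_sp_config = ABFS(
    account_name="mystorageaccount",  # Azure Storage account name
    tenant_id="<tenant-id>",          # Azure AD tenant ID
    client_id="<client-id>",          # Azure AD application (client) ID
    client_secret="<client-secret>",  # Azure AD client secret
)
```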
Integrating with fsspec
Each storage configuration class provides a get_fsspec_kwargs method. This method translates the configured settings into a dictionary of keyword arguments suitable for constructing an fsspec filesystem instance. This allows for seamless integration with fsspec-compatible libraries and tools.
The anonymous parameter, when set to True, configures the fsspec filesystem for anonymous access, bypassing credential requirements where supported.
Example for S3:
```python
import fsspec

from src.flyte.storage._config import S3

s3_config = S3.for_sandbox()  # Or S3.auto() for production

fsspec_args = s3_config.get_fsspec_kwargs(anonymous=False)
# Example of how these kwargs would be used with fsspec
# fs = fsspec.filesystem("s3", **fsspec_args)
# print(f"fsspec S3 arguments: {fsspec_args}")

# Example with anonymous access
anonymous_fsspec_args = s3_config.get_fsspec_kwargs(anonymous=True)
# fs_anon = fsspec.filesystem("s3", **anonymous_fsspec_args)
# print(f"fsspec S3 anonymous arguments: {anonymous_fsspec_args}")
```
The get_fsspec_kwargs method also handles specific client options and retry configurations for robust remote interactions. For S3, it includes a detailed retry_config with exponential backoff.
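The exact contents of that retry configuration are provider-specific and not reproduced here, but the underlying pattern is standard exponential backoff: the delay starts at `backoff` and grows with each subsequent attempt, for up to `retries` attempts. A purely illustrative sketch of such a schedule (not the library's actual `retry_config`):

```python
import datetime

def backoff_schedule(retries: int = 3,
                     backoff: datetime.timedelta = datetime.timedelta(seconds=5)):
    """Illustrative only: the delays produced by doubling an initial backoff."""
    return [backoff * (2 ** attempt) for attempt in range(retries)]

# With the documented defaults (retries=3, backoff=5 seconds): 5s, 10s, 20s.
print(backoff_schedule())
```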
Flyte URI Resolution
The RemoteFSPathResolver provides a mechanism to map internal flyte:// URIs to their actual remote storage paths (e.g., s3://bucket/path, gs://bucket/path). This abstraction allows the system to refer to data using a consistent internal protocol, decoupling it from the underlying storage provider.
- `protocol`: The standard `flyte://` prefix used for internal URIs.
- `resolve_remote_path(flyte_uri: str)`: Retrieves the corresponding remote path for a given `flyte://` URI if a mapping exists in the current session. Returns `None` if no mapping is found.
- `add_mapping(flyte_uri: str, remote_path: str)`: Establishes a new mapping between a `flyte://` URI and its remote path. This method is thread-safe.
Usage Example:
```python
from src.flyte.storage._remote_fs import RemoteFSPathResolver

flyte_data_uri = "flyte://my_project/my_workflow/output_data"
s3_remote_path = "s3://my-s3-bucket/project/workflow/output_data.parquet"

# Add a mapping
RemoteFSPathResolver.add_mapping(flyte_data_uri, s3_remote_path)

# Resolve the remote path
resolved_path = RemoteFSPathResolver.resolve_remote_path(flyte_data_uri)
print(f"Resolved path for '{flyte_data_uri}': {resolved_path}")

# Attempt to resolve an unknown URI
unknown_path = RemoteFSPathResolver.resolve_remote_path("flyte://unknown/path")
print(f"Resolved path for 'flyte://unknown/path': {unknown_path}")
```
This resolution mechanism is crucial for maintaining portability and allowing the system to operate independently of specific cloud provider URLs during execution.
Best Practices and Considerations
- Environment Variables for Configuration: Prefer using environment variables for configuring storage settings, especially in production environments. This promotes Twelve-Factor App principles and keeps sensitive credentials out of code.
- fsspec Compatibility: The `get_fsspec_kwargs` method is the primary integration point for interacting with remote storage using fsspec. Ensure any custom fsspec operations leverage these generated arguments for consistent behavior.
- Security: When configuring credentials (access keys, client secrets), use secure methods like environment variables or secrets management systems. Avoid hardcoding them directly in your application code.
- Retry Logic: The built-in retry mechanism helps improve the robustness of storage operations against transient network issues or service unavailability. Adjust `retries` and `backoff` as needed for your specific use case and network conditions.
- Flyte URI Abstraction: Leverage the `flyte://` URI scheme for internal data references to ensure your workflows remain portable across different storage backends. The `RemoteFSPathResolver` handles the translation to actual cloud paths.
- Debugging: Utilize the `enable_debug` flag to gain more insight into storage operations when troubleshooting connectivity or access issues.