File and Directory Types
The platform provides robust abstractions for managing files and directories, whether they reside on local filesystems or remote object storage (e.g., S3, GCS). These types facilitate seamless data handling within tasks and workflows, supporting both asynchronous and synchronous operations. Additionally, a specialized DataFrame type offers advanced capabilities for structured datasets, integrating with various data processing libraries.
The File Type
The File type represents a single file, abstracting away the underlying storage mechanism. It supports generic typing (File[T]) to indicate the expected format or content of the file, though users are responsible for parsing the content.
Key Capabilities:
- Path Management: A `File` instance is primarily defined by its `path`, which can be a local filesystem path or a remote URI (e.g., `s3://bucket/data.csv`).
- Asynchronous and Synchronous I/O: The `File` type offers both `async` and `sync` interfaces for file operations, allowing developers to choose the most suitable approach for their application.
  - `open(mode: str = "rb", ...)`: Asynchronously opens the file, returning an async file-like object. This is the primary method for reading from or writing to the file's content.
  - `open_sync(mode: str = "rb", ...)`: Synchronously opens the file, returning a standard file-like object.
- File Existence:
  - `exists_sync()`: Checks if the file exists synchronously.
- Data Transfer:
  - `download(local_path: Optional[Union[str, Path]] = None)`: Asynchronously downloads the file to a specified local path. If no `local_path` is provided, a temporary location is used.
  - `from_local(local_path: Union[str, Path], remote_destination: Optional[str] = None, hash_method: Optional[HashMethod | str] = None)`: Asynchronously uploads a local file to a remote store, returning a new `File` instance referencing the remote location.
- Remote File References:
  - `new_remote(hash_method: Optional[HashMethod | str] = None)`: Creates a `File` reference for a new remote file that will be written to. This is useful for streaming data directly to remote storage.
  - `from_existing_remote(remote_path: str, file_cache_key: Optional[str] = None)`: Creates a `File` reference to an already existing remote file (a short sketch follows this list).
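The remote-reference and data-transfer methods compose naturally. Below is a minimal sketch of pointing at a file that already exists in object storage and pulling it down locally; the bucket and object names are placeholders, and `from_existing_remote` is assumed to be callable without `await`.

```python
from flyte.io._file import File

# Minimal sketch: reference an existing remote file (no upload happens here),
# then download it to a temporary local path and read it with ordinary file I/O.
async def fetch_existing_report():
    report = File.from_existing_remote("s3://my-bucket/reports/latest.txt")  # placeholder URI
    local_path = await report.download()  # no local_path given: a temporary location is used
    with open(local_path, "r") as f:
        print(f.read())
```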
Hashing for Caching and Integrity:
The File type incorporates hashing mechanisms to support caching and verify data integrity.
- `hash`: An optional string representing a precomputed hash value for the file's content.
- `hash_method`: An optional `HashMethod` instance (or string name) used to compute the hash during I/O operations.
- `HashlibAccumulator`: Wraps standard `hashlib` algorithms (e.g., SHA-256) to accumulate a hash during reads and writes.
- `PrecomputedValue`: Allows specifying a hash value directly, bypassing computation.
- `HashingReader` and `AsyncHashingReader`: Wrap file handles to update a hash accumulator during read operations.
- `HashingWriter` and `AsyncHashingWriter`: Wrap file handles to update a hash accumulator during write operations.
When `hash_method` is provided during an `open()` or `from_local()` operation, the system can compute the file's hash, which can then be used for task caching.
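A minimal sketch of this, assuming the string form of `HashMethod` accepts a plain algorithm name such as `"sha256"`; the destination path is a placeholder.

```python
from flyte.io._file import File

# Upload a local file while computing its hash so the result can feed task caching.
async def upload_with_hash():
    remote_file = await File.from_local(
        "/tmp/training_data.csv",
        "s3://my-bucket/training_data.csv",   # placeholder destination
        hash_method="sha256",                 # assumed string form of HashMethod
    )
    # The computed hash is carried on the returned File instance.
    print(remote_file.hash)
```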
Integration with the Type System:
The FileTransformer handles the serialization and deserialization of File objects to and from the platform's literal types. It maps File to a BlobType with SINGLE dimensionality, indicating a single file.
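Because the transformer moves only metadata, a task that declares a `File` parameter still opens and reads the content itself. The sketch below illustrates the pattern; the `env.task` decorator is left commented out, as in the examples further down this page, and an awaitable `read()` on the async handle is an assumption.

```python
from flyte.io._file import File

# Hypothetical task: the transformer delivers only the File metadata
# (path, format, hash); the body is responsible for reading the bytes.
# @env.task
async def count_lines(input_file: File) -> int:
    async with input_file.open("rb") as f:
        data = await f.read()  # awaitable read() on the async handle is assumed
    return data.count(b"\n")
```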
Example Usage:
```python
import pandas as pd
from pandas import DataFrame

from flyte.io._file import File

# Async usage: Read a remote CSV file
async def read_remote_csv():
    csv_file = File[DataFrame](path="s3://my-bucket/data.csv")
    async with csv_file.open() as f:
        df = pd.read_csv(f)
    print(f"Read DataFrame from {csv_file.path}")

# Async usage: Write a local DataFrame to a new remote file
async def write_to_remote_csv():
    df = pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
    remote_file = File.new_remote()  # Generates a random remote path
    async with remote_file.open("wb") as f:
        df.to_csv(f, index=False)
    print(f"Written DataFrame to {remote_file.path}")

# Async usage: Upload a local file
async def upload_local_file():
    local_path = "/tmp/my_local_data.txt"
    with open(local_path, "w") as f:
        f.write("Hello, Flyte!")
    remote_file = await File[str].from_local(local_path, "s3://my-bucket/uploaded_data.txt")
    print(f"Uploaded local file to {remote_file.path}")
```
The Dir Type
The Dir type represents a directory containing multiple files, similar to File but for collections of data. It also supports generic typing (Dir[T]) to specify the format of files within the directory.
Key Capabilities:
- Path Management: Like `File`, `Dir` uses a `path` attribute for local or remote directory locations.
- Asynchronous and Synchronous Directory Operations:
  - `walk(recursive: bool = True, max_depth: Optional[int] = None)`: Asynchronously iterates through files within the directory, yielding `File` objects. Supports recursive traversal.
  - `walk_sync(recursive: bool = True, file_pattern: str = "*", max_depth: Optional[int] = None)`: Synchronous version of `walk`.
  - `list_files()`: Asynchronously retrieves a non-recursive list of `File` objects in the directory.
  - `list_files_sync()`: Synchronous version of `list_files`.
- Directory Existence:
  - `exists()`: Asynchronously checks if the directory exists.
  - `exists_sync()`: Synchronously checks if the directory exists.
- File Access within Directory:
  - `get_file(file_name: str)`: Asynchronously retrieves a specific `File` object by name from the directory.
  - `get_file_sync(file_name: str)`: Synchronous version of `get_file`.
- Data Transfer:
  - `download(local_path: Optional[Union[str, Path]] = None)`: Asynchronously downloads the entire directory to a local path.
  - `download_sync(local_path: Optional[Union[str, Path]] = None)`: Synchronously downloads the entire directory to a local path. Note: Synchronous download for remote paths is currently not implemented and will raise a `NotImplementedError`.
  - `from_local(local_path: Union[str, Path], remote_path: Optional[str] = None, dir_cache_key: Optional[str] = None)`: Asynchronously uploads a local directory to a remote store.
  - `from_local_sync(local_path: Union[str, Path], remote_path: Optional[str] = None)`: Synchronously uploads a local directory to a remote store. Note: Synchronous upload for remote paths is currently not implemented and will raise a `NotImplementedError`.
- Remote Directory References:
  - `from_existing_remote(remote_path: str, dir_cache_key: Optional[str] = None)`: Creates a `Dir` reference to an already existing remote directory (see the sketch after this list).
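A minimal sketch combining a remote directory reference with per-file access and a full download. The bucket path and file name are placeholders; `from_existing_remote` is assumed to be callable without `await`, and `download()` is assumed to return the local path it wrote to.

```python
from flyte.io._dir import Dir

# Reference an existing remote directory, fetch one file from it by name,
# then download the whole directory for local processing.
async def pull_model_artifacts():
    artifacts = Dir.from_existing_remote("s3://my-bucket/model_artifacts/")  # placeholder URI
    config_file = await artifacts.get_file("config.json")  # hypothetical file name
    print(f"Found config at {config_file.path}")
    local_dir = await artifacts.download("/tmp/model_artifacts")
    print(f"Directory downloaded to {local_dir}")
```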
Integration with the Type System:
The DirTransformer handles the serialization and deserialization of Dir objects. It maps Dir to a BlobType with MULTIPART dimensionality, indicating a collection of files.
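As with `File`, only directory metadata crosses the task boundary, so a task body walks the directory itself. A hedged sketch, with the `env.task` decorator left commented out as in the examples below:

```python
from flyte.io._dir import Dir

# Hypothetical task: receive a Dir reference and count the files it contains
# by walking it asynchronously; nothing is downloaded unless the body asks.
# @env.task
async def count_files(input_dir: Dir) -> int:
    count = 0
    async for _file in input_dir.walk():
        count += 1
    return count
```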
Example Usage:
```python
import os

from pandas import DataFrame

from flyte.io._dir import Dir

# Async usage: Walk through files in a remote directory
async def process_remote_dir():
    data_dir = Dir[DataFrame](path="s3://my-bucket/data_collection/")
    async for file in data_dir.walk():
        print(f"Found file: {file.path}")
        # Example: download and read
        local_file_path = await file.download()
        with open(local_file_path, "r") as f:
            content = f.read()
        print(f"Content of {local_file_path}: {content[:50]}...")
        os.remove(local_file_path)  # Clean up

# Sync usage: List files in a local directory
def list_local_dir_sync():
    local_dir_path = "/tmp/my_local_dir"
    os.makedirs(local_dir_path, exist_ok=True)
    with open(os.path.join(local_dir_path, "file1.txt"), "w") as f:
        f.write("content1")
    with open(os.path.join(local_dir_path, "file2.txt"), "w") as f:
        f.write("content2")

    local_dir = Dir[str](path=local_dir_path)
    files = local_dir.list_files_sync()
    for file in files:
        print(f"Local file: {file.path}")

    # Clean up
    os.remove(os.path.join(local_dir_path, "file1.txt"))
    os.remove(os.path.join(local_dir_path, "file2.txt"))
    os.rmdir(local_dir_path)
```
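Uploading a local directory goes through the async `from_local` API (the sync variant raises `NotImplementedError` for remote destinations, as noted above). A minimal sketch, assuming it returns a `Dir` referencing the remote location, mirroring `File.from_local`; the destination bucket is a placeholder.

```python
from flyte.io._dir import Dir

# Upload an entire local directory to object storage and keep the remote reference.
async def upload_local_dir():
    remote_dir = await Dir.from_local("/tmp/my_local_dir", "s3://my-bucket/my_uploaded_dir/")
    print(f"Uploaded directory to {remote_dir.path}")
```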
The DataFrame Type (Structured Datasets)
The DataFrame type provides a high-level, user-facing abstraction for structured datasets, such as those represented by Pandas DataFrames or PyArrow Tables. It decouples the logical representation of a dataset from its physical storage format and underlying library implementation.
Key Attributes:
- `uri`: The storage location of the dataset (local path or remote URI).
- `file_format`: The format of the stored data (e.g., "csv", "parquet").
Core Capabilities:
- Lazy Loading: `DataFrame` instances are typically lightweight references to data. The actual data is loaded only when explicitly requested.
- Flexible Data Access:
  - `open(dataframe_type: Type[DF])`: Prepares the `DataFrame` instance to load data into a specific Python DataFrame type (e.g., `pandas.DataFrame`, `pyarrow.Table`).
  - `all()`: Asynchronously loads the entire dataset into the specified `dataframe_type`.
  - `iter()`: Asynchronously iterates over the dataset, yielding chunks of data in the specified `dataframe_type`. This is useful for large datasets that cannot fit into memory (see the streaming sketch after this list).
- Serialization/Deserialization: `DataFrame` objects are automatically serialized and deserialized by the `DataFrameTransformerEngine`.
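A minimal sketch of chunked access for datasets too large to materialize at once. The URI is a placeholder, and iterating the object returned by `iter()` with `async for` is an assumption based on the description above.

```python
import pandas as pd

from flyte.io._dataframe.dataframe import DataFrame

# Stream a large structured dataset chunk by chunk instead of loading it all.
async def stream_large_dataset():
    big_df = DataFrame(uri="s3://my-bucket/large_data.parquet", file_format="parquet")
    async for chunk in big_df.open(pd.DataFrame).iter():
        # Each chunk arrives as a pandas DataFrame; process it incrementally.
        print(f"Processing chunk with {len(chunk)} rows")
```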
The DataFrameTransformerEngine:
This is a meta-transformer responsible for handling the conversion between Python DataFrame objects (like pandas.DataFrame or pyarrow.Table) and the platform's StructuredDataset literal type. It manages a registry of DataFrameEncoder and DataFrameDecoder implementations for various DataFrame libraries, storage protocols, and file formats.
- `DataFrameEncoder`: An abstract base class for converting a Python DataFrame object into a `StructuredDataset` literal. Implementations define how a specific DataFrame type is written to a given storage protocol and format (e.g., `PandasToCSVEncodingHandler`, `PandasToParquetEncodingHandler`, `ArrowToParquetEncodingHandler`).
- `DataFrameDecoder`: An abstract base class for converting a `StructuredDataset` literal back into a Python DataFrame object. Implementations define how data from a specific storage protocol and format is read into a DataFrame type (e.g., `CSVToPandasDecodingHandler`, `ParquetToPandasDecodingHandler`, `ParquetToArrowDecodingHandler`).
- Registration: Developers can register custom `DataFrameEncoder` and `DataFrameDecoder` implementations with the `DataFrameTransformerEngine` to extend support for new DataFrame libraries, storage systems, or file formats. This allows for highly extensible data handling.
- Column Subsetting and Schema: The engine supports column subsetting, where a task can request only a subset of columns from an input `DataFrame`. It also handles schema information, including external schema types like Apache Arrow.
Integration with the Type System:
The DataFrameTransformerEngine maps the DataFrame type to a StructuredDatasetType literal. This literal type can include column definitions, format information, and external schema details.
Example Usage:
```python
from typing import Annotated

import pandas as pd

from flyte.io._dataframe.basic_dfs import CSVToPandasDecodingHandler, PandasToCSVEncodingHandler
from flyte.io._dataframe.dataframe import DataFrame, DataFrameTransformerEngine  # engine assumed importable alongside DataFrame

# Register handlers (typically done automatically by the library)
DataFrameTransformerEngine.register(PandasToCSVEncodingHandler())
DataFrameTransformerEngine.register(CSVToPandasDecodingHandler())

# Define a task that takes and returns a DataFrame
# @env.task
async def process_dataframe(input_df: Annotated[DataFrame, {"col1": int}]) -> DataFrame:
    # Load the input DataFrame as a pandas DataFrame
    df = await input_df.open(pd.DataFrame).all()
    print(f"Input DataFrame columns: {df.columns.tolist()}")
    # Perform some operations
    df["new_col"] = df["col1"] * 2
    # Create a new DataFrame instance for output
    output_df = DataFrame(val=df, file_format="csv")
    return output_df

# Example of creating a DataFrame from a local pandas DataFrame
async def create_and_process_dataframe():
    local_df = pd.DataFrame({"col1": [10, 20], "col2": ["x", "y"]})
    # When passing a plain pandas DataFrame, the engine wraps it in the DataFrame type
    # and uses registered encoders to upload it.
    # Simulate task execution (in a real workflow, this would be handled by the platform).
    # For local execution, you might explicitly create a DataFrame instance with the value:
    input_flyte_df = DataFrame(val=local_df, file_format="csv")
    # Call the task-like function
    result_flyte_df = await process_dataframe(input_flyte_df)
    # Load the result back as a pandas DataFrame
    result_pd_df = await result_flyte_df.open(pd.DataFrame).all()
    print(f"Result DataFrame:\n{result_pd_df}")

# Example of referencing an existing remote DataFrame
# @env.task
async def read_existing_remote_dataframe() -> DataFrame:
    # Reference an existing remote CSV file
    return DataFrame(uri="s3://my-bucket/existing_data.csv", file_format="csv")
```
Asynchronous and Synchronous Operations
Both File and Dir types provide a dual API for asynchronous (async def) and synchronous (def) operations.
- Asynchronous operations (e.g., `file.open()`, `dir.walk()`, `file.download()`) are generally preferred for I/O-bound tasks, especially when interacting with remote storage, as they allow other operations to proceed while waiting for I/O to complete. This can significantly improve the overall efficiency of tasks.
- Synchronous operations (e.g., `file.open_sync()`, `dir.walk_sync()`, `file.exists_sync()`) are available for simpler scripts or when integrating with existing synchronous codebases. However, for remote I/O, synchronous operations may block the executing thread, potentially leading to performance bottlenecks.
Important Consideration: For `Dir`, `download_sync` and `from_local_sync` are not fully implemented for remote paths and will raise `NotImplementedError`. Developers should use their asynchronous counterparts for remote directory transfers.
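The practical payoff of the async API is concurrency. A minimal sketch, assuming (as above) that `from_existing_remote` can be called without `await`; the URIs are placeholders.

```python
import asyncio

from flyte.io._file import File

# Download several remote files concurrently instead of one at a time.
async def download_many():
    files = [
        File.from_existing_remote("s3://my-bucket/part-0.csv"),
        File.from_existing_remote("s3://my-bucket/part-1.csv"),
        File.from_existing_remote("s3://my-bucket/part-2.csv"),
    ]
    local_paths = await asyncio.gather(*(f.download() for f in files))
    print(local_paths)
```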
Key Considerations
- User Responsibility for I/O: For `File` and `Dir` types, users are explicitly responsible for handling the actual I/O operations (reading and writing content) using the provided `open()` or `open_sync()` methods. The type transformers for these types do not perform automatic uploading or downloading of file contents; they only manage the metadata (path, format, hash).
- Generic Type `T`: The generic type `T` in `File[T]` and `Dir[T]` serves as a hint for the expected content format. It does not enforce type checking on the content itself but can be used by type transformers or user code to guide parsing and processing.
- Remote vs. Local Paths: The types seamlessly handle both local filesystem paths and remote URIs (e.g., `s3://`, `gs://`). The underlying storage layer automatically determines the appropriate filesystem driver.
- Performance: Asynchronous operations are crucial for performance when dealing with remote storage. Leveraging `async`/`await` patterns can prevent blocking and improve resource utilization.
- Extensibility for DataFrames: The `DataFrameTransformerEngine` provides a powerful extension point for integrating custom DataFrame libraries or specialized data formats. Developers can implement and register their own `DataFrameEncoder` and `DataFrameDecoder` to support new data types.