Structured Datasets (DataFrames)
Structured Datasets, often referred to as DataFrames, are a fundamental data type for analytical and machine learning workflows. Flyte provides robust support for handling various DataFrame libraries, abstracting away the complexities of data serialization, deserialization, and storage across different environments.
The DataFrame Wrapper
The DataFrame class (src.flyte.io._dataframe.dataframe.DataFrame) serves as the primary user-facing interface for structured datasets within Flyte. It acts as a wrapper around actual DataFrame objects (e.g., Pandas DataFrames, PyArrow Tables) and their associated metadata, such as storage location (URI) and file format. This abstraction allows Flyte to manage the lifecycle of structured data, enabling seamless data exchange between tasks and persistent storage.
Key properties and methods of the DataFrame wrapper include:
- uri: An optional string representing the storage location of the dataset (e.g., s3://my-bucket/data.parquet).
- file_format: An optional string indicating the data's format (e.g., "parquet", "csv"). Defaults to a generic format if not specified.
- val: An optional property that holds the in-memory Python DataFrame object (e.g., pd.DataFrame, pa.Table) when the data has been loaded.
- metadata: Contains literals_pb2.StructuredDatasetMetadata, which includes schema information (StructuredDatasetType).
- literal: The underlying Flyte IDL literals_pb2.StructuredDataset object, used internally for Flyte's type system.
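For illustration, a wrapper can be created around either an in-memory object or an existing storage location. This is a minimal sketch based on the constructor arguments (val, uri, file_format) used in the examples later on this page; the bucket URI is a placeholder:

```python
import pandas as pd

from flyte.io._dataframe.dataframe import DataFrame

# Wrap an in-memory Pandas DataFrame; the uri is populated when Flyte serializes it.
in_memory = DataFrame(val=pd.DataFrame({"a": [1, 2, 3]}))

# Point at an existing dataset; nothing is downloaded until the wrapper is opened.
remote = DataFrame(uri="s3://my-bucket/data.parquet", file_format="parquet")

print(in_memory.val)                   # the wrapped pandas.DataFrame
print(remote.uri, remote.file_format)  # "s3://my-bucket/data.parquet", "parquet"
```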
To interact with the underlying DataFrame object within a task, use the following methods:
- open(dataframe_type: Type[DF]) -> DataFrame: Specifies the desired local Python DataFrame type (e.g., pandas.DataFrame, pyarrow.Table) for loading the dataset. It's crucial for lazy loading, as it ensures the appropriate handler is available when the data is finally accessed.
- all() -> DF: Asynchronously loads the entire dataset into the Python DataFrame type specified by open(). This method is typically used when the entire dataset needs to be in memory for processing.
- iter() -> typing.AsyncIterator[DF]: Asynchronously iterates over the dataset. This is useful for large datasets that cannot fit entirely into memory, allowing for chunked processing.

For example:

```python
import pandas as pd

from flyte.io._dataframe.dataframe import DataFrame

@task
def process_data(df_input: DataFrame) -> DataFrame:
    # Specify that we want to work with a Pandas DataFrame
    pandas_df = df_input.open(pd.DataFrame).all()
    # Perform operations on pandas_df
    processed_df = pandas_df.head()
    return DataFrame(val=processed_df)  # Wrap the result back
```
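For datasets that are too large for all(), a task can consume the data in chunks with iter(). The following is a minimal sketch, assuming the task body may be declared async so the async iterator can be consumed with async for (the @task decorator is assumed to come from the Flyte SDK, as in the other examples on this page):

```python
import pandas as pd

from flyte.io._dataframe.dataframe import DataFrame

@task
async def count_rows(df_input: DataFrame) -> int:
    total = 0
    # Stream the dataset chunk by chunk instead of materializing it all at once.
    async for chunk in df_input.open(pd.DataFrame).iter():
        total += len(chunk)
    return total
```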
The DataFrame wrapper handles its own serialization and deserialization to and from Flyte's internal literal representation. This process is managed by the DataFrameTransformerEngine, ensuring that the uri and file_format are correctly preserved and translated.
Data Transformation Engine
The DataFrameTransformerEngine (src.flyte.io._dataframe.dataframe.DataFrameTransformerEngine) is a specialized type transformer responsible for managing the conversion of Python DataFrame objects to Flyte Literal objects and vice-versa. It acts as a central registry for various DataFrame handlers, enabling Flyte to support a wide range of DataFrame libraries and storage formats.
The engine's core responsibilities include:
- Handler Management: It maintains registries (ENCODERS, DECODERS) for DataFrameEncoder and DataFrameDecoder instances.
- Handler Selection: When a DataFrame needs to be serialized or deserialized, the engine selects the appropriate handler based on the Python DataFrame type, the storage protocol (e.g., "s3", "gs", "fsspec"), and the desired file format (e.g., "parquet", "csv"). The _finder method implements this selection logic, including fallback mechanisms for generic formats.
- Type Conversion:
  - to_literal(python_val, python_type, expected): Converts a Python DataFrame value (either a raw DataFrame object or a DataFrame wrapper) into a Flyte literals_pb2.Literal. This involves:
    - Handling DataFrame instances that are passthroughs or merely specify a URI.
    - Invoking the appropriate DataFrameEncoder for actual DataFrame objects.
    - Determining the storage protocol from the DataFrame type's default or the execution context's raw data path.
  - to_python_value(lv, expected_python_type): Converts a Flyte literals_pb2.Literal back into a Python DataFrame object. This involves:
    - Extracting column schema information from the literal and the expected Python type.
    - Invoking the appropriate DataFrameDecoder to load the data.
    - Implementing column subsetting logic, where the decoder should load only the columns specified in the task's input annotation.
Developers can register custom encoders and decoders with the DataFrameTransformerEngine using the register method. This method allows specifying defaults for a given Python type, overriding existing registrations, and setting default formats or storage protocols.
Encoding and Decoding DataFrames
Flyte uses abstract interfaces, DataFrameEncoder and DataFrameDecoder, to define how Python DataFrame objects are converted to and from Flyte's internal StructuredDataset literal representation.
- DataFrameEncoder (src.flyte.io._dataframe.dataframe.DataFrameEncoder):
  - This abstract class defines the interface for converting a Python DataFrame into a Flyte literals_pb2.StructuredDataset.
  - Implementations must provide an encode method that takes a DataFrame wrapper (containing the Python DataFrame) and a types_pb2.StructuredDatasetType (containing schema information) and returns a literals_pb2.StructuredDataset.
  - Encoders are registered with a specific python_type (e.g., pandas.DataFrame), an optional protocol (e.g., "s3"), and a supported_format (e.g., "parquet").
- DataFrameDecoder (src.flyte.io._dataframe.dataframe.DataFrameDecoder):
  - This abstract class defines the interface for converting a Flyte literals_pb2.StructuredDataset into a Python DataFrame.
  - Implementations must provide a decode method that takes a literals_pb2.StructuredDataset and a literals_pb2.StructuredDatasetMetadata (containing the expected schema for the current task) and returns either a Python DataFrame instance or an AsyncIterator of DataFrames.
  - Decoders are registered similarly to encoders, specifying the python_type, protocol, and supported_format.
These interfaces allow for flexible integration of various DataFrame libraries and storage backends.
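To make the wiring concrete, the following is a schematic skeleton of a custom encoder/decoder pair for a hypothetical "myformat" Pandas format. The constructor arguments, method names, and register calls mirror the descriptions above, but the exact base-class signatures and the class-level register invocation are assumptions, and the method bodies are intentionally left as placeholders:

```python
import pandas as pd

from flyte.io._dataframe.dataframe import (
    DataFrameDecoder,
    DataFrameEncoder,
    DataFrameTransformerEngine,
)


class PandasToMyFormatEncoder(DataFrameEncoder):
    """Encodes a pandas.DataFrame into a hypothetical "myformat" file."""

    def __init__(self):
        # python_type / protocol / supported_format follow the registration scheme
        # described above; the exact constructor signature is an assumption.
        super().__init__(python_type=pd.DataFrame, protocol="s3", supported_format="myformat")

    def encode(self, dataframe, structured_dataset_type):
        # `dataframe` is the DataFrame wrapper whose .val holds the in-memory pandas object;
        # `structured_dataset_type` carries the column schema. A real implementation would
        # write the data to storage and return a literals_pb2.StructuredDataset with the URI.
        raise NotImplementedError("sketch only")


class MyFormatToPandasDecoder(DataFrameDecoder):
    """Decodes hypothetical "myformat" files back into a pandas.DataFrame."""

    def __init__(self):
        super().__init__(python_type=pd.DataFrame, protocol="s3", supported_format="myformat")

    def decode(self, flyte_value, current_task_metadata):
        # `flyte_value` is the literals_pb2.StructuredDataset pointing at the stored data;
        # `current_task_metadata.structured_dataset_type.columns` lists the columns the
        # consuming task asked for, which a well-behaved decoder loads selectively.
        raise NotImplementedError("sketch only")


# Register the pair so the engine can select it for pd.DataFrame + "myformat".
DataFrameTransformerEngine.register(PandasToMyFormatEncoder())
DataFrameTransformerEngine.register(MyFormatToPandasDecoder())
```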
Built-in DataFrame Handlers
Flyte provides several built-in handlers for common DataFrame libraries and file formats, located in src.flyte.io._dataframe.basic_dfs. These handlers are automatically registered with the DataFrameTransformerEngine and support seamless conversion between Pandas DataFrames, PyArrow Tables, and Parquet/CSV formats.
- PandasToParquetEncodingHandler: Converts a pandas.DataFrame to a Parquet file.
- ParquetToPandasDecodingHandler: Reads a Parquet file into a pandas.DataFrame.
- PandasToCSVEncodingHandler: Converts a pandas.DataFrame to a CSV file.
- CSVToPandasDecodingHandler: Reads a CSV file into a pandas.DataFrame.
- ArrowToParquetEncodingHandler: Converts a pyarrow.Table to a Parquet file.
- ParquetToArrowDecodingHandler: Reads a Parquet file into a pyarrow.Table.
These handlers manage the underlying storage operations, including writing to and reading from remote storage (like S3) or local filesystems, often leveraging fsspec for protocol handling. They also handle potential credential issues by attempting anonymous access for S3 sources if initial attempts fail.
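Because the Arrow handlers are registered out of the box, tasks can pass pyarrow.Table values directly. A brief sketch (the @task decorator is assumed to be imported from the Flyte SDK, as in the other examples on this page):

```python
import pandas as pd
import pyarrow as pa

@task
def produce_table() -> pa.Table:
    # Serialized to Parquet by the built-in ArrowToParquetEncodingHandler.
    return pa.table({"id": [1, 2, 3], "score": [0.1, 0.2, 0.3]})

@task
def table_to_pandas(data: pa.Table) -> pd.DataFrame:
    # Loaded back via ParquetToArrowDecodingHandler, then converted in user code.
    return data.to_pandas()
```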
Defining DataFrame Schemas with Type Annotations
Flyte allows developers to define the expected schema of a DataFrame using type annotations, providing strong typing and enabling column subsetting. This is achieved by annotating the DataFrame type with a dictionary specifying column names and their Python types.
For example:
```python
from typing import Annotated

import pandas as pd

from flyte.io._dataframe.dataframe import DataFrame

# Define a custom schema for a DataFrame
MyDataFrameSchema = Annotated[
    DataFrame,
    {"col_a": int, "col_b": float, "col_c": str},
    "parquet",  # Optional: specify default format
]

@task
def produce_data() -> MyDataFrameSchema:
    df = pd.DataFrame({"col_a": [1, 2], "col_b": [1.1, 2.2], "col_c": ["x", "y"]})
    return DataFrame(val=df)

@task
def consume_subset(input_df: Annotated[DataFrame, {"col_b": float}]) -> float:
    # When input_df is opened, only 'col_b' will be loaded
    df = input_df.open(pd.DataFrame).all()
    return df["col_b"].sum()

@workflow
def my_workflow():
    produced_df = produce_data()
    result = consume_subset(input_df=produced_df)
    # ...
```
When a task receives a DataFrame input with a specified column schema, the DataFrameTransformerEngine ensures that the decode method of the chosen handler receives this schema information. Decoders are expected to use this metadata (current_task_metadata.structured_dataset_type.columns) to load only the requested columns, optimizing data transfer and memory usage.
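The subsetting contract itself is simple. The following standalone pandas snippet illustrates what a decoder is expected to do once it has read the requested column names out of current_task_metadata.structured_dataset_type.columns (that extraction step is omitted here):

```python
import pandas as pd

# Full dataset as it exists in storage.
full_df = pd.DataFrame({"col_a": [1, 2], "col_b": [1.1, 2.2], "col_c": ["x", "y"]})

# Column names the consuming task requested via its Annotated[...] schema.
requested_columns = ["col_b"]

# A decoder honoring the contract returns only the requested columns,
# or everything when no subset was specified.
subset_df = full_df[requested_columns] if requested_columns else full_df
print(subset_df)
```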
Common Use Cases and Best Practices
- Returning Python DataFrames: When a task returns a pandas.DataFrame or pyarrow.Table directly, Flyte automatically wraps it in a DataFrame and uses the registered encoders to serialize it to the default format (e.g., Parquet) and storage location.

  ```python
  import pandas as pd

  @task
  def generate_report() -> pd.DataFrame:
      df = pd.DataFrame({"metric": [10, 20], "value": [100, 200]})
      return df  # Automatically wrapped and serialized
  ```

- Explicitly Managing DataFrames: For more control, or when dealing with external URIs, explicitly create DataFrame instances.

  ```python
  from flyte.io._dataframe.dataframe import DataFrame

  @task
  def load_external_data(uri: str) -> DataFrame:
      return DataFrame(uri=uri, file_format="csv")
  ```

- Lazy Loading for Performance: Always use df.open(DesiredType).all() or df.open(DesiredType).iter() inside the task where the data is needed. This ensures data is only downloaded and deserialized when explicitly requested, improving performance and resource utilization.
- Custom Handlers: For unsupported DataFrame libraries (e.g., Dask, Spark DataFrames) or custom file formats, implement DataFrameEncoder and DataFrameDecoder and register them with the DataFrameTransformerEngine. This extends Flyte's capabilities to your specific data ecosystem.
Limitations and Considerations
- Wrapper vs. Data: The DataFrame class is a wrapper. Direct manipulation of its uri or file_format properties does not alter the underlying data until a new DataFrame instance is created or the data is re-encoded.
- Column Subsetting: While Flyte passes column schema information to decoders, it is the responsibility of the decoder implementation to respect and apply this subsetting logic when loading data.
- Protocol Resolution: The DataFrameTransformerEngine attempts to infer the storage protocol (e.g., "s3", "file") from the URI or the execution context. Ensure URIs are well-formed or that default protocols are correctly configured for your environment.
- Error Handling: Handlers include logic to gracefully handle common storage errors, such as NoCredentialsError for S3, by attempting anonymous access. However, robust error handling for specific storage backends should be considered in custom handler implementations.