Structured Datasets (DataFrames)

Structured Datasets, often referred to as DataFrames, are a fundamental data type for analytical and machine learning workflows. Flyte provides robust support for handling various DataFrame libraries, abstracting away the complexities of data serialization, deserialization, and storage across different environments.

The DataFrame Wrapper

The DataFrame class (src.flyte.io._dataframe.dataframe.DataFrame) serves as the primary user-facing interface for structured datasets within Flyte. It acts as a wrapper around actual DataFrame objects (e.g., Pandas DataFrames, PyArrow Tables) and their associated metadata, such as storage location (URI) and file format. This abstraction allows Flyte to manage the lifecycle of structured data, enabling seamless data exchange between tasks and persistent storage.

Key properties and methods of the DataFrame wrapper include:

  • uri: An optional string representing the storage location of the dataset (e.g., s3://my-bucket/data.parquet).
  • file_format: An optional string indicating the data's format (e.g., "parquet", "csv"). Defaults to a generic format if not specified.
  • val: An optional property that holds the in-memory Python DataFrame object (e.g., pd.DataFrame, pa.Table) when the data has been loaded.
  • metadata: Contains literals_pb2.StructuredDatasetMetadata, which includes schema information (StructuredDatasetType).
  • literal: The underlying Flyte IDL literals_pb2.StructuredDataset object, used internally for Flyte's type system.
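
These properties can be set directly when constructing a wrapper that points at existing data. The snippet below is a minimal sketch; the bucket URI is a placeholder, and nothing is read until the dataset is opened.

from flyte.io._dataframe.dataframe import DataFrame

# Reference an existing dataset without loading it.
external = DataFrame(uri="s3://my-bucket/data.parquet", file_format="parquet")
print(external.uri)          # s3://my-bucket/data.parquet
print(external.file_format)  # parquet
print(external.val)          # not populated until the data is opened and loaded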

To interact with the underlying DataFrame object within a task, use the following methods:

  • open(dataframe_type: Type[DF]) -> DataFrame: This method specifies the desired local Python DataFrame type (e.g., pandas.DataFrame, pyarrow.Table) for loading the dataset. It's crucial for lazy loading, as it ensures the appropriate handler is available when the data is finally accessed.
    import pandas as pd
    from flyte.io._dataframe.dataframe import DataFrame

    @task
    def process_data(df_input: DataFrame) -> DataFrame:
        # Specify that we want to work with a Pandas DataFrame
        pandas_df = df_input.open(pd.DataFrame).all()
        # Perform operations on pandas_df
        processed_df = pandas_df.head()
        return DataFrame(val=processed_df)  # Wrap the result back
  • all() -> DF: Asynchronously loads the entire dataset into the Python DataFrame type specified by open(). This method is typically used when the entire dataset needs to be in memory for processing.
  • iter() -> typing.AsyncIterator[DF]: Asynchronously iterates over the dataset. This is useful for large datasets that cannot fit entirely into memory, allowing for chunked processing.
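
For large datasets, iter() allows chunked processing instead of materializing everything in memory. The sketch below assumes iter() yields pandas.DataFrame batches when the dataset is opened with pd.DataFrame, and that the task may be declared async so the async iterator can be consumed with async for; the column name "value" is a placeholder.

import pandas as pd
from flyte.io._dataframe.dataframe import DataFrame

@task
async def summarize_in_chunks(df_input: DataFrame) -> float:
    total = 0.0
    # Nothing is downloaded until iteration starts; each chunk is a pandas.DataFrame.
    async for chunk in df_input.open(pd.DataFrame).iter():
        total += float(chunk["value"].sum())
    return total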

The DataFrame wrapper handles its own serialization and deserialization to and from Flyte's internal literal representation. This process is managed by the DataFrameTransformerEngine, ensuring that the uri and file_format are correctly preserved and translated.

Data Transformation Engine

The DataFrameTransformerEngine (src.flyte.io._dataframe.dataframe.DataFrameTransformerEngine) is a specialized type transformer responsible for managing the conversion of Python DataFrame objects to Flyte Literal objects and vice-versa. It acts as a central registry for various DataFrame handlers, enabling Flyte to support a wide range of DataFrame libraries and storage formats.

The engine's core responsibilities include:

  • Handler Management: It maintains registries (ENCODERS, DECODERS) for DataFrameEncoder and DataFrameDecoder instances.
  • Handler Selection: When a DataFrame needs to be serialized or deserialized, the engine selects the appropriate handler based on the Python DataFrame type, the storage protocol (e.g., "s3", "gs", "fsspec"), and the desired file format (e.g., "parquet", "csv"). The _finder method implements this selection logic, including fallback mechanisms for generic formats.
  • Type Conversion:
    • to_literal(python_val, python_type, expected): Converts a Python DataFrame value (either a raw DataFrame object or a DataFrame wrapper) into a Flyte literals_pb2.Literal. This involves:
      • Handling DataFrame instances that are passthroughs or merely specify a URI.
      • Invoking the appropriate DataFrameEncoder for actual DataFrame objects.
      • Determining the storage protocol from the DataFrame type's default or the execution context's raw data path.
    • to_python_value(lv, expected_python_type): Converts a Flyte literals_pb2.Literal back into a Python DataFrame object. This involves:
      • Extracting column schema information from the literal and the expected Python type.
      • Invoking the appropriate DataFrameDecoder to load the data.
      • Implementing column subsetting logic, where the decoder should load only the columns specified in the task's input annotation.

Developers can register custom encoders and decoders with the DataFrameTransformerEngine using the register method. This method allows specifying defaults for a given Python type, overriding existing registrations, and setting default formats or storage protocols.
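
As an illustration, the sketch below re-registers one of the built-in handlers described later on this page. The keyword argument names and the import path for the built-in handler are assumptions modeled on the descriptions in this page, not confirmed signatures.

from flyte.io._dataframe.dataframe import DataFrameTransformerEngine
from flyte.io._dataframe.basic_dfs import PandasToCSVEncodingHandler

# Hypothetical registration call: the keyword names mirror the documented options
# (defaults for a type, overriding existing registrations) and may differ in the SDK.
DataFrameTransformerEngine.register(
    PandasToCSVEncodingHandler(),
    default_for_type=False,  # keep the existing default format for pandas.DataFrame
    override=True,           # replace any existing pandas/CSV registration
)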

Encoding and Decoding DataFrames

Flyte uses abstract interfaces, DataFrameEncoder and DataFrameDecoder, to define how Python DataFrame objects are converted to and from Flyte's internal StructuredDataset literal representation.

  • DataFrameEncoder (src.flyte.io._dataframe.dataframe.DataFrameEncoder):

    • This abstract class defines the interface for converting a Python DataFrame into a Flyte literals_pb2.StructuredDataset.
    • Implementations must provide an encode method that takes a DataFrame wrapper (containing the Python DataFrame) and a types_pb2.StructuredDatasetType (containing schema information) and returns a literals_pb2.StructuredDataset.
    • Encoders are registered with a specific python_type (e.g., pandas.DataFrame), an optional protocol (e.g., "s3"), and a supported_format (e.g., "parquet").
  • DataFrameDecoder (src.flyte.io._dataframe.dataframe.DataFrameDecoder):

    • This abstract class defines the interface for converting a Flyte literals_pb2.StructuredDataset into a Python DataFrame.
    • Implementations must provide a decode method that takes a literals_pb2.StructuredDataset and literals_pb2.StructuredDatasetMetadata (containing the expected schema for the current task) and returns either a Python DataFrame instance or an AsyncIterator of DataFrames.
    • Decoders are registered similarly to encoders, specifying the python_type, protocol, and supported_format.

These interfaces allow for flexible integration of various DataFrame libraries and storage backends.
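
As a purely illustrative sketch, a custom decoder built against these interfaces might look like the following. The class name, constructor arguments, decode signature (including whether it is async), and the literals_pb2 import path are assumptions drawn from the descriptions above; a real handler would typically go through fsspec rather than relying on pandas to resolve the URI.

from typing import Optional

import pandas as pd
from flyteidl.core import literals_pb2

from flyte.io._dataframe.dataframe import DataFrameDecoder

class MyCSVToPandasDecoder(DataFrameDecoder):
    def __init__(self):
        # Registered for pandas.DataFrame, the s3 protocol, and the "csv" format.
        super().__init__(pd.DataFrame, protocol="s3", supported_format="csv")

    async def decode(
        self,
        flyte_value: literals_pb2.StructuredDataset,
        current_task_metadata: literals_pb2.StructuredDatasetMetadata,
    ) -> pd.DataFrame:
        # Respect column subsetting: load only the columns the consuming task declared.
        columns: Optional[list] = None
        if current_task_metadata.structured_dataset_type.columns:
            columns = [c.name for c in current_task_metadata.structured_dataset_type.columns]
        return pd.read_csv(flyte_value.uri, usecols=columns)

Such a decoder would then be registered with the DataFrameTransformerEngine, as in the earlier registration sketch.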

Built-in DataFrame Handlers

Flyte provides several built-in handlers for common DataFrame libraries and file formats, located in src.flyte.io._dataframe.basic_dfs. These handlers are automatically registered with the DataFrameTransformerEngine and support seamless conversion between Pandas DataFrames, PyArrow Tables, and Parquet/CSV formats.

  • PandasToParquetEncodingHandler: Converts a pandas.DataFrame to a Parquet file.
  • ParquetToPandasDecodingHandler: Reads a Parquet file into a pandas.DataFrame.
  • PandasToCSVEncodingHandler: Converts a pandas.DataFrame to a CSV file.
  • CSVToPandasDecodingHandler: Reads a CSV file into a pandas.DataFrame.
  • ArrowToParquetEncodingHandler: Converts a pyarrow.Table to a Parquet file.
  • ParquetToArrowDecodingHandler: Reads a Parquet file into a pyarrow.Table.

These handlers manage the underlying storage operations, including writing to and reading from remote storage (like S3) or local filesystems, often leveraging fsspec for protocol handling. They also handle potential credential issues by attempting anonymous access for S3 sources if initial attempts fail.
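
To show the built-in handlers cooperating across libraries, the sketch below has one task return a pyarrow.Table (encoded to Parquet) and a second task read the same dataset back as a pandas.DataFrame. It is a minimal sketch; the column names are placeholders, and the two tasks would be wired together in a workflow as in the examples later on this page.

import pandas as pd
import pyarrow as pa
from flyte.io._dataframe.dataframe import DataFrame

@task
def make_table() -> pa.Table:
    # Serialized to Parquet by the registered Arrow encoder.
    return pa.table({"id": [1, 2, 3], "score": [0.1, 0.2, 0.3]})

@task
def count_rows(df_input: DataFrame) -> int:
    # Loaded back as pandas via the registered Parquet decoder.
    pdf = df_input.open(pd.DataFrame).all()
    return len(pdf)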

Defining DataFrame Schemas with Type Annotations

Flyte allows developers to define the expected schema of a DataFrame using type annotations, providing strong typing and enabling column subsetting. This is achieved by annotating the DataFrame type with a dictionary specifying column names and their Python types.

For example:

from typing import Annotated
import pandas as pd
from flyte.io._dataframe.dataframe import DataFrame

# Define a custom schema for a DataFrame
MyDataFrameSchema = Annotated[
    DataFrame,
    {"col_a": int, "col_b": float, "col_c": str},
    "parquet",  # Optional: specify default format
]

@task
def produce_data() -> MyDataFrameSchema:
    df = pd.DataFrame({"col_a": [1, 2], "col_b": [1.1, 2.2], "col_c": ["x", "y"]})
    return DataFrame(val=df)

@task
def consume_subset(input_df: Annotated[DataFrame, {"col_b": float}]) -> float:
    # When input_df is opened, only 'col_b' will be loaded
    df = input_df.open(pd.DataFrame).all()
    return df["col_b"].sum()

@workflow
def my_workflow():
    produced_df = produce_data()
    result = consume_subset(input_df=produced_df)
    # ...

When a task receives a DataFrame input with a specified column schema, the DataFrameTransformerEngine ensures that the decode method of the chosen handler receives this schema information. Decoders are expected to use this metadata (current_task_metadata.structured_dataset_type.columns) to load only the requested columns, optimizing data transfer and memory usage.

Common Use Cases and Best Practices

  • Returning Python DataFrames: When a task returns a pandas.DataFrame or pyarrow.Table directly, Flyte automatically wraps it in a DataFrame and uses the registered encoders to serialize it to the default format (e.g., Parquet) and storage location.
    import pandas as pd

    @task
    def generate_report() -> pd.DataFrame:
        df = pd.DataFrame({"metric": [10, 20], "value": [100, 200]})
        return df  # Automatically wrapped and serialized
  • Explicitly Managing DataFrames: For more control, or when dealing with external URIs, explicitly create DataFrame instances.
    from flyte.io._dataframe.dataframe import DataFrame

    @task
    def load_external_data(uri: str) -> DataFrame:
        return DataFrame(uri=uri, file_format="csv")
  • Lazy Loading for Performance: Always use df.open(DesiredType).all() or df.open(DesiredType).iter() inside the task where the data is needed. This ensures data is only downloaded and deserialized when explicitly requested, improving performance and resource utilization.
  • Custom Handlers: For unsupported DataFrame libraries (e.g., Dask, Spark DataFrames) or custom file formats, implement DataFrameEncoder and DataFrameDecoder and register them with the DataFrameTransformerEngine. This extends Flyte's capabilities to your specific data ecosystem.

Limitations and Considerations

  • Wrapper vs. Data: The DataFrame class is a wrapper. Direct manipulation of its uri or file_format properties does not alter the underlying data until a new DataFrame instance is created or the data is re-encoded.
  • Column Subsetting: While Flyte passes column schema information to decoders, it is the responsibility of the decoder implementation to respect and apply this subsetting logic when loading data.
  • Protocol Resolution: The DataFrameTransformerEngine attempts to infer the storage protocol (e.g., "s3", "file") from the URI or the execution context. Ensure URIs are well-formed or that default protocols are correctly configured for your environment.
  • Error Handling: Handlers include logic to gracefully handle common storage errors, such as NoCredentialsError for S3, by attempting anonymous access. However, robust error handling for specific storage backends should be considered in custom handler implementations.