Data Types and I/O

The platform's type system provides robust mechanisms for handling diverse data types and managing I/O operations, abstracting away the complexities of underlying storage systems. This enables developers to define task inputs and outputs using native Python types while ensuring efficient and reliable data transfer between tasks, whether data resides locally or in remote blob storage.

File and Directory Handling

The platform offers dedicated types for managing single files and collections of files within directories, providing both asynchronous and synchronous interfaces for I/O operations.

The File Type

The File type represents a single file, which can be located either locally or in remote storage (e.g., S3, GCS). It is a generic type, allowing you to specify the expected format of the file using a type parameter T. This generic type helps the platform understand the file's content, though users are responsible for the actual I/O operations.

Key Capabilities:

  • Generic Format Specification: Declare the expected content format, for example, File[pandas.DataFrame] for a CSV or Parquet file that will be read into a Pandas DataFrame.
  • Asynchronous and Synchronous I/O: Provides open() (async context manager) and open_sync() (sync context manager) for reading from or writing to the file.
  • Remote and Local Path Abstraction: The path attribute can be a local file path or a remote URI (e.g., s3://bucket/data.csv). The underlying storage layer handles the specifics.
  • File Existence Checks: Use exists_sync() to verify if the file exists at its specified path.
  • Downloading: The download() method asynchronously fetches a remote file to a local path; a short sketch combining existence checks and downloads follows this list.
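
A minimal sketch combining these operations (the bucket path and local destination are placeholders, and File.download() is assumed to accept a target path and return the resulting local path, mirroring the Dir.download() usage shown later on this page):

    from flyte.io import File
    from flyte.tasks import task

    @task
    async def fetch_if_present(remote_path: str) -> str:
        # Reference an existing remote file; no data is transferred yet.
        f = File.from_existing_remote(remote_path)

        # Synchronously check that the file exists at its path.
        if not f.exists_sync():
            return "missing"

        # Asynchronously download the file to a local path (assumed signature).
        local_path = await f.download("/tmp/fetched_file")
        return f"Downloaded to {local_path}"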

Creating File Instances:

  • From an existing remote path:
    from pandas import DataFrame
    from flyte.io import File

    # Reference an existing remote file
    remote_csv = File[DataFrame].from_existing_remote("s3://my-bucket/data.csv")
  • For a new remote file to be written:
    from flyte.io import File

    # Create a new remote file reference for writing
    new_remote_file = File.new_remote()
  • From a local file (which will be uploaded):
    import pandas as pd
    from flyte.io import File

    # Create a local file
    local_df = pd.DataFrame({"col1": [1, 2], "col2": ["A", "B"]})
    local_df.to_csv("/tmp/local_data.csv", index=False)

    # Create a File object from a local path; from_local uploads it to the given remote destination (call this from within an async task)
    uploaded_file = await File[pd.DataFrame].from_local("/tmp/local_data.csv", "s3://my-bucket/uploaded_data.csv")

I/O Operations within Tasks:

The File type is commonly used as an input or output for tasks, allowing the platform to manage the file's lifecycle.

  • Reading a file input in a task:
    import pandas as pd
    from flyte.io import File
    from flyte.tasks import task

    @task
    async def process_csv(file: File[pd.DataFrame]) -> int:
        async with file.open("rb") as f:
            df = pd.read_csv(f)
        return len(df)
  • Writing a file by streaming directly to blob storage:
    import pandas as pd
    from flyte.io import File
    from flyte.tasks import task

    @task
    async def generate_csv() -> File[pd.DataFrame]:
        df = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
        output_file = File.new_remote()
        async with output_file.open("wb") as f:
            df.to_csv(f, index=False)
        return output_file
  • Writing a file locally first, then uploading:
    import pandas as pd
    from flyte.io import File
    from flyte.tasks import task

    @task
    async def generate_and_upload_csv() -> File[pd.DataFrame]:
        # Simulate writing to a local temporary file
        local_path = "/tmp/temp_output.csv"
        df = pd.DataFrame({"data": [10, 20, 30]})
        df.to_csv(local_path, index=False)

        # The platform will upload this local file to remote storage
        return await File[pd.DataFrame].from_local(local_path, "s3://my-bucket/final_output.csv")

The Dir Type

The Dir type represents a directory containing multiple files. Like File, it supports a generic type parameter describing the format of the files it contains, and it provides methods for traversing the directory and performing bulk operations.

Key Capabilities:

  • Generic File Format: Dir[T] indicates that the directory contains files of a specific format T.
  • Asynchronous and Synchronous Traversal: walk() (async iterator) and walk_sync() (sync iterator) allow iterating over all files within the directory, optionally recursively.
  • Listing Files: list_files() (async) and list_files_sync() (sync) retrieve a non-recursive list of files.
  • Downloading Directories: The download() method asynchronously fetches an entire remote directory to a local path.
  • Directory Existence Checks: Use exists() (async) or exists_sync() (sync) to verify if the directory exists.
  • Accessing Individual Files: get_file() (async) and get_file_sync() (sync) retrieve a File object for a specific file within the directory; see the sketch after this list.
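
The sketch below (the file name passed to get_file() is hypothetical, and list_files() is assumed to return a list of File objects) illustrates listing and retrieving individual files:

    from flyte.io import Dir, File
    from flyte.tasks import task

    @task
    async def summarize_dir(data_dir: Dir[str]) -> str:
        # Non-recursive listing of the files directly inside the directory.
        files = await data_dir.list_files()

        # Retrieve a handle for one specific file (assumed to be addressed by name).
        readme: File[str] = await data_dir.get_file("README.txt")

        return f"{len(files)} top-level files; README present: {readme.exists_sync()}"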

Creating Dir Instances:

  • From an existing remote path:
    from pandas import DataFrame
    from flyte.io import Dir

    # Reference an existing remote directory
    remote_data_dir = Dir[DataFrame].from_existing_remote("s3://my-bucket/data_archive/")
  • From a local directory (which will be uploaded):
    import os
    from pathlib import Path
    from flyte.io import Dir
    from flyte.tasks import task

    @task
    async def upload_local_dir() -> Dir[str]:  # Dir of generic string files
        local_dir_path = "/tmp/my_local_data"
        Path(local_dir_path).mkdir(parents=True, exist_ok=True)
        with open(os.path.join(local_dir_path, "file1.txt"), "w") as f:
            f.write("content 1")
        with open(os.path.join(local_dir_path, "file2.txt"), "w") as f:
            f.write("content 2")

        # Upload the local directory to a remote path
        return await Dir[str].from_local(local_dir_path, "s3://my-bucket/uploaded_dir/")

I/O Operations within Tasks:

  • Walking through files in a directory:
    import pandas as pd
    from pandas import DataFrame
    from flyte.io import Dir
    from flyte.tasks import task

    @task
    async def process_directory(data_dir: Dir[DataFrame]) -> int:
        total_rows = 0
        async for file in data_dir.walk():
            async with file.open("rb") as f:
                df = pd.read_csv(f)
                total_rows += len(df)
        return total_rows
  • Downloading a directory:
    from flyte.io import Dir
    from flyte.tasks import task

    @task
    async def analyze_local_data(remote_dir: Dir[str]) -> str:
        local_path = await remote_dir.download("/tmp/downloaded_data")
        # Perform local analysis on files in local_path
        return f"Data downloaded to {local_path}"

Structured Data with DataFrame

The DataFrame type provides a flexible way to represent structured datasets, such as tabular data, without tightly coupling to a specific Python library (e.g., Pandas, Apache Arrow). It acts as a handle or reference to the data, which can be stored remotely.

Key Concepts:

  • Data Reference: A DataFrame instance primarily stores a uri (path to the data) and a file_format (e.g., "parquet", "csv"). It does not directly contain the data itself.
  • Lazy Loading: Data is loaded into a specific Python DataFrame object (e.g., pandas.DataFrame) only when explicitly requested within a task.
  • Extensible Handlers: The platform uses a DataFrameTransformerEngine to manage various encoders and decoders. These handlers are responsible for reading/writing data from/to the specified uri and file_format into/from the desired Python DataFrame type.

Interacting with DataFrame:

  1. Creating a DataFrame Reference: You can create a DataFrame instance by providing its URI and format.

    from flyte.io import DataFrame

    # Reference a Parquet file
    my_dataframe_ref = DataFrame(uri="s3://my-bucket/data.parquet", file_format="parquet")
  2. Loading Data into a Python DataFrame Object: Within a task, you use the open() method to specify the target Python DataFrame type (e.g., pandas.DataFrame) and then all() to load the entire dataset.

    import pandas as pd
    from flyte.io import DataFrame
    from flyte.tasks import task

    @task
    async def process_dataframe(df_ref: DataFrame) -> int:
        # Load the data into a pandas DataFrame
        df: pd.DataFrame = await df_ref.open(pd.DataFrame).all()
        return len(df)
  3. Returning a DataFrame from a Task: When a task returns a DataFrame, the platform's type system automatically handles the serialization and storage of the data.

    import pandas as pd
    from flyte.io import DataFrame
    from flyte.tasks import task

    @task
    async def create_and_return_dataframe() -> DataFrame:
        df = pd.DataFrame({"col_a": [1, 2], "col_b": ["x", "y"]})
        # The platform will automatically save this DataFrame to a remote URI
        # and return a DataFrame reference.
        return DataFrame(val=df, file_format="parquet")

Custom DataFrame Handlers:

The DataFrameTransformerEngine allows developers to register custom encoders and decoders for new DataFrame types or specific file formats. This enables seamless integration of specialized data structures into the platform's type system.
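
The handler interface itself is not documented in this section; purely as a hypothetical illustration (the DataFrameEncoder base class, its encode() signature, the register() call, and the use of polars are all assumptions, not the platform's actual API), a custom encoder might be shaped roughly like this:

    import polars as pl
    from flyte.io import DataFrame

    # Hypothetical encoder: writes a polars DataFrame to Parquet at the target URI
    # and returns a DataFrame reference. The base class and registration call are
    # placeholders for whatever DataFrameTransformerEngine actually expects.
    class PolarsParquetEncoder(DataFrameEncoder):  # assumed base class
        python_type = pl.DataFrame
        supported_format = "parquet"

        def encode(self, df: pl.DataFrame, uri: str) -> DataFrame:
            df.write_parquet(uri)
            return DataFrame(uri=uri, file_format="parquet")

    DataFrameTransformerEngine.register(PolarsParquetEncoder())  # assumed registration API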

The Type System: TypeEngine and TypeTransformer

At the core of the platform's data handling capabilities is a powerful and extensible type system, built around the TypeEngine and TypeTransformer components. This system ensures that Python objects can be seamlessly converted to and from the platform's internal literal representations for execution and storage.

TypeEngine Overview

The TypeEngine is the central registry and orchestrator for all type conversions. Its primary responsibilities include:

  • Python to LiteralType Conversion: Translating Python type hints (e.g., int, str, list[str], File[DataFrame]) into the platform's LiteralType definitions, which describe the data's structure and format.
  • Python Value to Literal Conversion: Serializing actual Python objects (e.g., 1, "hello", an instance of File) into the platform's Literal values for transmission between tasks and storage.
  • Literal to Python Value Conversion: Deserializing Literal values back into Python objects when they are consumed as task inputs.
  • Type Inference: Guessing the appropriate Python type from a given LiteralType.
  • Transformer Management: Maintaining a registry of TypeTransformer instances and selecting the correct transformer for any given Python type.

TypeTransformer Basics

A TypeTransformer is an abstract base class that defines the interface for handling conversions for a specific Python type. Each transformer is responsible for:

  • get_literal_type(python_type): Defining how a Python type hint maps to a LiteralType.
  • to_literal(python_val, python_type, expected_literal_type): Converting a Python value to a Literal. This is the serialization step.
  • to_python_value(literal_value, expected_python_type): Converting a Literal back to a Python value. This is the deserialization step.
  • guess_python_type(literal_type): Inferring the Python type from a LiteralType.
  • to_html(python_val, expected_python_type): Providing an HTML representation of the Python value, often used for UI display.

Developers can extend the platform's type system by implementing custom TypeTransformer classes and registering them with the TypeEngine. This allows the platform to understand and manage any custom Python object.
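
As a rough sketch of such an extension (import paths, the base class's constructor requirements, the bodies of the conversion methods, and the TypeEngine.register() call are assumptions; only the method names come from the interface described above), a transformer for a custom type might be laid out like this:

from dataclasses import dataclass

# Import path is an assumption for illustration.
from flyte.types import Literal, LiteralType, TypeEngine, TypeTransformer

@dataclass
class GeoPoint:
    lat: float
    lon: float

class GeoPointTransformer(TypeTransformer[GeoPoint]):
    def get_literal_type(self, python_type: type) -> LiteralType:
        # Describe how GeoPoint maps onto a LiteralType (e.g., a struct-like type).
        ...

    def to_literal(self, python_val: GeoPoint, python_type: type,
                   expected_literal_type: LiteralType) -> Literal:
        # Serialize the GeoPoint instance (e.g., to MessagePack bytes) into a Literal.
        ...

    def to_python_value(self, literal_value: Literal,
                        expected_python_type: type) -> GeoPoint:
        # Reverse the conversion performed by to_literal.
        ...

    def guess_python_type(self, literal_type: LiteralType) -> type:
        return GeoPoint

# Registering the transformer makes GeoPoint usable as a task input/output type.
TypeEngine.register(GeoPointTransformer())  # assumed registration API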

Built-in Type Transformers

The platform includes several pre-built TypeTransformer implementations for common Python data types, ensuring out-of-the-box compatibility and efficient handling.

Dataclasses

The DataclassTransformer handles Python dataclasses.

  • Serialization: Dataclass instances are serialized to MessagePack bytes, which are then encapsulated in a Binary scalar literal. This provides an efficient and compact binary representation.
  • Deserialization: MessagePack bytes are deserialized back into dataclass instances.
  • Schema Extraction: The transformer attempts to extract a JSON schema for the dataclass, which can be used for validation and documentation in the platform's UI.

Example:

import dataclasses
from flyte.tasks import task

@dataclasses.dataclass
class MyData:
    name: str
    value: int

@task
def process_data(data: MyData) -> MyData:
    print(f"Processing {data.name} with value {data.value}")
    return MyData(name=data.name.upper(), value=data.value * 2)

Dictionaries

The DictTransformer handles Python dictionaries. Its behavior depends on whether the dictionary is typed or untyped.

  • Typed Dictionaries (Dict[str, T]): If the dictionary has string keys and a specified value type (e.g., Dict[str, int]), it is mapped to a LiteralMap. Each key-value pair is converted using the appropriate transformer for the value type.
  • Untyped Dictionaries (dict or Dict[Any, Any]): Untyped dictionaries are serialized to MessagePack bytes and stored as a Binary scalar literal, similar to dataclasses.
  • Key Constraint: For LiteralMap conversions, all dictionary keys must be strings.

Example:

from typing import Dict
from flyte.tasks import task

@task
def process_typed_dict(data: Dict[str, int]) -> Dict[str, str]:
    return {k: str(v * 10) for k, v in data.items()}

@task
def process_untyped_dict(data: dict) -> dict:
    # This will be serialized as MessagePack
    data["processed"] = True
    return data

Lists

The ListTransformer handles Python lists with a specified element type (List[T]).

  • Serialization: A Python list is mapped to a LiteralCollection. Each element in the list is individually converted to a Literal using the transformer for its element type.
  • Deserialization: A LiteralCollection is converted back into a Python list, with each Literal element deserialized.

Example:

from typing import List
from flyte.tasks import task

@task
def sum_list(numbers: List[int]) -> int:
    return sum(numbers)

Unions and Optional Types

The UnionTransformer handles typing.Union and typing.Optional types. Optional[T] is internally represented as Union[T, None].

  • Type Matching: When serializing a value, the transformer attempts to match the value's runtime type to one of the types specified in the Union.
  • Ambiguity Handling: If a value could match multiple types within a Union (e.g., a dictionary that could be interpreted as two different dataclasses with overlapping fields), an error is raised to prevent ambiguous conversions.

Example:

from typing import Union, Optional
from flyte.tasks import task

@task
def handle_union(value: Union[str, int]) -> str:
    if isinstance(value, str):
        return f"String: {value.upper()}"
    return f"Integer: {value * 10}"

@task
def handle_optional(name: Optional[str]) -> str:
    return name if name else "Anonymous"

Pydantic Models

The PydanticTransformer provides support for Pydantic BaseModel objects.

  • Serialization: Pydantic models are serialized to JSON, then converted to MessagePack bytes, and finally encapsulated in a Binary scalar literal.
  • Deserialization: MessagePack bytes are deserialized back into Pydantic model instances.
  • Schema Extraction: The transformer extracts the JSON schema from the Pydantic model, which is valuable for validation and UI representation.

Example:

from typing import Optional
from pydantic import BaseModel
from flyte.tasks import task

class User(BaseModel):
    id: int
    name: str
    email: Optional[str] = None

@task
def register_user(user_data: User) -> User:
    print(f"Registering user: {user_data.name}")
    # Simulate some processing
    user_data.name = user_data.name.title()
    return user_data

Protobuf Messages

The ProtobufTransformer integrates google.protobuf.message.Message objects.

  • Serialization: Protobuf messages are converted into the platform's Struct literal type, which is a flexible, schema-less key-value store. ListValue messages are converted to LiteralCollection.
  • Deserialization: Struct literals are converted back into Protobuf message instances.

Example:

from google.protobuf import struct_pb2
from flyte.tasks import task

@task
def process_protobuf_struct(data: struct_pb2.Struct) -> struct_pb2.Struct:
    print(f"Received Protobuf Struct: {data}")
    data["status"] = "processed"
    return data

Pickle (Fallback)

The FlytePickleTransformer serves as a fallback mechanism for any Python object that does not have a dedicated TypeTransformer.

  • Serialization: Any Python object is serialized using Python's pickle module and stored as a remote file. A Blob literal with PythonPickle format is used to reference this file.
  • Deserialization: The remote pickle file is downloaded and deserialized back into a Python object.
  • Considerations: While convenient, using pickle has performance implications (especially for large objects) and is not interoperable with non-Python environments. It should be used as a last resort when a more specific and efficient transformer is not available or feasible.
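
For illustration, the class below (invented for this sketch) has no dedicated transformer, so passing it between tasks would go through the pickle fallback described above:

from flyte.tasks import task

class LegacyModel:
    """A plain Python class with no dedicated TypeTransformer."""

    def __init__(self, weights: list[float]):
        self.weights = weights

@task
def scale_model(model: LegacyModel, factor: float) -> LegacyModel:
    # With no registered transformer, LegacyModel instances are pickled to a
    # remote file and referenced via a Blob literal between tasks.
    return LegacyModel([w * factor for w in model.weights])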

Task Signatures with NativeInterface

The NativeInterface class represents the input and output signature of a Python function or task. It plays a crucial role in how the platform understands and executes your code.

Key Aspects:

  • Automatic Inference: The platform automatically infers the NativeInterface from your Python function's signature, including parameter names, type hints, and default values.
  • Input/Output Mapping: It defines the mapping between Python function arguments/return values and the platform's VariableMap for inputs and outputs.
  • Remote Defaults: It can handle cases where task inputs have default values that are known remotely (e.g., a default File URI).

Example:

from typing import Optional
from flyte.io import File
from flyte.tasks import task

@task
def my_task(
    required_str: str,
    optional_int: Optional[int] = 10,
    input_file: File[str] = File.from_existing_remote("s3://default-bucket/default.txt"),
) -> str:
    return f"String: {required_str}, Int: {optional_int}, File: {input_file.path}"

# The NativeInterface for my_task would capture:
# Inputs:
#   required_str: (str, inspect.Parameter.empty)
#   optional_int: (Optional[int], 10)
#   input_file:   (File[str], File.from_existing_remote(...))
# Outputs:
#   output_0: str

Best Practices and Considerations

  • Comprehensive Type Hinting: Always use explicit type hints for all task inputs and outputs. This is fundamental for the platform's type system to correctly serialize, deserialize, and validate data, enabling features like schema evolution and cross-language interoperability.
  • Asynchronous I/O: For tasks involving significant I/O (e.g., reading/writing large files), prefer the asynchronous open(), walk(), and download() methods provided by File and Dir. This allows for more efficient resource utilization and better performance in concurrent execution environments.
  • Data Locality: Understand that File and Dir objects are references to data locations. Data transfer (downloading or uploading) only occurs when you explicitly call methods like open(), download(), or when a File is created from_local(). Design your tasks to minimize unnecessary data transfers.
  • Efficient Structured Data Formats: When working with DataFrame types, choose efficient and interoperable formats like Parquet or Feather over less efficient ones like CSV, especially for large datasets. These formats often include schema information and support columnar storage, leading to better performance.
  • Custom Type Serialization: If you introduce custom Python classes (beyond standard dataclasses or Pydantic models) as task inputs or outputs, ensure they are properly serializable. Consider inheriting from mashumaro.types.SerializableType and implementing _serialize and _deserialize methods (a sketch follows this list), or registering a custom TypeTransformer.
  • Avoid Pickle When Possible: While the FlytePickleTransformer provides a convenient fallback, it should be used sparingly. Pickled data is generally less efficient, not human-readable, and lacks interoperability with non-Python environments. Prioritize using types with dedicated transformers or defining custom types that serialize to open, language-agnostic formats (e.g., JSON, MessagePack, Parquet).
  • Schema Evolution: For structured data (dataclasses, Pydantic models, DataFrame with column definitions), consider how schema changes might impact downstream tasks. The platform's type system can help detect and manage these changes, but careful design is still necessary.
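
As a minimal sketch of the mashumaro approach mentioned above (the EncryptedString type and its reverse-the-string "encryption" are invented purely for illustration), a custom field type controls its own wire representation via _serialize and _deserialize:

from dataclasses import dataclass
from mashumaro.types import SerializableType

class EncryptedString(SerializableType):
    def __init__(self, plaintext: str):
        self.plaintext = plaintext

    def _serialize(self) -> str:
        # Placeholder "encryption" for illustration only.
        return self.plaintext[::-1]

    @classmethod
    def _deserialize(cls, value: str) -> "EncryptedString":
        return cls(value[::-1])

@dataclass
class Credentials:
    username: str
    password: EncryptedString

When a dataclass such as Credentials is used as a task input or output, the field's _serialize and _deserialize hooks determine how it is represented inside the MessagePack payload produced by the DataclassTransformer.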