Data Types and I/O
The platform's type system provides robust mechanisms for handling diverse data types and managing I/O operations, abstracting away the complexities of underlying storage systems. This enables developers to define task inputs and outputs using native Python types while ensuring efficient and reliable data transfer between tasks, whether data resides locally or in remote blob storage.
File and Directory Handling
The platform offers dedicated types for managing single files and collections of files within directories, providing both asynchronous and synchronous interfaces for I/O operations.
The File Type
The File type represents a single file, which can be located either locally or in remote storage (e.g., S3, GCS). It is a generic type, allowing you to specify the expected format of the file using a type parameter T. This generic type helps the platform understand the file's content, though users are responsible for the actual I/O operations.
Key Capabilities:
- Generic Format Specification: Declare the expected content format, for example, File[pandas.DataFrame] for a CSV or Parquet file that will be read into a Pandas DataFrame.
- Asynchronous and Synchronous I/O: Provides open() (async context manager) and open_sync() (sync context manager) for reading from or writing to the file.
- Remote and Local Path Abstraction: The path attribute can be a local file path or a remote URI (e.g., s3://bucket/data.csv). The underlying storage layer handles the specifics.
- File Existence Checks: Use exists_sync() to verify if the file exists at its specified path.
- Downloading: The download() method asynchronously fetches a remote file to a local path.
Creating File Instances:
- From an existing remote path:
from pandas import DataFrame
from flyte.io import File
# Reference an existing remote file
remote_csv = File[DataFrame].from_existing_remote("s3://my-bucket/data.csv")
- For a new remote file to be written:
from flyte.io import File
# Create a new remote file reference for writing
new_remote_file = File.new_remote()
- From a local file (which will be uploaded):
import pandas as pd
from flyte.io import File
# Create a local file
local_df = pd.DataFrame({"col1": [1, 2], "col2": ["A", "B"]})
local_df.to_csv("/tmp/local_data.csv", index=False)
# Create a File object from a local path, which will be uploaded
uploaded_file = await File[pd.DataFrame].from_local("/tmp/local_data.csv", "s3://my-bucket/uploaded_data.csv")
I/O Operations within Tasks:
The File type is commonly used as an input or output for tasks, allowing the platform to manage the file's lifecycle.
- Reading a file input in a task:
import pandas as pd
from flyte.io import File
from flyte.tasks import task
@task
async def process_csv(file: File[pd.DataFrame]) -> int:
    async with file.open("rb") as f:
        df = pd.read_csv(f)
    return len(df)
- Writing a file by streaming directly to blob storage:
import pandas as pd
from flyte.io import File
from flyte.tasks import task
@task
async def generate_csv() -> File[pd.DataFrame]:
    df = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
    output_file = File.new_remote()
    async with output_file.open("wb") as f:
        df.to_csv(f, index=False)
    return output_file
- Writing a file locally first, then uploading:
import pandas as pd
from flyte.io import File
from flyte.tasks import task
@task
async def generate_and_upload_csv() -> File[pd.DataFrame]:
    # Simulate writing to a local temporary file
    local_path = "/tmp/temp_output.csv"
    df = pd.DataFrame({"data": [10, 20, 30]})
    df.to_csv(local_path, index=False)
    # The platform will upload this local file to remote storage
    return await File[pd.DataFrame].from_local(local_path, "s3://my-bucket/final_output.csv")
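The existence check and download helpers from the Key Capabilities list follow the same pattern. The following is a minimal sketch, assuming download() accepts a destination path and returns the path of the local copy:
import pandas as pd
from flyte.io import File
from flyte.tasks import task
@task
async def copy_if_present(remote_csv: File[pd.DataFrame]) -> str:
    # Check that the file exists before transferring it (sync helper)
    if not remote_csv.exists_sync():
        return "missing"
    # Fetch the remote file; the returned local path is an assumption of this sketch
    local_path = await remote_csv.download("/tmp/copy_of_data.csv")
    return local_path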
The Dir Type
The Dir type represents a directory containing multiple files. Like File, it supports a generic type parameter describing the format of the files within the directory, and it provides methods for traversing the directory and performing bulk operations.
Key Capabilities:
- Generic File Format: Dir[T] indicates that the directory contains files of a specific format T.
- Asynchronous and Synchronous Traversal: walk() (async iterator) and walk_sync() (sync iterator) allow iterating over all files within the directory, optionally recursively.
- Listing Files: list_files() (async) and list_files_sync() (sync) retrieve a non-recursive list of files.
- Downloading Directories: The download() method asynchronously fetches an entire remote directory to a local path.
- Directory Existence Checks: Use exists() (async) or exists_sync() (sync) to verify if the directory exists.
- Accessing Individual Files: get_file() (async) and get_file_sync() (sync) retrieve a File object for a specific file within the directory.
Creating Dir Instances:
- From an existing remote path:
from pandas import DataFrame
from flyte.io import Dir
# Reference an existing remote directory
remote_data_dir = Dir[DataFrame].from_existing_remote("s3://my-bucket/data_archive/")
- From a local directory (which will be uploaded):
import os
from pathlib import Path
from flyte.io import Dir
from flyte.tasks import task
@task
async def upload_local_dir() -> Dir[str]:  # Dir of generic string files
    local_dir_path = "/tmp/my_local_data"
    Path(local_dir_path).mkdir(parents=True, exist_ok=True)
    with open(os.path.join(local_dir_path, "file1.txt"), "w") as f:
        f.write("content 1")
    with open(os.path.join(local_dir_path, "file2.txt"), "w") as f:
        f.write("content 2")
    # Upload the local directory to a remote path
    return await Dir[str].from_local(local_dir_path, "s3://my-bucket/uploaded_dir/")
I/O Operations within Tasks:
- Walking through files in a directory:
import pandas as pd
from pandas import DataFrame
from flyte.io import Dir
from flyte.tasks import task
@task
async def process_directory(data_dir: Dir[DataFrame]) -> int:
    total_rows = 0
    async for file in data_dir.walk():
        async with file.open("rb") as f:
            df = pd.read_csv(f)
            total_rows += len(df)
    return total_rows
- Downloading a directory:
from flyte.io import Dir
from flyte.tasks import task
@task
async def analyze_local_data(remote_dir: Dir[str]) -> str:
    local_path = await remote_dir.download("/tmp/downloaded_data")
    # Perform local analysis on files in local_path
    return f"Data downloaded to {local_path}"
Structured Data with DataFrame
The DataFrame type provides a flexible way to represent structured datasets, such as tabular data, without tightly coupling to a specific Python library (e.g., Pandas, Apache Arrow). It acts as a handle or reference to the data, which can be stored remotely.
Key Concepts:
- Data Reference: A DataFrame instance primarily stores a uri (path to the data) and a file_format (e.g., "parquet", "csv"). It does not directly contain the data itself.
- Lazy Loading: Data is loaded into a specific Python DataFrame object (e.g., pandas.DataFrame) only when explicitly requested within a task.
- Extensible Handlers: The platform uses a DataFrameTransformerEngine to manage various encoders and decoders. These handlers are responsible for reading/writing data from/to the specified uri and file_format into/from the desired Python DataFrame type.
Interacting with DataFrame:
- Creating a DataFrame Reference: You can create a DataFrame instance by providing its URI and format.
from flyte.io import DataFrame
# Reference a Parquet file
my_dataframe_ref = DataFrame(uri="s3://my-bucket/data.parquet", file_format="parquet")
- Loading Data into a Python DataFrame Object: Within a task, you use the open() method to specify the target Python DataFrame type (e.g., pandas.DataFrame) and then all() to load the entire dataset.
import pandas as pd
from flyte.io import DataFrame
from flyte.tasks import task
@task
async def process_dataframe(df_ref: DataFrame) -> int:
    # Load the data into a pandas DataFrame
    df: pd.DataFrame = await df_ref.open(pd.DataFrame).all()
    return len(df)
- Returning a DataFrame from a Task: When a task returns a DataFrame, the platform's type system automatically handles the serialization and storage of the data.
import pandas as pd
from flyte.io import DataFrame
from flyte.tasks import task
@task
async def create_and_return_dataframe() -> DataFrame:
    df = pd.DataFrame({"col_a": [1, 2], "col_b": ["x", "y"]})
    # The platform will automatically save this DataFrame to a remote URI
    # and return a DataFrame reference.
    return DataFrame(val=df, file_format="parquet")
Custom DataFrame Handlers:
The DataFrameTransformerEngine allows developers to register custom encoders and decoders for new DataFrame types or specific file formats. This enables seamless integration of specialized data structures into the platform's type system.
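As an illustration only, a custom encoder registration might look roughly like the sketch below. The choice of Polars, the DataFrameEncoder base class, its encode() signature, and the register() call are assumptions made for this sketch, not a verbatim interface; consult the DataFrameTransformerEngine documentation for the exact names.
import polars as pl
from flyte.io import DataFrame
# Module paths for the handler base class and the engine are assumptions.
from flyte.io import DataFrameEncoder, DataFrameTransformerEngine

class PolarsToParquetEncoder(DataFrameEncoder):
    # Claims responsibility for writing polars.DataFrame values as Parquet.
    def __init__(self):
        super().__init__(python_type=pl.DataFrame, supported_format="parquet")

    def encode(self, dataframe: pl.DataFrame, uri: str) -> DataFrame:
        # Write the data to the target URI and return a DataFrame reference.
        dataframe.write_parquet(uri)
        return DataFrame(uri=uri, file_format="parquet")

# Make the engine aware of the new handler (registration call is assumed).
DataFrameTransformerEngine.register(PolarsToParquetEncoder())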
The Type System: TypeEngine and TypeTransformer
At the core of the platform's data handling capabilities is a powerful and extensible type system, built around the TypeEngine and TypeTransformer components. This system ensures that Python objects can be seamlessly converted to and from the platform's internal literal representations for execution and storage.
TypeEngine Overview
The TypeEngine is the central registry and orchestrator for all type conversions. Its primary responsibilities include:
- Python to LiteralType Conversion: Translating Python type hints (e.g., int, str, list[str], File[DataFrame]) into the platform's LiteralType definitions, which describe the data's structure and format.
- Python Value to Literal Conversion: Serializing actual Python objects (e.g., 1, "hello", an instance of File) into the platform's Literal values for transmission between tasks and storage.
- Literal to Python Value Conversion: Deserializing Literal values back into Python objects when they are consumed as task inputs.
- Type Inference: Guessing the appropriate Python type from a given LiteralType.
- Transformer Management: Maintaining a registry of TypeTransformer instances and selecting the correct transformer for any given Python type.
TypeTransformer Basics
A TypeTransformer is an abstract base class that defines the interface for handling conversions for a specific Python type. Each transformer is responsible for:
- get_literal_type(python_type): Defining how a Python type hint maps to a LiteralType.
- to_literal(python_val, python_type, expected_literal_type): Converting a Python value to a Literal. This is the serialization step.
- to_python_value(literal_value, expected_python_type): Converting a Literal back to a Python value. This is the deserialization step.
- guess_python_type(literal_type): Inferring the Python type from a LiteralType.
- to_html(python_val, expected_python_type): Providing an HTML representation of the Python value, often used for UI display.
Developers can extend the platform's type system by implementing custom TypeTransformer classes and registering them with the TypeEngine. This allows the platform to understand and manage any custom Python object.
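For orientation, a custom transformer skeleton might look like the sketch below. The method names follow the interface listed above; the MyMatrix class, the import paths, and the TypeEngine.register() call are assumptions made for illustration.
# Module paths below are assumptions for this sketch; adjust to wherever the
# platform exposes TypeEngine, TypeTransformer, Literal, and LiteralType.
from flyte.types import Literal, LiteralType, TypeEngine, TypeTransformer

class MyMatrix:
    """A hypothetical user-defined type carrying a 2-D list of floats."""
    def __init__(self, rows: list[list[float]]):
        self.rows = rows

class MyMatrixTransformer(TypeTransformer[MyMatrix]):
    def get_literal_type(self, python_type) -> LiteralType:
        # Describe how MyMatrix appears to the platform (e.g., a binary scalar).
        ...

    def to_literal(self, python_val: MyMatrix, python_type, expected_literal_type) -> Literal:
        # Serialize python_val (e.g., to MessagePack bytes) and wrap it in a Literal.
        ...

    def to_python_value(self, literal_value: Literal, expected_python_type) -> MyMatrix:
        # Reverse of to_literal: rebuild a MyMatrix from the stored bytes.
        ...

    def guess_python_type(self, literal_type: LiteralType):
        return MyMatrix

# Registering the transformer makes MyMatrix usable as a task input/output type.
TypeEngine.register(MyMatrixTransformer())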
Built-in Type Transformers
The platform includes several pre-built TypeTransformer implementations for common Python data types, ensuring out-of-the-box compatibility and efficient handling.
Dataclasses
The DataclassTransformer handles Python dataclasses.
- Serialization: Dataclass instances are serialized to MessagePack bytes, which are then encapsulated in a Binary scalar literal. This provides an efficient and compact binary representation.
- Deserialization: MessagePack bytes are deserialized back into dataclass instances.
- Schema Extraction: The transformer attempts to extract a JSON schema for the dataclass, which can be used for validation and documentation in the platform's UI.
Example:
import dataclasses
from flyte.tasks import task
@dataclasses.dataclass
class MyData:
    name: str
    value: int
@task
def process_data(data: MyData) -> MyData:
print(f"Processing {data.name} with value {data.value}")
return MyData(name=data.name.upper(), value=data.value * 2)
Dictionaries
The DictTransformer handles Python dictionaries. Its behavior depends on whether the dictionary is typed or untyped.
- Typed Dictionaries (Dict[str, T]): If the dictionary has string keys and a specified value type (e.g., Dict[str, int]), it is mapped to a LiteralMap. Each key-value pair is converted using the appropriate transformer for the value type.
- Untyped Dictionaries (dict or Dict[Any, Any]): Untyped dictionaries are serialized to MessagePack bytes and stored as a Binary scalar literal, similar to dataclasses.
- Key Constraint: For LiteralMap conversions, all dictionary keys must be strings.
Example:
from typing import Dict
from flyte.tasks import task
@task
def process_typed_dict(data: Dict[str, int]) -> Dict[str, str]:
    return {k: str(v * 10) for k, v in data.items()}
@task
def process_untyped_dict(data: dict) -> dict:
    # This will be serialized as MessagePack
    data["processed"] = True
    return data
Lists
The ListTransformer handles Python lists with a specified element type (List[T]).
- Serialization: A Python list is mapped to a LiteralCollection. Each element in the list is individually converted to a Literal using the transformer for its element type.
- Deserialization: A LiteralCollection is converted back into a Python list, with each Literal element deserialized.
Example:
from typing import List
from flyte.tasks import task
@task
def sum_list(numbers: List[int]) -> int:
    return sum(numbers)
Unions and Optional Types
The UnionTransformer handles typing.Union and typing.Optional types. Optional[T] is internally represented as Union[T, None].
- Type Matching: When serializing a value, the transformer attempts to match the value's runtime type to one of the types specified in the Union.
- Ambiguity Handling: If a value could match multiple types within a Union (e.g., a dictionary that could be interpreted as two different dataclasses with overlapping fields), an error is raised to prevent ambiguous conversions.
Example:
from typing import Union, Optional
from flyte.tasks import task
@task
def handle_union(value: Union[str, int]) -> str:
    if isinstance(value, str):
        return f"String: {value.upper()}"
    return f"Integer: {value * 10}"
@task
def handle_optional(name: Optional[str]) -> str:
    return name if name else "Anonymous"
Pydantic Models
The PydanticTransformer provides support for Pydantic BaseModel objects.
- Serialization: Pydantic models are serialized to JSON, then converted to MessagePack bytes, and finally encapsulated in a Binary scalar literal.
- Deserialization: MessagePack bytes are deserialized back into Pydantic model instances.
- Schema Extraction: The transformer extracts the JSON schema from the Pydantic model, which is valuable for validation and UI representation.
Example:
from typing import Optional
from pydantic import BaseModel
from flyte.tasks import task
class User(BaseModel):
    id: int
    name: str
    email: Optional[str] = None
@task
def register_user(user_data: User) -> User:
print(f"Registering user: {user_data.name}")
# Simulate some processing
user_data.name = user_data.name.title()
return user_data
Protobuf Messages
The ProtobufTransformer integrates google.protobuf.message.Message objects.
- Serialization: Protobuf messages are converted into the platform's Struct literal type, which is a flexible, schema-less key-value store. ListValue messages are converted to LiteralCollection.
- Deserialization: Struct literals are converted back into Protobuf message instances.
Example:
from google.protobuf import struct_pb2
from flyte.tasks import task
@task
def process_protobuf_struct(data: struct_pb2.Struct) -> struct_pb2.Struct:
print(f"Received Protobuf Struct: {data}")
data["status"] = "processed"
return data
Pickle (Fallback)
The FlytePickleTransformer serves as a fallback mechanism for any Python object that does not have a dedicated TypeTransformer.
- Serialization: Any Python object is serialized using Python's pickle module and stored as a remote file. A Blob literal with PythonPickle format is used to reference this file.
- Deserialization: The remote pickle file is downloaded and deserialized back into a Python object.
- Considerations: While convenient, using pickle has performance implications (especially for large objects) and is not interoperable with non-Python environments. It should be used as a last resort when a more specific and efficient transformer is not available or feasible.
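As an illustration, a plain class with no dedicated transformer can still be passed between tasks, with the platform falling back to pickle for it. The SensorReading class below is a hypothetical example:
from flyte.tasks import task

class SensorReading:
    # Not a dataclass, Pydantic model, or custom-transformed type: pickle fallback applies.
    def __init__(self, sensor_id: str, value: float):
        self.sensor_id = sensor_id
        self.value = value

@task
def calibrate(reading: SensorReading) -> SensorReading:
    # The input is unpickled from a remote blob; the output is pickled again.
    return SensorReading(reading.sensor_id, reading.value * 1.05)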
Task Signatures with NativeInterface
The NativeInterface class represents the input and output signature of a Python function or task. It plays a crucial role in how the platform understands and executes your code.
Key Aspects:
- Automatic Inference: The platform automatically infers the NativeInterface from your Python function's signature, including parameter names, type hints, and default values.
- Input/Output Mapping: It defines the mapping between Python function arguments/return values and the platform's VariableMap for inputs and outputs.
- Remote Defaults: It can handle cases where task inputs have default values that are known remotely (e.g., a default File URI).
Example:
from typing import Optional
from flyte.io import File
from flyte.tasks import task
@task
def my_task(
    required_str: str,
    optional_int: Optional[int] = 10,
    input_file: File[str] = File.from_existing_remote("s3://default-bucket/default.txt")
) -> str:
    return f"String: {required_str}, Int: {optional_int}, File: {input_file.path}"
# The NativeInterface for my_task would capture:
# Inputs:
# required_str: (str, inspect.Parameter.empty)
# optional_int: (Optional[int], 10)
# input_file: (File[str], File.from_existing_remote(...))
# Outputs:
# output_0: str
Best Practices and Considerations
- Comprehensive Type Hinting: Always use explicit type hints for all task inputs and outputs. This is fundamental for the platform's type system to correctly serialize, deserialize, and validate data, enabling features like schema evolution and cross-language interoperability.
- Asynchronous I/O: For tasks involving significant I/O (e.g., reading/writing large files), prefer the asynchronous open(), walk(), and download() methods provided by File and Dir. This allows for more efficient resource utilization and better performance in concurrent execution environments.
- Data Locality: Understand that File and Dir objects are references to data locations. Data transfer (downloading or uploading) only occurs when you explicitly call methods like open() or download(), or when a File is created with from_local(). Design your tasks to minimize unnecessary data transfers.
- Efficient Structured Data Formats: When working with DataFrame types, choose efficient and interoperable formats like Parquet or Feather over less efficient ones like CSV, especially for large datasets. These formats often include schema information and support columnar storage, leading to better performance.
- Custom Type Serialization: If you introduce custom Python classes (beyond standard dataclasses or Pydantic models) as task inputs or outputs, ensure they are properly serializable. Consider inheriting from mashumaro.types.SerializableType and implementing _serialize and _deserialize methods (see the sketch after this list), or registering a custom TypeTransformer.
- Avoid Pickle When Possible: While the FlytePickleTransformer provides a convenient fallback, it should be used sparingly. Pickled data is generally less efficient, not human-readable, and lacks interoperability with non-Python environments. Prioritize types with dedicated transformers, or define custom types that serialize to open, language-agnostic formats (e.g., JSON, MessagePack, Parquet).
- Schema Evolution: For structured data (dataclasses, Pydantic models, DataFrame with column definitions), consider how schema changes might impact downstream tasks. The platform's type system can help detect and manage these changes, but careful design is still necessary.
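As a starting point for the custom-serialization advice above, the following is a minimal sketch of a mashumaro SerializableType; the GeoPoint class and its dictionary layout are illustrative assumptions:
from mashumaro.types import SerializableType

class GeoPoint(SerializableType):
    """Hypothetical custom type that serializes to a plain dictionary."""

    def __init__(self, lat: float, lon: float):
        self.lat = lat
        self.lon = lon

    def _serialize(self) -> dict:
        # Called when the enclosing dataclass is serialized
        return {"lat": self.lat, "lon": self.lon}

    @classmethod
    def _deserialize(cls, value: dict) -> "GeoPoint":
        # Called when the enclosing dataclass is deserialized
        return cls(lat=value["lat"], lon=value["lon"])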