Skip to main content

Core Concepts & Task Definition

Core Concepts & Task Definition

This section outlines the fundamental concepts and mechanisms for defining and managing tasks within the system. It covers how execution environments are configured, how tasks are structured, and how various operational aspects like resources, caching, and secrets are handled.

Environments: Defining Execution Contexts

Environments serve as logical groupings for tasks, providing a shared and consistent execution context. They encapsulate common configurations, promoting reusability and simplifying task definitions.

The primary class for defining an environment is TaskEnvironment. It extends a base Environment and offers a comprehensive set of parameters:

  • Name: A unique identifier for the environment.
  • Image: The Docker container image used for all tasks within this environment. This can be an explicit URI string or an Image object for programmatic image construction. Using "auto" defaults to a system-provided base image.
  • Resources: Specifies the default CPU, memory, GPU, and disk resources allocated to tasks in this environment. Individual tasks can override these defaults.
  • Environment Variables: A dictionary of key-value pairs injected as environment variables into the task containers.
  • Secrets: A list of Secret objects to be securely injected into the environment, making them available to tasks.
  • Dependencies: A list of other Environment instances that this environment depends on. This hints to the deployment system to ensure dependent environments are deployed first.
  • Cache: Defines the default caching policy for tasks within this environment. This can be "auto", "override", or "disable", or a detailed Cache object.
  • Reusable: A ReusePolicy object that configures the environment for reuse across multiple task invocations. This is particularly useful for reducing startup overhead for short-lived tasks.
  • Plugin Configuration: Allows passing arbitrary configuration objects to specialized task plugins.

Example: Defining and Cloning an Environment

from datetime import timedelta
from flyte import TaskEnvironment, Resources, ReusePolicy, Image

# Define a base environment
my_base_env = TaskEnvironment(
name="my-ml-env",
image=Image.from_debian_base().with_pip_packages("scikit-learn", "pandas"),
resources=Resources(cpu="1", memory="2Gi"),
reusable=ReusePolicy(replicas=2, idle_ttl=timedelta(minutes=5)),
)

# Clone the environment for a specific use case, overriding some parameters
production_env = my_base_env.clone_with(
name="my-ml-production-env",
resources=Resources(cpu="2", memory="4Gi", gpu="T4:1"),
cache="override",
)

The add_dependency method allows explicitly declaring dependencies between environments, which is crucial for orchestrating complex deployments.

Tasks: The Units of Work

Tasks are the fundamental executable units within the system. They represent a single, well-defined piece of computation. Tasks are typically Python functions decorated with @env.task from a TaskEnvironment instance.

The TaskTemplate class serves as the base for all task definitions, while AsyncFunctionTaskTemplate is the concrete implementation for Python functions. Key parameters for defining a task include:

  • Function (func): The Python function that implements the task's logic. It can be synchronous or asynchronous.
  • Name: A unique, fully qualified name for the task (e.g., environment_name.function_name).
  • Short Name: A user-friendly, shorter name for the task, defaulting to the function's name.
  • Image: The container image to use for this specific task. If not provided, it inherits from the parent TaskEnvironment.
  • Resources: Overrides the default resource allocation from the parent environment for this task.
  • Cache: Overrides the default caching policy from the parent environment for this task.
  • Retries: Specifies the retry strategy (RetryStrategy or an integer count) for the task in case of failure.
  • Timeout: Defines the maximum runtime and queued time for the task using a Timeout object.
  • Reusable: Overrides the reusability policy from the parent environment. Setting it to "off" disables reusability for this task.
  • Documentation: A Documentation object or extracted from the function's docstring, providing human-readable descriptions.
  • Environment Variables: Additional environment variables specific to this task.
  • Secrets: Additional Secret objects to be injected into this task.
  • Pod Template: A PodTemplate object or string reference for advanced Kubernetes pod configuration.
  • Report: A boolean indicating whether to generate an HTML report for the task's execution.
  • Max Inline I/O Bytes: The maximum size allowed for inputs and outputs passed directly (inline) to the task, excluding files or directories.

Example: Defining Tasks within an Environment

from datetime import timedelta
from flyte import TaskEnvironment, Resources, RetryStrategy, Timeout

my_env = TaskEnvironment(
name="data-processing-env",
image="my-custom-image:latest",
resources=Resources(cpu="0.5", memory="1Gi"),
)

@my_env.task(
short_name="fetch-data",
retries=3,
timeout=timedelta(minutes=10),
)
async def fetch_raw_data(source_url: str) -> str:
# Simulate fetching data
await asyncio.sleep(1)
return f"Data from {source_url}"

@my_env.task(
resources=Resources(cpu="1", memory="2Gi"), # Override environment resources
cache="disable", # Disable caching for this task
)
def process_data(raw_data: str, threshold: int) -> int:
# Simulate data processing
print(f"Processing: {raw_data} with threshold {threshold}")
return len(raw_data) * threshold

Tasks can be invoked directly like regular Python functions for local execution or submitted to the remote system via the flyte.run utility. The aio method allows synchronous tasks to be called within an asynchronous context, returning an awaitable.

The override method on a TaskTemplate allows for dynamic adjustment of task parameters at invocation time, without altering the original task definition.

Container Images: Defining Task Runtimes

The Image class provides a programmatic way to define and manage container images, which are the runtime environments for tasks. This allows for flexible and reproducible environment definitions.

Image Construction Methods:

  • Image.from_debian_base(): Creates an image based on a default Debian Python slim image, pre-configured with Flyte dependencies. This is the recommended starting point for most Python tasks.
  • Image.from_base(image_uri): Starts with any pre-existing container image by its full URI.
  • Image.from_dockerfile(file, registry, name): Builds an image from a local Dockerfile. Note that additional layers cannot be added programmatically after using this method.
  • Image.from_uv_script(script, name, registry): Creates an image based on a uv script, automatically detecting Python version and dependencies.

Layering and Customization:

Once a base image is defined, various with_* methods allow adding layers to customize the image:

  • with_pip_packages(*packages): Installs Python packages using pip. Supports specifying index URLs, extra index URLs, pre-releases, and build-time secrets.
  • with_requirements(file): Installs Python packages from a requirements.txt file.
  • with_uv_project(pyproject_file, uvlock): Configures the image with a uv project, installing dependencies from uv.lock.
  • with_apt_packages(*packages): Installs system-level packages using apt.
  • with_env_vars(env_vars): Sets environment variables within the image.
  • with_source_folder(src, dst): Copies a local directory into the image.
  • with_source_file(src, dst): Copies a local file into the image.
  • with_commands(commands): Executes arbitrary shell commands during the image build process.
  • with_workdir(workdir): Sets the working directory inside the container.

Image Lifecycle and Identification:

  • identifier: A unique hash of the image's layers and properties, used for caching and lookup.
  • uri: The fully qualified URI of the image (e.g., registry/name:tag).
  • ImageBuildEngine: The internal component responsible for orchestrating image builds (e.g., using Docker/BuildKit locally or a remote builder).
  • ImageChecker: Used to determine if an image already exists in a registry, avoiding redundant builds.

Example: Building a Layered Image

from flyte import Image
from pathlib import Path

my_custom_image = (
Image.from_debian_base(python_version=(3, 11), name="my-app", registry="my-registry.com")
.with_pip_packages("numpy", "scipy==1.10.0")
.with_source_folder(Path("./my_code"), "/app")
.with_commands(["apt-get update && apt-get install -y git"])
.with_env_vars({"MY_ENV_VAR": "some_value"})
)

# The image will be built and pushed to my-registry.com/my-app:<hash>

Resource Management: Allocating Compute

The Resources class allows precise allocation of compute resources to tasks and environments. This is critical for controlling costs, ensuring performance, and preventing resource contention.

  • CPU: Specifies the CPU request and optional limit (e.g., "1", (1, 2), "500m").
  • Memory: Specifies the memory request and optional limit (e.g., "1Gi", ("1Gi", "2Gi")).
  • GPU: Specifies GPU requirements. This can be an integer quantity (e.g., 1), a string combining device type and quantity (e.g., "T4:1"), or a Device object for more granular control over device type, quantity, and partition.
  • Disk: Specifies the disk size (e.g., "10GiB").
  • Shared Memory (shm): Configures shared memory for the container (e.g., "auto" or a specific size like "1Gi").

Example: Defining Task Resources

from flyte import Resources, Device

# Task with specific CPU and memory
@my_env.task(resources=Resources(cpu="2", memory="4Gi"))
def high_compute_task():
pass

# Task with a specific GPU type and quantity
@my_env.task(resources=Resources(gpu="A100:1"))
def gpu_task():
pass

# Task with a custom device partition
@my_env.task(resources=Resources(gpu=Device(device="A100", quantity=1, partition="1g.5gb")))
def partitioned_gpu_task():
pass

Secrets: Secure Configuration

The Secret class provides a secure mechanism to inject sensitive information into task execution environments or during image build processes. Secrets are referenced by a key and can optionally belong to a group. They can be mounted as environment variables or files.

  • Key: The name of the secret in the secret store.
  • Group: An optional grouping for secrets.
  • Mount: Specifies a file path within the container where the secret should be mounted (e.g., /etc/flyte/secrets/my-secret).
  • As Environment Variable (as_env_var): Specifies the name of the environment variable the secret's value should be exposed as. If not provided, a default uppercase, underscore-separated name is derived from the key and group.

Example: Using Secrets in Tasks and Image Builds

from flyte import Secret, TaskEnvironment, Image

# Define a secret
my_api_key = Secret(key="my-api-key", group="external-services", as_env_var="EXTERNAL_API_KEY")

# Use the secret in an environment
secure_env = TaskEnvironment(
name="secure-env",
image="my-secure-image:latest",
secrets=[my_api_key],
)

@secure_env.task
def use_api_key():
import os
print(f"API Key: {os.environ['EXTERNAL_API_KEY']}")

# Use a secret during image build (e.g., for private package repositories)
private_repo_image = (
Image.from_debian_base()
.with_pip_packages(
"private-package",
secret_mounts=[Secret(key="GITHUB_PAT", as_env_var="GITHUB_PAT_ENV")],
)
)

Caching, Retries, and Timeouts: Optimizing Execution

These configurations are crucial for building robust and efficient workflows.

Caching (Cache)

The Cache class defines how task results are cached, enabling faster re-execution of identical tasks.

  • Behavior:
    • "auto": Caching is enabled, and a version hash is automatically generated.
    • "override": Caching is enabled, and a specific version_override is used.
    • "disable": Caching is disabled.
  • Version Override: An explicit string to use as the cache version. If not provided, a version is computed based on policies.
  • Serialize: If True, identical concurrent executions of a task will run serially, reusing the first result.
  • Ignored Inputs: A tuple of input names to exclude when computing the cache version hash.
  • Salt: An arbitrary string used to salt the hash generation, ensuring unique versions even with identical code.
  • Policies: A list of CachePolicy objects (e.g., FunctionBodyPolicy) that define how the cache version hash is generated from task properties. FunctionBodyPolicy hashes the task's source code.

Example: Configuring Task Caching

from flyte import Cache, TaskEnvironment

my_env = TaskEnvironment(name="caching-env", image="my-image:latest")

@my_env.task(
cache=Cache(
behavior="auto",
ignored_inputs=("input_timestamp",), # Ignore this input for caching purposes
salt="v1", # Add a custom salt
)
)
def expensive_computation(data: str, input_timestamp: str) -> int:
# This task will reuse results if 'data' is the same, regardless of 'input_timestamp'
return len(data)

Retries (RetryStrategy)

The RetryStrategy class defines how tasks are retried upon failure.

  • Count: The number of times a task should be retried.
  • Backoff: The maximum backoff time between retries (timedelta or int seconds).
  • Backoff Factor: An exponential factor applied to the backoff duration.

Example: Configuring Task Retries

from flyte import RetryStrategy, TaskEnvironment

my_env = TaskEnvironment(name="retry-env", image="my-image:latest")

@my_env.task(retries=5) # Simple retry count
def flaky_task_simple():
import random
if random.random() < 0.7:
raise ValueError("Simulated failure")
return "Success"

@my_env.task(retries=RetryStrategy(count=3, backoff=timedelta(seconds=10), backoff_factor=2))
def flaky_task_with_backoff():
import random
if random.random() < 0.7:
raise ValueError("Simulated failure")
return "Success"

Timeouts (Timeout)

The Timeout class specifies maximum durations for task execution.

  • Max Runtime: The maximum time a task is allowed to run (timedelta or int seconds).
  • Max Queued Time: The maximum time a task can remain in the queue before starting execution.

Example: Configuring Task Timeouts

from datetime import timedelta
from flyte import Timeout, TaskEnvironment

my_env = TaskEnvironment(name="timeout-env", image="my-image:latest")

@my_env.task(timeout=Timeout(max_runtime=timedelta(minutes=5), max_queued_time=timedelta(minutes=1)))
async def long_running_task():
await asyncio.sleep(300) # This will time out if it exceeds 5 minutes
return "Done"

Execution Context and Lifecycle

The system provides a rich execution context that tasks can access at runtime, and manages the lifecycle of runs and individual actions.

Context (Context, ContextData, TaskContext)

The Context class manages the current execution context, which is thread-local and coroutine-aware. ContextData holds the raw data, and TaskContext provides task-specific runtime information:

  • Action ID: Identifies the current execution step (ActionID).
  • Version: The version of the currently executing task.
  • Raw Data Path: A RawDataPath object for managing temporary data storage, both local and remote.
  • Input/Output Paths: URIs for task inputs and outputs.
  • Run Base Directory: The base directory for the current run's data.
  • Report: A Report object for generating custom HTML reports during task execution.
  • Group Data: Information if the task is part of a group.
  • Checkpoints: Paths for task checkpointing.
  • Code Bundle: Details about the code package used for execution.
  • Compiled Image Cache: Information about built images.
  • Mode: Indicates the execution mode ("local", "remote", "hybrid").

Tasks can access this context using flyte.ctx().

Runs and Actions (Run, Action, LazyEntity)

  • Run: Represents a top-level execution of a task or workflow. It provides methods to wait for completion, show_logs, abort, and retrieve details (including inputs and outputs).
  • Action: A sub-component of a Run, representing a specific execution step, often a single task invocation. It has its own phase, runtime, attempts, and can provide logs.
  • LazyEntity: A proxy for remote tasks (Task.get()) that defers fetching the full task definition until it's actually needed (e.g., when called or when its properties are accessed). This optimizes network calls.

The _Runner class orchestrates the execution of tasks in different modes:

  • local: Tasks run directly on the local machine.
  • remote: Tasks are submitted to the remote Flyte backend for execution.
  • hybrid: Parent actions run locally, while child actions are submitted remotely.

Example: Running a Task

import flyte
from flyte import TaskEnvironment

my_env = TaskEnvironment(name="example-env", image="my-image:latest")

@my_env.task
def greet(name: str) -> str:
return f"Hello, {name}!"

async def main():
# Run locally
local_result = await flyte.with_runcontext(mode="local").run.aio(greet, name="Local User")
print(f"Local run output: {local_result}")

# Run remotely
remote_run = await flyte.with_runcontext(mode="remote", name="my-remote-greeting").run.aio(greet, name="Remote User")
print(f"Remote run URL: {remote_run.url}")
await remote_run.wait()
remote_result = await remote_run.outputs()
print(f"Remote run output: {remote_result[0]}")

if __name__ == "__main__":
import asyncio
asyncio.run(main())

Type System: Data Serialization and Deserialization

The TypeEngine is the core of the system's extensible type system, responsible for converting Python native types to and from Flyte's internal Literal representation. This enables seamless data passing between tasks, regardless of their underlying language or execution environment.

  • TypeTransformer: The base class for implementing custom type conversions. Each transformer is registered for a specific Python type.
  • LiteralType: Flyte's protobuf-based representation of data types.
  • Literal: Flyte's protobuf-based representation of data values.

Key TypeTransformer implementations include:

  • DataclassTransformer: Handles Python dataclasses, serializing them to MessagePack bytes and including JSON schema metadata.
  • PydanticTransformer: Handles Pydantic BaseModel instances, also using MessagePack and JSON schema.
  • ListTransformer: Converts Python lists to Flyte LiteralCollections.
  • DictTransformer: Converts Python dictionaries to Flyte LiteralMaps or binary scalars for untyped dictionaries.
  • EnumTransformer: Handles Python enum.Enum types.
  • UnionTransformer: Manages conversions for typing.Union types, including Optional.
  • FileTransformer: Handles File objects, managing their URI and hash.
  • DirTransformer: Handles Dir objects, managing their URI and hash.
  • DataFrameTransformerEngine: A meta-transformer for various DataFrame types (e.g., Pandas, Arrow), allowing custom encoders and decoders to be registered.
  • FlytePickleTransformer: A fallback transformer that pickles any unrecognized Python type, allowing it to be passed between tasks.

Integration Points:

  • TypeEngine.register(): Allows developers to register custom TypeTransformer implementations for new Python types.
  • TypeEngine.to_literal() and TypeEngine.to_python_value(): These methods are used internally by the system to perform the actual data conversions during task execution.

This robust type system ensures that complex data structures can be reliably exchanged between tasks, even across different programming languages or execution environments, while providing mechanisms for extensibility and optimization.