Core Concepts & Task Definition
This section outlines the fundamental concepts and mechanisms for defining and managing tasks within the system. It covers how execution environments are configured, how tasks are structured, and how various operational aspects like resources, caching, and secrets are handled.
Environments: Defining Execution Contexts
Environments serve as logical groupings for tasks, providing a shared and consistent execution context. They encapsulate common configurations, promoting reusability and simplifying task definitions.
The primary class for defining an environment is TaskEnvironment. It extends a base Environment and offers a comprehensive set of parameters:
- Name: A unique identifier for the environment.
- Image: The Docker container image used for all tasks within this environment. This can be an explicit URI string or an `Image` object for programmatic image construction. Using `"auto"` defaults to a system-provided base image.
- Resources: Specifies the default CPU, memory, GPU, and disk resources allocated to tasks in this environment. Individual tasks can override these defaults.
- Environment Variables: A dictionary of key-value pairs injected as environment variables into the task containers.
- Secrets: A list of `Secret` objects to be securely injected into the environment, making them available to tasks.
- Dependencies: A list of other `Environment` instances that this environment depends on. This hints to the deployment system to ensure dependent environments are deployed first.
- Cache: Defines the default caching policy for tasks within this environment. This can be `"auto"`, `"override"`, or `"disable"`, or a detailed `Cache` object.
- Reusable: A `ReusePolicy` object that configures the environment for reuse across multiple task invocations. This is particularly useful for reducing startup overhead for short-lived tasks.
- Plugin Configuration: Allows passing arbitrary configuration objects to specialized task plugins.
Example: Defining and Cloning an Environment
```python
from datetime import timedelta

from flyte import TaskEnvironment, Resources, ReusePolicy, Image

# Define a base environment
my_base_env = TaskEnvironment(
    name="my-ml-env",
    image=Image.from_debian_base().with_pip_packages("scikit-learn", "pandas"),
    resources=Resources(cpu="1", memory="2Gi"),
    reusable=ReusePolicy(replicas=2, idle_ttl=timedelta(minutes=5)),
)

# Clone the environment for a specific use case, overriding some parameters
production_env = my_base_env.clone_with(
    name="my-ml-production-env",
    resources=Resources(cpu="2", memory="4Gi", gpu="T4:1"),
    cache="override",
)
```
The add_dependency method allows explicitly declaring dependencies between environments, which is crucial for orchestrating complex deployments.
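To see why these declarations matter, here is a minimal, hypothetical sketch (plain Python, not the Flyte API) of how a deployer could use a dependency graph to order environments so that dependencies deploy first. The environment names are illustrative:

```python
from graphlib import TopologicalSorter

# Hypothetical dependency graph: each environment maps to the set of
# environments it depends on (mirroring add_dependency declarations).
deps = {
    "serving-env": {"base-env", "model-env"},
    "model-env": {"base-env"},
    "base-env": set(),
}

# static_order() yields environments so that every dependency
# appears before the environments that need it.
deploy_order = list(TopologicalSorter(deps).static_order())
print(deploy_order)  # "base-env" always comes first
```

With ordering guaranteed, a deployment system can safely deploy each environment knowing everything it depends on already exists.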
Tasks: The Units of Work
Tasks are the fundamental executable units within the system. They represent a single, well-defined piece of computation. Tasks are typically Python functions decorated with @env.task from a TaskEnvironment instance.
The TaskTemplate class serves as the base for all task definitions, while AsyncFunctionTaskTemplate is the concrete implementation for Python functions. Key parameters for defining a task include:
- Function (`func`): The Python function that implements the task's logic. It can be synchronous or asynchronous.
- Name: A unique, fully qualified name for the task (e.g., `environment_name.function_name`).
- Short Name: A user-friendly, shorter name for the task, defaulting to the function's name.
- Image: The container image to use for this specific task. If not provided, it inherits from the parent `TaskEnvironment`.
- Resources: Overrides the default resource allocation from the parent environment for this task.
- Cache: Overrides the default caching policy from the parent environment for this task.
- Retries: Specifies the retry strategy (`RetryStrategy` or an integer count) for the task in case of failure.
- Timeout: Defines the maximum runtime and queued time for the task using a `Timeout` object.
- Reusable: Overrides the reusability policy from the parent environment. Setting it to `"off"` disables reusability for this task.
- Documentation: A `Documentation` object, or content extracted from the function's docstring, providing human-readable descriptions.
- Environment Variables: Additional environment variables specific to this task.
- Secrets: Additional `Secret` objects to be injected into this task.
- Pod Template: A `PodTemplate` object or string reference for advanced Kubernetes pod configuration.
- Report: A boolean indicating whether to generate an HTML report for the task's execution.
- Max Inline I/O Bytes: The maximum size allowed for inputs and outputs passed directly (inline) to the task, excluding files or directories.
Example: Defining Tasks within an Environment
```python
import asyncio
from datetime import timedelta

from flyte import TaskEnvironment, Resources

my_env = TaskEnvironment(
    name="data-processing-env",
    image="my-custom-image:latest",
    resources=Resources(cpu="0.5", memory="1Gi"),
)

@my_env.task(
    short_name="fetch-data",
    retries=3,
    timeout=timedelta(minutes=10),
)
async def fetch_raw_data(source_url: str) -> str:
    # Simulate fetching data
    await asyncio.sleep(1)
    return f"Data from {source_url}"

@my_env.task(
    resources=Resources(cpu="1", memory="2Gi"),  # Override environment resources
    cache="disable",  # Disable caching for this task
)
def process_data(raw_data: str, threshold: int) -> int:
    # Simulate data processing
    print(f"Processing: {raw_data} with threshold {threshold}")
    return len(raw_data) * threshold
```
Tasks can be invoked directly like regular Python functions for local execution or submitted to the remote system via the flyte.run utility. The aio method allows synchronous tasks to be called within an asynchronous context, returning an awaitable.
The override method on a TaskTemplate allows for dynamic adjustment of task parameters at invocation time, without altering the original task definition.
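Conceptually, this works like a copy-with-changes rather than a mutation: the call site gets an adjusted copy while the original definition is untouched. A minimal sketch of that pattern in plain Python (the field names here are illustrative stand-ins, not the actual `TaskTemplate` attributes):

```python
import dataclasses

@dataclasses.dataclass(frozen=True)
class TaskConfig:
    # Illustrative stand-in for a task template's tunable knobs
    retries: int = 0
    cache: str = "auto"

base = TaskConfig(retries=3)

# An "override" produces a new config for this invocation only;
# the original definition keeps its settings.
adjusted = dataclasses.replace(base, cache="disable")

print(base.cache)      # still "auto"
print(adjusted.cache)  # "disable"
```

Because the original is frozen and never mutated, concurrent invocations with different overrides cannot interfere with each other.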
Container Images: Defining Task Runtimes
The Image class provides a programmatic way to define and manage container images, which are the runtime environments for tasks. This allows for flexible and reproducible environment definitions.
Image Construction Methods:
- `Image.from_debian_base()`: Creates an image based on a default Debian Python slim image, pre-configured with Flyte dependencies. This is the recommended starting point for most Python tasks.
- `Image.from_base(image_uri)`: Starts with any pre-existing container image by its full URI.
- `Image.from_dockerfile(file, registry, name)`: Builds an image from a local Dockerfile. Note that additional layers cannot be added programmatically after using this method.
- `Image.from_uv_script(script, name, registry)`: Creates an image based on a `uv` script, automatically detecting the Python version and dependencies.
Layering and Customization:
Once a base image is defined, various with_* methods allow adding layers to customize the image:
- `with_pip_packages(*packages)`: Installs Python packages using `pip`. Supports specifying index URLs, extra index URLs, pre-releases, and build-time secrets.
- `with_requirements(file)`: Installs Python packages from a `requirements.txt` file.
- `with_uv_project(pyproject_file, uvlock)`: Configures the image with a `uv` project, installing dependencies from `uv.lock`.
- `with_apt_packages(*packages)`: Installs system-level packages using `apt`.
- `with_env_vars(env_vars)`: Sets environment variables within the image.
- `with_source_folder(src, dst)`: Copies a local directory into the image.
- `with_source_file(src, dst)`: Copies a local file into the image.
- `with_commands(commands)`: Executes arbitrary shell commands during the image build process.
- `with_workdir(workdir)`: Sets the working directory inside the container.
Image Lifecycle and Identification:
- `identifier`: A unique hash of the image's layers and properties, used for caching and lookup.
- `uri`: The fully qualified URI of the image (e.g., `registry/name:tag`).
- `ImageBuildEngine`: The internal component responsible for orchestrating image builds (e.g., using Docker/BuildKit locally or a remote builder).
- `ImageChecker`: Used to determine whether an image already exists in a registry, avoiding redundant builds.
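The identifier can be thought of as a content hash over the ordered layer recipe, so identical definitions resolve to the same image and a changed recipe resolves to a new one. A rough, hypothetical sketch of the idea (not the library's actual hashing scheme):

```python
import hashlib

def image_identifier(base: str, layers: list[str]) -> str:
    # Hash the base image plus every layer description, in order;
    # any change to the recipe yields a different identifier.
    h = hashlib.sha256()
    h.update(base.encode())
    for layer in layers:
        h.update(layer.encode())
    return h.hexdigest()[:12]

a = image_identifier("python:3.11-slim", ["pip install numpy"])
b = image_identifier("python:3.11-slim", ["pip install numpy"])
c = image_identifier("python:3.11-slim", ["pip install scipy"])
print(a == b, a == c)  # identical recipes match; different ones do not
```

This is what lets a checker skip a build entirely: if an image with the computed identifier already exists in the registry, the recipe has not changed.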
Example: Building a Layered Image
```python
from pathlib import Path

from flyte import Image

my_custom_image = (
    Image.from_debian_base(python_version=(3, 11), name="my-app", registry="my-registry.com")
    .with_pip_packages("numpy", "scipy==1.10.0")
    .with_source_folder(Path("./my_code"), "/app")
    .with_commands(["apt-get update && apt-get install -y git"])
    .with_env_vars({"MY_ENV_VAR": "some_value"})
)

# The image will be built and pushed to my-registry.com/my-app:<hash>
```
Resource Management: Allocating Compute
The Resources class allows precise allocation of compute resources to tasks and environments. This is critical for controlling costs, ensuring performance, and preventing resource contention.
- CPU: Specifies the CPU request and optional limit (e.g., `"1"`, `(1, 2)`, `"500m"`).
- Memory: Specifies the memory request and optional limit (e.g., `"1Gi"`, `("1Gi", "2Gi")`).
- GPU: Specifies GPU requirements. This can be an integer quantity (e.g., `1`), a string combining device type and quantity (e.g., `"T4:1"`), or a `Device` object for more granular control over device type, quantity, and partition.
- Disk: Specifies the disk size (e.g., `"10GiB"`).
- Shared Memory (`shm`): Configures shared memory for the container (e.g., `"auto"` or a specific size like `"1Gi"`).
Example: Defining Task Resources
```python
from flyte import Resources, Device, TaskEnvironment

my_env = TaskEnvironment(name="resource-env", image="my-image:latest")

# Task with specific CPU and memory
@my_env.task(resources=Resources(cpu="2", memory="4Gi"))
def high_compute_task():
    pass

# Task with a specific GPU type and quantity
@my_env.task(resources=Resources(gpu="A100:1"))
def gpu_task():
    pass

# Task with a custom device partition
@my_env.task(resources=Resources(gpu=Device(device="A100", quantity=1, partition="1g.5gb")))
def partitioned_gpu_task():
    pass
```
Secrets: Secure Configuration
The Secret class provides a secure mechanism to inject sensitive information into task execution environments or during image build processes. Secrets are referenced by a key and can optionally belong to a group. They can be mounted as environment variables or files.
- Key: The name of the secret in the secret store.
- Group: An optional grouping for secrets.
- Mount: Specifies a file path within the container where the secret should be mounted (e.g., `/etc/flyte/secrets/my-secret`).
- As Environment Variable (`as_env_var`): Specifies the name of the environment variable the secret's value should be exposed as. If not provided, a default uppercase, underscore-separated name is derived from the key and group.
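The default name derivation can be pictured as follows — a sketch of the documented "uppercase, underscore-separated" rule, not the library's exact implementation:

```python
def default_env_var_name(key: str, group: str = "") -> str:
    # Join group and key, then normalize to an uppercase,
    # underscore-separated identifier.
    parts = [p for p in (group, key) if p]
    return "_".join(parts).upper().replace("-", "_")

print(default_env_var_name("my-api-key", "external-services"))
# e.g. EXTERNAL_SERVICES_MY_API_KEY
```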
Example: Using Secrets in Tasks and Image Builds
```python
from flyte import Secret, TaskEnvironment, Image

# Define a secret
my_api_key = Secret(key="my-api-key", group="external-services", as_env_var="EXTERNAL_API_KEY")

# Use the secret in an environment
secure_env = TaskEnvironment(
    name="secure-env",
    image="my-secure-image:latest",
    secrets=[my_api_key],
)

@secure_env.task
def use_api_key():
    import os
    print(f"API Key: {os.environ['EXTERNAL_API_KEY']}")

# Use a secret during an image build (e.g., for private package repositories)
private_repo_image = (
    Image.from_debian_base()
    .with_pip_packages(
        "private-package",
        secret_mounts=[Secret(key="GITHUB_PAT", as_env_var="GITHUB_PAT_ENV")],
    )
)
```
Caching, Retries, and Timeouts: Optimizing Execution
These configurations are crucial for building robust and efficient workflows.
Caching (Cache)
The Cache class defines how task results are cached, enabling faster re-execution of identical tasks.
- Behavior: `"auto"` enables caching with an automatically generated version hash; `"override"` enables caching with a specific `version_override`; `"disable"` turns caching off.
- Version Override: An explicit string to use as the cache version. If not provided, a version is computed based on `policies`.
- Serialize: If `True`, identical concurrent executions of a task will run serially, reusing the first result.
- Ignored Inputs: A tuple of input names to exclude when computing the cache version hash.
- Salt: An arbitrary string used to salt the hash generation, ensuring unique versions even with identical code.
- Policies: A list of `CachePolicy` objects (e.g., `FunctionBodyPolicy`) that define how the cache version hash is generated from task properties. `FunctionBodyPolicy` hashes the task's source code.
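To illustrate how a policy in the spirit of FunctionBodyPolicy can derive a version, here is a simplified sketch that hashes the function's compiled body together with a salt (this is a conceptual illustration, not the library's actual algorithm):

```python
import hashlib

def cache_version(fn, salt: str = "") -> str:
    # Hash the function's compiled body plus the salt; editing the
    # code or changing the salt invalidates previously cached results.
    body = fn.__code__.co_code
    return hashlib.sha256(body + salt.encode()).hexdigest()[:16]

def expensive(data: str) -> int:
    return len(data)

v1 = cache_version(expensive, salt="v1")
v2 = cache_version(expensive, salt="v2")
print(v1 != v2)  # a new salt forces a fresh cache version
```

The same mechanism explains why bumping the salt (e.g., from `"v1"` to `"v2"`) is a convenient way to invalidate a cache without touching the code.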
Example: Configuring Task Caching
```python
from flyte import Cache, TaskEnvironment

my_env = TaskEnvironment(name="caching-env", image="my-image:latest")

@my_env.task(
    cache=Cache(
        behavior="auto",
        ignored_inputs=("input_timestamp",),  # Ignore this input for caching purposes
        salt="v1",  # Add a custom salt
    )
)
def expensive_computation(data: str, input_timestamp: str) -> int:
    # This task will reuse results if 'data' is the same, regardless of 'input_timestamp'
    return len(data)
```
Retries (RetryStrategy)
The RetryStrategy class defines how tasks are retried upon failure.
- Count: The number of times a task should be retried.
- Backoff: The maximum backoff time between retries (timedelta or int seconds).
- Backoff Factor: An exponential factor applied to the backoff duration.
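The interplay of count, backoff, and backoff factor can be sketched as a schedule generator — a hypothetical helper whose semantics are assumed from the descriptions above, with the maximum backoff acting as a cap on exponential growth:

```python
from datetime import timedelta

def retry_delays(count: int, backoff: timedelta, backoff_factor: float,
                 base: timedelta = timedelta(seconds=1)) -> list[timedelta]:
    # Exponentially growing waits, capped at the maximum backoff.
    delays = []
    for attempt in range(count):
        wait = base * (backoff_factor ** attempt)
        delays.append(min(wait, backoff))
    return delays

schedule = retry_delays(count=5, backoff=timedelta(seconds=10), backoff_factor=2)
print([d.total_seconds() for d in schedule])  # [1.0, 2.0, 4.0, 8.0, 10.0]
```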
Example: Configuring Task Retries
```python
from datetime import timedelta

from flyte import RetryStrategy, TaskEnvironment

my_env = TaskEnvironment(name="retry-env", image="my-image:latest")

@my_env.task(retries=5)  # Simple retry count
def flaky_task_simple():
    import random
    if random.random() < 0.7:
        raise ValueError("Simulated failure")
    return "Success"

@my_env.task(retries=RetryStrategy(count=3, backoff=timedelta(seconds=10), backoff_factor=2))
def flaky_task_with_backoff():
    import random
    if random.random() < 0.7:
        raise ValueError("Simulated failure")
    return "Success"
```
Timeouts (Timeout)
The Timeout class specifies maximum durations for task execution.
- Max Runtime: The maximum time a task is allowed to run (timedelta or int seconds).
- Max Queued Time: The maximum time a task can remain in the queue before starting execution.
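The runtime cap behaves much like asyncio's `wait_for`: once the allotted time is exceeded, the attempt fails rather than running forever. A small self-contained illustration of that semantic (using asyncio directly, not the Flyte runtime):

```python
import asyncio

async def slow_work() -> str:
    await asyncio.sleep(0.2)
    return "Done"

async def main() -> str:
    try:
        # Cancel the coroutine if it exceeds the allotted runtime.
        return await asyncio.wait_for(slow_work(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))  # timed out
```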
Example: Configuring Task Timeouts
```python
import asyncio
from datetime import timedelta

from flyte import Timeout, TaskEnvironment

my_env = TaskEnvironment(name="timeout-env", image="my-image:latest")

@my_env.task(timeout=Timeout(max_runtime=timedelta(minutes=5), max_queued_time=timedelta(minutes=1)))
async def long_running_task():
    await asyncio.sleep(300)  # This will time out if it exceeds 5 minutes
    return "Done"
```
Execution Context and Lifecycle
The system provides a rich execution context that tasks can access at runtime, and manages the lifecycle of runs and individual actions.
Context (Context, ContextData, TaskContext)
The Context class manages the current execution context, which is thread-local and coroutine-aware. ContextData holds the raw data, and TaskContext provides task-specific runtime information:
- Action ID: Identifies the current execution step (`ActionID`).
- Version: The version of the currently executing task.
- Raw Data Path: A `RawDataPath` object for managing temporary data storage, both local and remote.
- Input/Output Paths: URIs for task inputs and outputs.
- Run Base Directory: The base directory for the current run's data.
- Report: A `Report` object for generating custom HTML reports during task execution.
- Group Data: Information about the group, if the task is part of one.
- Checkpoints: Paths for task checkpointing.
- Code Bundle: Details about the code package used for execution.
- Compiled Image Cache: Information about built images.
- Mode: Indicates the execution mode (`"local"`, `"remote"`, or `"hybrid"`).
Tasks can access this context using flyte.ctx().
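The "thread-local and coroutine-aware" behavior described above is exactly what Python's `contextvars` module provides: each concurrent action sees its own value, even across awaits. A minimal sketch of the pattern (illustrative of the mechanism, not Flyte's internals):

```python
import asyncio
import contextvars

# Each coroutine sees the value set in its own copied context.
current_action: contextvars.ContextVar[str] = contextvars.ContextVar("current_action")

async def run_action(name: str) -> str:
    current_action.set(name)
    await asyncio.sleep(0)  # yield control to the other action
    # The value survives the suspension and stays per-coroutine.
    return current_action.get()

async def main() -> list[str]:
    return await asyncio.gather(run_action("a0"), run_action("a1"))

print(asyncio.run(main()))  # ['a0', 'a1'] — no cross-talk between actions
```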
Runs and Actions (Run, Action, LazyEntity)
- Run: Represents a top-level execution of a task or workflow. It provides methods to `wait` for completion, `show_logs`, `abort`, and retrieve `details` (including inputs and outputs).
- Action: A sub-component of a `Run`, representing a specific execution step, often a single task invocation. It has its own `phase`, `runtime`, and `attempts`, and can provide `logs`.
- LazyEntity: A proxy for remote tasks (`Task.get()`) that defers fetching the full task definition until it is actually needed (e.g., when called or when its properties are accessed). This optimizes network calls.
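The lazy-proxy idea — defer the expensive fetch until first use, then cache the result — can be sketched in a few lines (a hypothetical stand-in, not the actual LazyEntity class):

```python
class LazyProxy:
    """Defers a costly fetch until an attribute is first accessed."""

    def __init__(self, fetch):
        self._fetch = fetch
        self._target = None

    def __getattr__(self, name):
        # Triggered only for attributes not found on the proxy itself,
        # i.e. anything belonging to the remote definition.
        if self._target is None:
            self._target = self._fetch()
        return getattr(self._target, name)

calls = []

def expensive_fetch():
    calls.append(1)  # count simulated network round-trips
    return type("TaskDef", (), {"short_name": "greet"})()

proxy = LazyProxy(expensive_fetch)
print(len(calls))        # 0 — nothing fetched yet
print(proxy.short_name)  # "greet" — the fetch happens now
print(len(calls))        # 1 — and only once, thanks to caching
```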
The _Runner class orchestrates the execution of tasks in different modes:
- `local`: Tasks run directly on the local machine.
- `remote`: Tasks are submitted to the remote Flyte backend for execution.
- `hybrid`: Parent actions run locally, while child actions are submitted remotely.
Example: Running a Task
```python
import asyncio

import flyte
from flyte import TaskEnvironment

my_env = TaskEnvironment(name="example-env", image="my-image:latest")

@my_env.task
def greet(name: str) -> str:
    return f"Hello, {name}!"

async def main():
    # Run locally
    local_result = await flyte.with_runcontext(mode="local").run.aio(greet, name="Local User")
    print(f"Local run output: {local_result}")

    # Run remotely
    remote_run = await flyte.with_runcontext(mode="remote", name="my-remote-greeting").run.aio(greet, name="Remote User")
    print(f"Remote run URL: {remote_run.url}")
    await remote_run.wait()
    remote_result = await remote_run.outputs()
    print(f"Remote run output: {remote_result[0]}")

if __name__ == "__main__":
    asyncio.run(main())
```
Type System: Data Serialization and Deserialization
The TypeEngine is the core of the system's extensible type system, responsible for converting Python native types to and from Flyte's internal Literal representation. This enables seamless data passing between tasks, regardless of their underlying language or execution environment.
- `TypeTransformer`: The base class for implementing custom type conversions. Each transformer is registered for a specific Python type.
- `LiteralType`: Flyte's protobuf-based representation of data types.
- `Literal`: Flyte's protobuf-based representation of data values.
Key TypeTransformer implementations include:
- `DataclassTransformer`: Handles Python dataclasses, serializing them to MessagePack bytes and including JSON schema metadata.
- `PydanticTransformer`: Handles Pydantic `BaseModel` instances, also using MessagePack and JSON schema.
- `ListTransformer`: Converts Python lists to Flyte `LiteralCollection`s.
- `DictTransformer`: Converts Python dictionaries to Flyte `LiteralMap`s, or to binary scalars for untyped dictionaries.
- `EnumTransformer`: Handles Python `enum.Enum` types.
- `UnionTransformer`: Manages conversions for `typing.Union` types, including `Optional`.
- `FileTransformer`: Handles `File` objects, managing their URI and hash.
- `DirTransformer`: Handles `Dir` objects, managing their URI and hash.
- `DataFrameTransformerEngine`: A meta-transformer for various DataFrame types (e.g., Pandas, Arrow) that allows custom encoders and decoders to be registered.
- `FlytePickleTransformer`: A fallback transformer that pickles any unrecognized Python type, allowing it to be passed between tasks.
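A toy version of a transformer registry illustrates the round-trip contract these implementations share — each registered transformer converts a Python value to a portable form and back (a conceptual sketch only; JSON stands in for MessagePack, and none of this is the TypeEngine API):

```python
import dataclasses
import json

# Minimal registry: Python type -> (serialize, deserialize) pair.
_registry = {}

def register(py_type, to_literal, to_python):
    _registry[py_type] = (to_literal, to_python)

def to_literal(value):
    ser, _ = _registry[type(value)]
    return ser(value)

def to_python(py_type, literal):
    _, de = _registry[py_type]
    return de(literal)

@dataclasses.dataclass
class Point:
    x: int
    y: int

# A dataclass "transformer": serialize to JSON text and back.
register(
    Point,
    to_literal=lambda p: json.dumps(dataclasses.asdict(p)),
    to_python=lambda s: Point(**json.loads(s)),
)

lit = to_literal(Point(1, 2))
restored = to_python(Point, lit)
print(restored == Point(1, 2))  # round-trip preserves the value
```

The real engine layers type metadata, schemas, and protobuf literals on top, but the core contract is the same: every transformer must faithfully invert its own serialization.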
Integration Points:
- `TypeEngine.register()`: Allows developers to register custom `TypeTransformer` implementations for new Python types.
- `TypeEngine.to_literal()` and `TypeEngine.to_python_value()`: Used internally by the system to perform the actual data conversions during task execution.
This robust type system ensures that complex data structures can be reliably exchanged between tasks, even across different programming languages or execution environments, while providing mechanisms for extensibility and optimization.