Defining Tasks
Tasks are the fundamental units of computation, encapsulating a specific piece of logic along with its execution environment, resources, and operational policies. They can be simple Python functions, asynchronous functions, or more complex operations.
Basic Task Definition
Tasks are primarily defined using a decorator provided by a TaskEnvironment. A TaskEnvironment acts as a container for tasks, allowing them to share common configurations such as Docker images, resource allocations, and environment variables.
To define a task, instantiate a TaskEnvironment and use its task decorator on a Python function:
```python
from datetime import timedelta

from flyte.environments import TaskEnvironment
from flyte.resources import Resources

# Define a task environment with shared configurations
my_env = TaskEnvironment(
    name="my_project_tasks",
    image="my_custom_image:latest",
    resources=Resources(cpu="500m", memory="512Mi"),
    env_vars={"MY_API_KEY": "some_value"},
)


@my_env.task
def greet_user(name: str) -> str:
    """A simple task that greets a user."""
    return f"Hello, {name}!"


@my_env.task(retries=3, timeout=timedelta(minutes=5))
async def fetch_data(url: str) -> bytes:
    """An asynchronous task that fetches data from a URL, with retries and a timeout."""
    # Simulate an async network call
    import asyncio

    await asyncio.sleep(1)
    return f"Data from {url}".encode()


# Tasks can be called directly for local execution
print(greet_user("Alice"))

# Asynchronous tasks return a coroutine that can be awaited locally
import asyncio

print(asyncio.run(fetch_data("http://example.com")))
```
When a task is invoked within a workflow execution context, the system handles its submission and execution according to the defined parameters. When called directly outside of a workflow, the task executes as a regular Python function.
Task Configuration Parameters
The `task` decorator, which internally creates a `TaskTemplate` or an `AsyncFunctionTaskTemplate`, provides extensive options to configure task behavior. These parameters can be set statically at definition time or overridden dynamically at invocation.
Execution Environment
- `name`: (implicitly derived from `TaskEnvironment.name` plus the function name) The unique identifier for the task.
- `image`: (inherited from `TaskEnvironment`, or `"auto"`) The Docker image in which the task's code will execute. Setting it to `"auto"` uses a default image with the necessary runtime installed.
- `task_type`: (defaults to `"python"`) The type of task, often used by execution plugins.
- `env_vars`: (inherited from `TaskEnvironment`, or `Optional[Dict[str, str]]`) Environment variables to set within the task's execution container (see the sketch after this list).
- `secrets`: (inherited from `TaskEnvironment`, or `Optional[SecretRequest]`) Secrets to inject into the task's runtime environment.
- `pod_template`: (inherited from `TaskEnvironment`, or `Optional[Union[str, PodTemplate]]`) A Kubernetes Pod template that customizes the underlying Pod configuration for the task.
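As a small, hedged illustration of the `env_vars` parameter above, the sketch below assumes the `task` decorator accepts `env_vars` directly, mirroring the `TaskEnvironment` field, and reuses `my_env` from the earlier example.

```python
import os


# Sketch only: assumes the task decorator accepts env_vars directly,
# mirroring the TaskEnvironment field described above.
@my_env.task(env_vars={"LOG_LEVEL": "debug"})
def env_aware_task() -> str:
    # The variable is injected into the task's execution container
    return os.environ.get("LOG_LEVEL", "unset")
```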
Resource Management
- `resources`: (inherited from `TaskEnvironment`, or `Optional[Resources]`) The CPU, memory, and GPU resources allocated to the task.

```python
from flyte.resources import Resources


@my_env.task(resources=Resources(cpu="1", memory="2Gi", gpu="1"))
def process_gpu_data():
    # Task requiring a GPU
    pass
```

- `max_inline_io_bytes`: (defaults to `MAX_INLINE_IO_BYTES`) The maximum allowed size in bytes for inputs and outputs passed directly to the task (e.g., primitives, strings, dicts). This limit does not apply to files, directories, or dataframes, which are handled via external storage. See the sketch after this list.
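For instance, a task returning a large in-memory payload might raise the inline limit. This is a minimal sketch; the 10 MiB figure is illustrative, not a documented default.

```python
# Sketch: raise the inline I/O ceiling for a task that returns a large dict.
# The 10 MiB value is illustrative; files, directories, and dataframes are
# unaffected because they are handled via external storage.
@my_env.task(max_inline_io_bytes=10 * 1024 * 1024)
def summarize(records: dict) -> dict:
    return {"count": len(records), "keys": sorted(records)[:10]}
```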
Operational Policies
- `cache`: (`CacheRequest`, or `"auto"`, `"override"`, `"disable"`) Controls caching behavior.
  - `"auto"`: Caches task results by default.
  - `"override"`: Forces caching, even if the environment or workflow disables it.
  - `"disable"`: Prevents caching for this specific task.

```python
@my_env.task(cache="disable")
def non_cacheable_task():
    # This task's results will not be cached
    pass
```

- `retries`: (`Union[int, RetryStrategy]`, defaults to `0`) The number of times to retry the task on failure.

```python
@my_env.task(retries=5)
def flaky_task():
    # This task will retry up to 5 times on failure
    pass
```

- `timeout`: (`Optional[TimeoutType]`) The maximum duration the task may run. If exceeded, the task is terminated.

```python
from datetime import timedelta


@my_env.task(timeout=timedelta(hours=1))
def long_running_task():
    # This task will time out after 1 hour
    pass
```

- `interruptable`: (`bool`, defaults to `False`) If `True`, the task can be scheduled on interruptible nodes, which may be cheaper but can be preempted. The task's code should be designed to handle interruption and resumption.
- `reusable`: (`Optional[ReusePolicy]`) Defines whether the task's execution environment (e.g., the Python process) can be reused across multiple invocations, which can improve performance by reducing startup overhead. See the sketch after this list.
  - When `reusable` is enabled, parameters such as `resources`, `env_vars`, and `secrets` cannot be overridden at invocation time, because the environment is shared.
  - For reusable environments with `concurrency > 1`, tasks must be asynchronous (`async def`).
- `docs`: (`Optional[Documentation]`) Structured documentation for the task, which can be displayed in the UI. If not provided, the function's docstring is used.
- `report`: (`bool`, defaults to `False`) If `True`, an HTML report is generated for the task.
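To make the `reusable` policy concrete, here is a minimal sketch; it assumes `ReusePolicy` is importable from `flyte.policies` (as in the environment examples later in this section) and that the `task` decorator accepts `reusable` directly.

```python
from flyte.policies import ReusePolicy


# Sketch: reuse the task's process across invocations, up to 2 at a time.
# Because concurrency > 1, the task must be defined with async def.
@my_env.task(reusable=ReusePolicy(concurrency=2))
async def reusable_transform(x: int) -> int:
    return x * 2
```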
Dynamic Task Configuration with override
While tasks are defined with static parameters, the `override` method allows dynamic adjustment of certain configurations when a task is invoked within a workflow. This is useful for adapting task behavior without redefining the task itself.
```python
from flyte.environments import TaskEnvironment
from flyte.resources import Resources

my_env = TaskEnvironment(name="dynamic_tasks")


@my_env.task(retries=0, resources=Resources(cpu="100m", memory="128Mi"))
def data_processing_task(data_path: str):
    print(f"Processing data from {data_path}")


# In a workflow, you might call it like this:
#
# @my_env.workflow
# def my_workflow():
#     # Default execution
#     data_processing_task(data_path="s3://my-bucket/small-data")
#
#     # Override resources and add retries for a larger, more critical dataset
#     data_processing_task.override(
#         resources=Resources(cpu="1", memory="2Gi"),
#         retries=3,
#     )(data_path="s3://my-bucket/large-data")

# Example of a local override (for demonstration; typically used in workflows)
task_with_more_resources = data_processing_task.override(
    resources=Resources(cpu="1", memory="2Gi"),
    retries=3,
)
# print(task_with_more_resources("s3://my-bucket/large-data"))  # Executes locally with the overridden parameters
```
Parameters that can be overridden:
- `short_name`
- `resources`
- `cache`
- `retries`
- `timeout`
- `reusable` (can be set to `"off"` to disable environment reusability for a specific invocation)
- `env_vars`
- `secrets`
- `max_inline_io_bytes`
- `pod_template`
Limitations:
The following parameters cannot be overridden: `name`, `image`, `docs`, and `interface`. These are considered fundamental to the task's definition.
Important considerations for `reusable` overrides:
If a task belongs to a `TaskEnvironment` with `reusable` enabled, you cannot override `resources`, `env_vars`, or `secrets` for a specific invocation, because the reusable environment is shared and these parameters are fixed at the environment level. To override them, explicitly set `reusable="off"` in the `override` call, as sketched below.
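A minimal sketch, assuming a hypothetical task `cached_transform` defined in a reusable environment:

```python
# Hypothetical: cached_transform lives in an environment created with
# reusable=ReusePolicy(...). Reuse is switched off for this one invocation
# so that its resources may be overridden.
result = cached_transform.override(
    reusable="off",
    resources=Resources(cpu="4", memory="8Gi"),
)(data_path="s3://my-bucket/large-data")
```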
Task Execution Flow
When a task is invoked, its execution path depends on the context:
- Local execution: If the task is called directly as a Python function (e.g., `greet_user("Alice")`), it executes immediately within the current Python process. The `forward` method of the `TaskTemplate` is invoked, which in turn calls the underlying Python function (`func`). Asynchronous tasks return a coroutine that can be awaited.
- Workflow execution: When a task is part of a workflow, its invocation (`task_instance(...)`) triggers a submission to the execution controller, which then orchestrates the task's execution in a remote environment (e.g., a container).
  - For asynchronous tasks, the `aio` method is used, which wraps the submission in an awaitable.
  - For synchronous tasks called within an asynchronous parent task, `aio` can also be used to wrap the synchronous submission in an awaitable, facilitating migration.
The `execute` method of the `TaskTemplate` is the core logic that runs within the remote execution environment. It handles the pre- and post-execution hooks (`pre`, `post`) and then calls the user-defined function (`func`).
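A hedged sketch of this flow from an asynchronous parent task, reusing `greet_user`, `fetch_data`, and `my_env` from the first example; the `greet_user.aio(...)` spelling is an assumption about how the `aio` method is exposed.

```python
@my_env.task
async def parent_task() -> str:
    # An asynchronous child task is awaited directly; under a workflow
    # execution this awaits the remote submission rather than running
    # in-process.
    data = await fetch_data("http://example.com")
    # Assumed call syntax: a synchronous child task wrapped via aio() so it
    # can be awaited from async code, easing migration.
    greeting = await greet_user.aio("Alice")
    return f"{greeting} ({len(data)} bytes)"
```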
Task Environments
The TaskEnvironment class provides a powerful mechanism for grouping tasks and applying common configurations across them. This promotes consistency and simplifies management.
```python
from typing import Any, Dict

from flyte.core.task_template import NativeInterface, TaskTemplate
from flyte.environments import TaskEnvironment
from flyte.policies import ReusePolicy
from flyte.resources import Resources

# Define a base environment
base_env = TaskEnvironment(
    name="data_processing_base",
    image="my_data_science_image:v1.0",
    resources=Resources(cpu="500m", memory="1Gi"),
    cache="auto",
)


@base_env.task
def preprocess_data(input_path: str) -> str:
    # ... preprocessing logic ...
    return "processed_path"


# Clone the base environment to create a specialized one for high-performance
# tasks, with different resources and a reusable policy
high_perf_env = base_env.clone_with(
    name="high_performance_env",
    resources=Resources(cpu="2", memory="4Gi"),
    reusable=ReusePolicy(concurrency=4),  # Enable environment reuse with 4 concurrent tasks
    cache="override",  # Force caching for this environment
)


@high_perf_env.task
async def train_model(data_path: str) -> str:
    # ... model training logic ...
    return "model_artifact_path"


# You can also add tasks programmatically
class CustomTask(TaskTemplate[Dict[str, Any], str]):
    name: str = "my_custom_task_template"
    interface: NativeInterface = NativeInterface.from_callable(lambda x: str(x))

    async def execute(self, x: Dict[str, Any]) -> str:
        return f"Custom task executed with {x}"


my_custom_task_instance = CustomTask(
    name="my_custom_task_instance",
    interface=NativeInterface.from_callable(lambda x: str(x)),
)
base_env.add_task(my_custom_task_instance)

# Access tasks defined within an environment
print(base_env.tasks.keys())
# Output: dict_keys(['data_processing_base.preprocess_data', 'my_custom_task_instance'])
```
TaskEnvironment Parameters
- `name`: A unique name for the environment, prefixed to all tasks defined within it.
- `image`: The default Docker image for tasks in this environment.
- `resources`: Default resource allocations for tasks in this environment.
- `env_vars`: Default environment variables for tasks in this environment.
- `secrets`: Default secrets to inject for tasks in this environment.
- `depends_on`: A list of other `Environment` instances that this environment depends on, which hints at deployment order (see the sketch after this list).
- `cache`: The default caching policy for tasks in this environment.
- `reusable`: (`ReusePolicy | None`) Configures whether the execution environment can be reused across task invocations. This is a powerful optimization for tasks with high startup costs.
  - If `reusable` is set, `plugin_config` cannot be set simultaneously.
  - If `reusable.concurrency > 1`, only asynchronous tasks are supported within this environment.
- `plugin_config`: (`Optional[Any]`) Passes plugin-specific configuration to tasks within this environment, enabling integration with custom task types.
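As a small illustration of `depends_on`, this sketch wires two environments together; it assumes `depends_on` accepts a list of environment instances, per the description above.

```python
from flyte.environments import TaskEnvironment

# Sketch: app_env hints that infra_env should be deployed first.
infra_env = TaskEnvironment(name="infra_setup")
app_env = TaskEnvironment(name="app_tasks", depends_on=[infra_env])
```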
Cloning Environments
The `clone_with` method creates a new `TaskEnvironment` instance based on an existing one, with specific parameters overridden. This is useful for creating specialized environments without duplicating every configuration.
Adding Tasks Programmatically
The `add_task` method registers a `TaskTemplate` instance directly with an environment. This is useful for integrating tasks that are not defined through the decorator pattern, such as custom task types or tasks loaded from external sources.
Advanced Concepts
Asynchronous Task Handling
The system automatically detects whether a decorated function is `async def` and wraps it in an `AsyncFunctionTaskTemplate`. This template ensures that asynchronous tasks are properly awaited during execution, both locally and remotely. The `aio` method provides a way to call synchronous tasks from an asynchronous context, which is useful when migrating existing synchronous code into new asynchronous workflows.
Plugin Integration
The `plugin_config` parameter in `TaskEnvironment` and the `TaskPluginRegistry` provide extension points for defining custom task types. By registering a custom `TaskTemplate` class for a specific `plugin_config` type, developers can extend the system to support new execution backends or specialized task behaviors.
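A heavily hedged sketch of this extension point follows; the registration call at the end is an assumption for illustration, not a documented signature, and the class names are hypothetical.

```python
from dataclasses import dataclass

from flyte.core.task_template import TaskTemplate


@dataclass
class MyPluginConfig:
    # Hypothetical plugin configuration carried by a TaskEnvironment
    endpoint: str = "https://example.internal"


class MyPluginTask(TaskTemplate):
    # Hypothetical template implementing specialized execution behavior
    ...


# Assumed registration (signature not confirmed): associating the config type
# with the template class so that environments constructed with
# plugin_config=MyPluginConfig(...) produce MyPluginTask instances.
# TaskPluginRegistry.register(MyPluginConfig, MyPluginTask)
```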
Serialization and Container Execution
When tasks are deployed, their definitions and associated code are serialized. The `container_args` method of `AsyncFunctionTaskTemplate` defines the command-line arguments passed to the container that executes the task. These arguments include paths for inputs, outputs, and metadata, enabling the execution environment to load data and store results correctly. This mechanism also supports raw container execution via the CoPilot system, eliminating the need for an SDK inside the container for basic I/O.