Dask Integration
Dask Integration enables the execution of distributed Dask workloads directly within the platform. This capability allows developers to leverage Dask's parallel computing features for processing large datasets and complex computations, with the platform managing the underlying Dask cluster infrastructure.
Configuring Dask Tasks
To define a Dask-enabled task, use the DaskTask plugin. This plugin transforms standard Python functions into distributed Dask computations. The core configuration for a Dask task is provided through the Dask object, which specifies the setup for both the Dask scheduler and its worker groups.
The Dask object contains two primary configuration components:
- scheduler: An instance of Scheduler that defines the Dask scheduler's environment.
- workers: An instance of WorkerGroup that configures the default group of Dask worker pods.
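A minimal configuration sketch, using the same import path as the full example later in this section (the object name and values are illustrative):
from flyteplugins.dask.task import Dask, Scheduler, WorkerGroup

# A default scheduler (falls back to the task's registered image) and a
# single worker group with two worker pods.
dask_config = Dask(
    scheduler=Scheduler(),
    workers=WorkerGroup(number_of_workers=2),
)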
Scheduler Configuration
The Scheduler object controls the Dask scheduler pod's characteristics:
- image: Specifies a custom Docker image for the scheduler. If None, the image associated with the task's registration is used. The custom image must have dask[distributed] installed and maintain a consistent Python environment with the worker pods and the job runner.
- resources: Defines the CPU and memory resources requested for the scheduler pod.
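For instance, a scheduler pinned to a custom image with explicit resource requests could be declared as follows (a sketch; the image name is a placeholder):
from flyteplugins.dask.task import Scheduler
from flytekit import Resources

# Scheduler pod configuration: custom image (must include dask[distributed])
# and explicit CPU/memory requests.
scheduler = Scheduler(
    image="my-custom-dask-image:latest",
    resources=Resources(cpu="1", mem="2Gi"),
)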
Worker Group Configuration
The WorkerGroup object configures a collection of Dask worker pods:
- number_of_workers: Sets the desired number of worker pods in this group. The default is 1.
- image: Specifies a custom Docker image for the worker pods. If None, the image associated with the task's registration is used. Like the scheduler image, this image must have dask[distributed] installed and maintain a consistent Python environment with the scheduler and the job runner.
- resources: Defines the CPU and memory resources requested for each individual worker pod.
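A worker group with three identically configured pods, mirroring the full example below, might be declared as (values illustrative):
from flyteplugins.dask.task import WorkerGroup
from flytekit import Resources

# Three worker pods, each requesting 2 CPUs and 4Gi of memory. The image must
# match the scheduler's Python environment.
workers = WorkerGroup(
    number_of_workers=3,
    image="my-custom-dask-image:latest",
    resources=Resources(cpu="2", mem="4Gi"),
)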
Execution Details
When a DaskTask executes in a cluster environment, the platform performs several key actions:
- Dask Client Initialization: The task automatically initializes a Dask Client instance. This client connects to the Dask scheduler provisioned by the platform, enabling distributed computations.
- Code Bundle Upload: If the task involves local code that needs to be distributed to Dask workers (e.g., when using flyte.ctx().code_bundle), the UploadDirectory plugin is registered with the Dask client. This mechanism ensures that all worker pods have access to the necessary application code.
- Configuration Serialization: The custom_config method within DaskTask translates the Dask configuration (including Scheduler and WorkerGroup settings) into a platform-specific DaskJob structure. This serialized configuration guides the underlying execution engine in provisioning and managing the Dask cluster.
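In practice, the task body can create a bare Client() with no arguments, as the full example later in this section does; dask.distributed then resolves the scheduler address from its configuration (assumed here to be the DASK_SCHEDULER_ADDRESS environment variable, which the platform is expected to set on the pod). A brief sketch:
from distributed import Client

# No address is passed; dask.distributed falls back to the scheduler address
# provided through the pod's environment/configuration.
client = Client()

# Work submitted through this client runs on the provisioned worker pods.
future = client.submit(sum, [1, 2, 3])
print(future.result())  # 6

client.close()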
Image and Environment Requirements
Maintaining a consistent Python environment across all components is critical for successful Dask task execution.
- dask[distributed] Installation: Any custom Docker images used for Dask schedulers or workers must have the dask[distributed] package installed.
- Environment Consistency: The Python environment, including all installed packages and their versions, must be identical across the job runner pod, the Dask scheduler pod, and all Dask worker pods. Inconsistencies can lead to serialization errors, module-not-found errors, or unexpected runtime behavior.
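One way to surface environment drift early is dask's built-in version check, which compares package versions across the client, scheduler, and workers and raises an error on mismatch. This is a runtime safeguard, not a substitute for building all images from the same base:
from distributed import Client

# Inside the task body, connected to the provisioned cluster.
client = Client()

# Compares Python and key package versions across the client, the scheduler,
# and all connected workers; raises an error if they disagree.
client.get_versions(check=True)

client.close()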
Resource Management
Careful allocation of resources for both the Scheduler and WorkerGroup is essential for performance and stability.
- Scheduler Resources: The scheduler's resource requirements depend on the number of tasks, the size of the Dask graph, and the metadata it manages.
- Worker Resources: Worker resources should be scaled based on the computational intensity and memory footprint of the distributed operations. Under-provisioning can lead to out-of-memory errors or slow execution, while over-provisioning can result in wasted cluster resources.
Example Usage
The following example demonstrates how to define a Dask task, specifying custom images and resource requests for the scheduler and worker groups.
from flytekit import task, workflow
from flytekit import Resources
from flyteplugins.dask.task import Dask, DaskTask, Scheduler, WorkerGroup


# Define a Dask-enabled task
@task(
    task_config=DaskTask(
        plugin_config=Dask(
            scheduler=Scheduler(
                # Optional: specify a custom image for the scheduler.
                # This image must have dask[distributed] installed.
                image="my-custom-dask-image:latest",
                resources=Resources(cpu="1", mem="2Gi"),
            ),
            workers=WorkerGroup(
                number_of_workers=3,
                # Optional: specify a custom image for workers.
                # This image must have dask[distributed] installed and match
                # the scheduler's Python environment.
                image="my-custom-dask-image:latest",
                resources=Resources(cpu="2", mem="4Gi"),
            ),
        )
    ),
    # This image is used if scheduler/worker images are not explicitly provided
    # in plugin_config. It also serves as the base for the job runner pod.
    container_image="my-custom-dask-image:latest",
)
def my_dask_processing_task(data_path: str) -> float:
    """An example Dask task that performs a distributed computation."""
    import dask.array as da
    from distributed import Client

    # Connects to the Dask scheduler provisioned by the platform
    client = Client()
    print(f"Dask client connected to: {client.dashboard_link}")

    # Example Dask computation: create a large random array and compute its mean
    x = da.random.random((10000, 10000), chunks=(1000, 1000))
    y = x.mean().compute()  # .compute() triggers the distributed computation

    client.close()
    return float(y)


@workflow
def dask_workflow(input_data: str) -> float:
    """A workflow that executes the Dask processing task."""
    result = my_dask_processing_task(data_path=input_data)
    return result


# To execute this workflow, register it with the platform.
# The platform will provision the Dask cluster according to the DaskTask configuration.