Distributed Computing Integrations

This section details how to integrate and manage distributed computing frameworks within your workflows, enabling scalable execution of data processing and machine learning tasks. These integrations provide native support for Dask, Ray, and Spark, allowing you to define and execute distributed computations directly within your Python tasks.

Dask Integration

The Dask integration enables you to execute Python functions as distributed Dask computations. Dask provides parallel computing capabilities, scaling Python workloads from single machines to clusters.

Defining Dask Tasks

To define a Dask-enabled task, use the DaskTask class. This class wraps your Python function, allowing it to run within a Dask cluster.

from flytekit import task
from flyteplugins.dask.task import DaskTask, Dask, Scheduler, WorkerGroup

@task(
    task_config=DaskTask(
        plugin_config=Dask(
            scheduler=Scheduler(image="my-custom-dask-image:latest", resources={"cpu": "1", "mem": "2Gi"}),
            workers=WorkerGroup(number_of_workers=3, resources={"cpu": "2", "mem": "4Gi"}),
        )
    )
)
def my_dask_task(x: int, y: int) -> float:
    import dask.array as da

    # Build a chunked random matrix so the computation can be split into parallel tasks.
    a = da.random.random((10000, 10000), chunks=(1000, 1000))
    b = a + a.T
    # compute() returns a NumPy scalar; convert it to a plain Python float.
    return float(b.sum().compute())

Dask Configuration

The Dask class is the primary configuration object for Dask tasks. It allows you to specify the resources and images for the Dask scheduler and worker pods.

  • scheduler: Configures the Dask scheduler pod using the Scheduler class.

    • image (Optional[str]): Specifies a custom Docker image for the scheduler. If None, the task's registered image is used. This image must have dask[distributed] installed and should maintain a consistent Python environment with the workers and job runner.
    • resources (Optional[Resources]): Defines resource requests and limits for the scheduler pod (e.g., CPU, memory).
  • workers: Configures the default group of Dask worker pods using the WorkerGroup class.

    • number_of_workers (Optional[int]): The desired number of worker pods. Defaults to 1.
    • image (Optional[str]): Specifies a custom Docker image for the worker pods. If None, the task's registered image is used. Like the scheduler image, it must have dask[distributed] installed and a consistent Python environment.
    • resources (Optional[Resources]): Defines resource requests and limits for each worker pod.
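The fallback rules above can be sketched in plain Python. The classes below are hypothetical stand-ins that mirror only the documented image and number_of_workers fields (they are not the plugin's Scheduler and WorkerGroup classes), and resolve_image and describe_cluster are illustrative helper names, not part of any library:

```python
from dataclasses import dataclass
from typing import Dict, Optional, Union


# Hypothetical stand-ins for illustration only; not the plugin's real classes.
@dataclass
class SchedulerSketch:
    image: Optional[str] = None


@dataclass
class WorkerGroupSketch:
    number_of_workers: Optional[int] = 1  # documented default
    image: Optional[str] = None


def resolve_image(custom_image: Optional[str], task_image: str) -> str:
    """Return the custom image if one is set, otherwise the task's registered image."""
    return custom_image if custom_image is not None else task_image


def describe_cluster(
    scheduler: SchedulerSketch, workers: WorkerGroupSketch, task_image: str
) -> Dict[str, Union[str, int, None]]:
    """Summarize which image each pod type would use and how many workers are requested."""
    return {
        "scheduler_image": resolve_image(scheduler.image, task_image),
        "worker_image": resolve_image(workers.image, task_image),
        "number_of_workers": workers.number_of_workers,
    }
```

Calling describe_cluster(SchedulerSketch(), WorkerGroupSketch(), "my-task-image:1.0") reports the task image for both pod types and a single worker, which matches the defaults described in the list.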

Execution Details

When a DaskTask executes, the system provisions a Dask cluster with a scheduler and a configurable number of workers based on the plugin_config. The task's Python code runs within this distributed environment.

When the task runs in a cluster and a code bundle is available, the pre method of DaskTask registers the bundle with the Dask client using UploadDirectory. This makes your task's code and dependencies accessible to all Dask workers.

Considerations

  • Image Consistency: Ensure that the custom images provided for the scheduler and workers have dask[distributed] installed and maintain the same Python environment as the task's main execution environment to avoid dependency conflicts.
  • Resource Allocation: Carefully configure CPU and memory resources for both scheduler and worker pods to optimize performance and prevent resource exhaustion.
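One way to check image consistency is to record a small environment fingerprint inside each image and compare the results. The sketch below uses only the standard library; environment_fingerprint and fingerprint_mismatches are hypothetical helper names, and the package list you pass in is your own choice:

```python
import sys
from importlib import metadata
from typing import Dict, Iterable, Optional, Tuple


def environment_fingerprint(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Record the Python major.minor version and the installed version of each package."""
    fingerprint: Dict[str, Optional[str]] = {
        "python": f"{sys.version_info.major}.{sys.version_info.minor}"
    }
    for name in packages:
        try:
            fingerprint[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            fingerprint[name] = None  # package is not installed in this environment
    return fingerprint


def fingerprint_mismatches(
    a: Dict[str, Optional[str]], b: Dict[str, Optional[str]]
) -> Dict[str, Tuple[Optional[str], Optional[str]]]:
    """Return every key whose value differs between two fingerprints."""
    return {
        key: (a.get(key), b.get(key))
        for key in sorted(set(a) | set(b))
        if a.get(key) != b.get(key)
    }
```

For example, running environment_fingerprint(["dask", "distributed"]) in the scheduler, worker, and task images and passing pairs of results to fingerprint_mismatches should return an empty dictionary when the environments agree.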

Ray Integration

The Ray integration allows you to run distributed applications and machine learning workloads using the Ray framework. Ray provides a simple, universal API for building distributed applications.

Defining Ray Tasks

To define a Ray-enabled task, use the RayFunctionTask class. This class enables your Python function to execute within a Ray cluster.

from flytekit import task
from flyteplugins.ray.task import RayFunctionTask, RayJobConfig, HeadNodeConfig, WorkerNodeConfig

@task(
    task_config=RayFunctionTask(
        plugin_config=RayJobConfig(
            head_node_config=HeadNodeConfig(requests={"cpu": "1", "mem": "2Gi"}),
            worker_node_config=[
                WorkerNodeConfig(
                    group_name="default-workers",
                    replicas=2,
                    # Bounds used when enable_autoscaling is True.
                    min_replicas=1,
                    max_replicas=4,
                    requests={"cpu": "2", "mem": "4Gi"},
                )
            ],
            enable_autoscaling=True,
            runtime_env={"pip": ["pandas", "numpy"]},
        )
    )
)
def my_ray_task(x: int) -> int:
    import ray

    @ray.remote
    def f(i):
        return i * i

    # Launch one remote call per input, then gather and sum the results.
    futures = [f.remote(i) for i in range(x)]
    return sum(ray.get(futures))

Ray Configuration

The RayJobConfig class is used to configure the Ray cluster for your task.

  • head_node_config (Optional[HeadNodeConfig]): Configures the Ray head node.

    • ray_start_params (Optional[Dict[str, str]]): Parameters passed to ray start on the head node.
    • pod_template (Optional[PodTemplate]): A Kubernetes Pod template for the head node.
    • requests (Optional[Resources]): Resource requests for the head node pod.
    • limits (Optional[Resources]): Resource limits for the head node pod.
  • worker_node_config (List[WorkerNodeConfig]): A list of configurations for Ray worker groups.

    • group_name (str): A unique name for the worker group.
    • replicas (int): The desired number of worker pods in this group.
    • min_replicas (Optional[int]): Minimum number of worker pods for autoscaling.
    • max_replicas (Optional[int]): Maximum number of worker pods for autoscaling.
    • ray_start_params (Optional[Dict[str, str]]): Parameters passed to ray start on worker nodes.
    • pod_template (Optional[PodTemplate]): A Kubernetes Pod template for worker pods in this group.
    • requests (Optional[Resources]): Resource requests for worker pods.
    • limits (Optional[Resources]): Resource limits for worker pods.
  • enable_autoscaling (bool): If True, enables autoscaling for the Ray cluster based on min_replicas and max_replicas in worker group configurations. Defaults to False.

  • runtime_env (Optional[dict]): A dictionary defining the runtime environment for the Ray cluster (e.g., {"pip": ["pandas"]}). This is deprecated in KubeRay >= 1.1.0 and is replaced by runtime_env_yaml.

  • address (Optional[str]): The address of an existing Ray cluster to connect to. If None, a new cluster is provisioned.

  • shutdown_after_job_finishes (bool): If True, the Ray cluster will shut down immediately after the job completes. Defaults to False.

  • ttl_seconds_after_finished (Optional[int]): Time-to-live in seconds for the Ray cluster after the job finishes. The cluster will be cleaned up after this duration.
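The interaction between replicas, min_replicas, max_replicas, and enable_autoscaling can be sketched as a small validation helper. This is an illustration, not the plugin's logic: effective_replica_range is a hypothetical name, and both the fallback of a missing bound to replicas and the ordering check are assumptions.

```python
from typing import Optional, Tuple


def effective_replica_range(
    replicas: int,
    min_replicas: Optional[int] = None,
    max_replicas: Optional[int] = None,
    enable_autoscaling: bool = False,
) -> Tuple[int, int]:
    """Return the (lowest, highest) worker count a group could scale between."""
    if not enable_autoscaling:
        # Without autoscaling the group stays at its fixed size.
        return (replicas, replicas)
    # Assumption for this sketch: a missing bound falls back to `replicas`.
    low = replicas if min_replicas is None else min_replicas
    high = replicas if max_replicas is None else max_replicas
    if not (0 <= low <= replicas <= high):
        raise ValueError(
            f"expected 0 <= min_replicas <= replicas <= max_replicas, "
            f"got min={low}, replicas={replicas}, max={high}"
        )
    return (low, high)
```

With replicas=2, min_replicas=1, max_replicas=4, and autoscaling enabled, the helper returns (1, 4); with autoscaling disabled it returns (2, 2) regardless of the bounds.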

Execution Details

The pre method of RayFunctionTask initializes the Ray client. If running in a cluster, it configures a runtime_env with the current working directory, ensuring that the task's code and dependencies are available to all Ray nodes. The custom_config method translates the RayJobConfig into the necessary RayJob and RayCluster specifications for execution.
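The working-directory step described above can be sketched as a pure function over the runtime_env dictionary. The working_dir key is a standard Ray runtime environment field; build_runtime_env and its parameters are hypothetical names, and letting a user-supplied working_dir take precedence is an assumption of this sketch rather than documented behavior.

```python
import os
from typing import Any, Dict, Optional


def build_runtime_env(
    user_env: Optional[Dict[str, Any]],
    in_cluster: bool,
    working_dir: Optional[str] = None,
) -> Dict[str, Any]:
    """Return a copy of user_env that includes a working_dir when running in a cluster."""
    env: Dict[str, Any] = dict(user_env or {})  # copy so the caller's dict is untouched
    if in_cluster:
        # Assumption: an explicit working_dir in user_env is kept as-is.
        env.setdefault("working_dir", working_dir or os.getcwd())
    return env
```

The resulting dictionary is the kind of value that would be passed as ray.init(runtime_env=...), so that Ray can ship the directory's contents to every node.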

Considerations

  • runtime_env Deprecation: For KubeRay versions 1.1.0 and above, runtime_env_yaml replaces runtime_env for defining the Ray runtime environment. The system handles the conversion internally, so a runtime_env dictionary continues to work.
  • Resource Allocation: Proper allocation of resources for head and worker nodes is crucial for Ray cluster stability and performance.
  • Autoscaling: Leverage enable_autoscaling and min_replicas/max_replicas for cost-effective and elastic resource management, especially for variable workloads.

Spark Integration

The Spark integration allows you to execute PySpark applications as distributed Spark jobs. This enables large-scale data processing directly within your workflows.

Defining PySpark Tasks

To define a PySpark-enabled task, use the PysparkFunctionTask class. This class transforms your Python function into a Spark application that runs on a Kubernetes cluster.

from flytekit import task
from flyteplugins.spark.task import PysparkFunctionTask, Spark
from pyspark.sql import SparkSession

@task(
    task_config=PysparkFunctionTask(
        plugin_config=Spark(
            spark_conf={"spark.executor.cores": "2", "spark.executor.memory": "4g"},
            driver_pod={"requests": {"cpu": "1", "mem": "2Gi"}},
            executor_pod={"requests": {"cpu": "1", "mem": "2Gi"}},
        )
    )
)
def my_pyspark_task(x: int) -> float:
    # getOrCreate() reuses the session started by the pre method when one is active.
    spark = SparkSession.builder.appName("MyPySparkApp").getOrCreate()
    data = [(i,) for i in range(x)]
    df = spark.createDataFrame(data, ["value"])
    result = df.agg({"value": "sum"}).collect()[0][0]
    # No spark.stop() here: the post method stops the session after the task completes.
    return float(result)

Spark Configuration

The Spark class provides comprehensive configuration options for your Spark job.

  • spark_conf (Optional[Dict[str, str]]): A dictionary of Spark configuration properties (e.g., {"spark.executor.cores": "2"}).
  • hadoop_conf (Optional[Dict[str, str]]): A dictionary of Hadoop configuration properties.
  • executor_path (Optional[str]): The path to the Python binary used for PySpark execution. If None, the system's interpreter path is used.
  • applications_path (Optional[str]): The path to the main application file. If None, the entrypoint path of the serialized task is used.
  • driver_pod (Optional[PodTemplate]): A Kubernetes Pod template for the Spark driver pod. This allows fine-grained control over driver resources and other pod specifications.
  • executor_pod (Optional[PodTemplate]): A Kubernetes Pod template for the Spark executor pods. This allows fine-grained control over executor resources and other pod specifications.

Execution Details

The pre method of PysparkFunctionTask initializes a SparkSession and, if running in a cluster, packages the current working directory into a zip file and adds it to the SparkContext using addPyFile. This ensures that all necessary Python code and dependencies are distributed to the Spark executors.
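The packaging step can be approximated with the standard library. This is a minimal sketch of zipping a directory, not the plugin's implementation; zip_working_directory and the archive name are hypothetical.

```python
import os
import shutil
import tempfile


def zip_working_directory(src_dir: str, archive_name: str = "task_code") -> str:
    """Zip the contents of src_dir and return the path of the created archive."""
    # Write the archive into a separate temporary directory so that it is
    # never picked up as part of its own contents.
    out_dir = tempfile.mkdtemp()
    base_name = os.path.join(out_dir, archive_name)
    # Paths inside the archive are stored relative to src_dir.
    return shutil.make_archive(base_name, "zip", root_dir=src_dir)


# In a PySpark task the archive would then be shipped to the executors with:
#   spark.sparkContext.addPyFile(archive_path)
```

Because the paths inside the archive are relative to the source directory, top-level modules in that directory stay importable by their usual names once the zip is on the executors' Python path.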

The custom_config method constructs a SparkJob object based on the Spark plugin configuration, including Spark and Hadoop properties, application paths, and pod templates for the driver and executors.

The post method ensures that the SparkSession is stopped gracefully after the task completes, releasing resources.
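The pre/post lifecycle can be pictured as a try/finally wrapper around the task body. The sketch below uses a hypothetical stand-in class instead of a real SparkSession, and it assumes that cleanup also runs when the task body raises, which the description above does not state.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class SessionSketch:
    """Hypothetical stand-in for a SparkSession; it only records whether stop() ran."""

    def __init__(self) -> None:
        self.stopped = False

    def stop(self) -> None:
        self.stopped = True


def run_with_managed_session(
    session: SessionSketch, task_fn: Callable[[SessionSketch], T]
) -> T:
    """Run task_fn with a session that was created beforehand, then stop the session."""
    try:
        # The task body only uses the session; it does not create or stop one.
        return task_fn(session)
    finally:
        # Cleanup step, analogous to "post" (assumed here to run on failure too).
        session.stop()
```

A task body that calls stop() itself would make this final cleanup redundant, and could stop the session before surrounding framework code has finished using it.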

Considerations

  • Resource Allocation: Configure spark_conf properties like spark.executor.cores, spark.executor.memory, spark.driver.cores, and spark.driver.memory to match your workload requirements. Additionally, use driver_pod and executor_pod for more detailed Kubernetes resource requests and limits.
  • Code Distribution: The addPyFile mechanism in the pre method handles distributing your task's code. Ensure all necessary modules are within the working directory when the task is serialized.
  • SparkSession Management: The PysparkFunctionTask automatically manages the SparkSession lifecycle (creation in pre, stopping in post). Avoid creating or stopping SparkSession instances manually within your task function unless you have specific advanced requirements.