Ray Integration
Ray Integration enables the execution of distributed Python applications on a Ray cluster directly within Flyte tasks. This integration leverages KubeRay for Kubernetes-native Ray cluster management, providing a robust and scalable environment for distributed machine learning, data processing, and other Ray-native workloads.
Defining Ray Tasks
The RayFunctionTask plugin transforms a standard Python function into a Ray job, orchestrating the Ray cluster and executing the user-defined code within it. The cluster's specification is supplied through a RayJobConfig object.
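As a minimal sketch of how this looks in user code, assuming the plugin classes are importable from flytekitplugins.ray (adjust the import path to your SDK version), a Ray-backed task is declared by passing a RayJobConfig as the task configuration. The ray_start_params value and the worker group size below are illustrative:

```python
import ray
from flytekit import task
# Import path assumes the flytekitplugins-ray package; adjust for your SDK.
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig


@ray.remote
def square(x: int) -> int:
    # Executed as a Ray task somewhere on the cluster.
    return x * x


@task(
    task_config=RayJobConfig(
        # num-cpus=0 keeps the head node out of task scheduling (illustrative).
        head_node_config=HeadNodeConfig(ray_start_params={"num-cpus": "0"}),
        worker_node_config=[WorkerNodeConfig(group_name="ray-workers", replicas=2)],
    )
)
def sum_of_squares(n: int) -> int:
    # The task body runs against the provisioned Ray cluster; ray.remote calls
    # fan out to the worker group declared above.
    futures = [square.remote(i) for i in range(n)]
    return sum(ray.get(futures))
```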
Ray Cluster Configuration
The RayJobConfig class is central to defining the topology and behavior of the Ray cluster. It allows for detailed specification of both head and worker nodes, as well as overall cluster properties.
Head Node Configuration
The head_node_config field, an instance of HeadNodeConfig, defines the resources and behavior of the Ray head node:

- `ray_start_params`: A dictionary of parameters passed directly to the `ray start` command on the head node.
- `pod_template`: An optional `PodTemplate` object providing a custom Kubernetes Pod specification for the head node.
- `requests` and `limits`: Optional `Resources` objects specifying CPU and memory requests and limits for the head node's primary container. These resource specifications take precedence over any defined in `pod_template`.
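A hedged sketch of a head-node configuration, continuing with the same assumed imports; the ray_start_params and resource values are illustrative:

```python
from flytekit import Resources
from flytekitplugins.ray import HeadNodeConfig

head_config = HeadNodeConfig(
    # Advertise zero CPUs so Ray schedules work only on the worker groups;
    # these parameters are passed verbatim to `ray start` on the head node.
    ray_start_params={"num-cpus": "0", "dashboard-host": "0.0.0.0"},
    # Applied to the head pod's primary container; overrides resources
    # declared in a custom pod_template, if one is supplied.
    requests=Resources(cpu="2", mem="4Gi"),
    limits=Resources(cpu="2", mem="4Gi"),
)
```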
Worker Node Configuration
The worker_node_config field, a list of WorkerNodeConfig objects, defines the resources and behavior for one or more groups of Ray worker nodes. Each WorkerNodeConfig specifies:
- `group_name`: A unique identifier for the worker group.
- `replicas`: The desired number of worker nodes in this group.
- `min_replicas` and `max_replicas`: When autoscaling is enabled, these define the minimum and maximum number of workers for the group.
- `ray_start_params`: A dictionary of parameters passed to the `ray start` command on the worker nodes within this group.
- `pod_template`: An optional `PodTemplate` object providing a custom Kubernetes Pod specification for worker nodes in this group.
- `requests` and `limits`: Optional `Resources` objects specifying CPU and memory requests and limits for the worker nodes' primary containers. These resource specifications take precedence over any defined in `pod_template`.
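For example, a single autoscalable worker group could be described as follows (group name, replica counts, and resources are illustrative values):

```python
from flytekit import Resources
from flytekitplugins.ray import WorkerNodeConfig

worker_groups = [
    WorkerNodeConfig(
        group_name="cpu-workers",
        replicas=4,        # desired number of workers
        min_replicas=2,    # lower bound used when autoscaling is enabled
        max_replicas=10,   # upper bound used when autoscaling is enabled
        # Applied to each worker pod's primary container; overrides any
        # resources declared in a custom pod_template.
        requests=Resources(cpu="4", mem="8Gi"),
        limits=Resources(cpu="4", mem="8Gi"),
    ),
]
```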
Cluster-Wide Settings
The RayJobConfig also includes settings that apply to the entire Ray cluster:
- `enable_autoscaling`: A boolean flag to enable or disable KubeRay's autoscaling capabilities for the worker groups.
- `runtime_env`: An optional dictionary or YAML string specifying the Python environment for the Ray cluster. This can include dependencies, working directory, and environment variables, ensuring all Ray actors and tasks have the necessary code and libraries.
- `address`: An optional Ray cluster address. If provided, the task attempts to connect to an existing Ray cluster instead of provisioning a new one.
- `shutdown_after_job_finishes`: A boolean flag that controls whether the Ray cluster is torn down immediately after the job completes.
- `ttl_seconds_after_finished`: An optional integer specifying the time-to-live (in seconds) for the Ray cluster after the job finishes. This allows for post-job inspection or cleanup before the cluster is deprovisioned.
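Putting the pieces together, a complete cluster definition might look like the following sketch; all field values are illustrative, and the import path is assumed as before:

```python
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"num-cpus": "0"}),
    worker_node_config=[
        WorkerNodeConfig(group_name="cpu-workers", replicas=4, min_replicas=2, max_replicas=10),
    ],
    enable_autoscaling=True,            # scale each worker group between its min/max replicas
    runtime_env={"pip": ["pandas"]},    # environment shipped to every Ray node
    shutdown_after_job_finishes=True,   # tear the cluster down once the job ends...
    ttl_seconds_after_finished=3600,    # ...after keeping it around for an hour for inspection
)
```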
Runtime Environment Management
The RayFunctionTask handles initialization of the Ray environment. When running in a cluster, the pre method automatically configures the Ray runtime_env to include the current working directory while excluding common build artifacts, ensuring that the task's code is available to all Ray nodes.
For more complex dependency management or custom code, explicitly define the runtime_env within the RayJobConfig. This allows for precise control over the Python environment, including specifying pip requirements, conda environments, or custom working_dir settings for the Ray cluster.
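A runtime environment pinning packages and shipping local code might be expressed as below; the package versions, path, and environment variable are placeholders:

```python
from flytekitplugins.ray import RayJobConfig, WorkerNodeConfig

runtime_env = {
    # Packages installed on every Ray node for the duration of the job.
    "pip": ["torch==2.3.0", "scikit-learn"],
    # Local code made importable on all nodes; "./src" is a placeholder path.
    "working_dir": "./src",
    # Environment variables visible to all Ray actors and tasks.
    "env_vars": {"OMP_NUM_THREADS": "4"},
}

config = RayJobConfig(
    worker_node_config=[WorkerNodeConfig(group_name="workers", replicas=2)],
    runtime_env=runtime_env,
)
```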
Integration Details
The RayFunctionTask acts as the bridge between Flyte and KubeRay.
- The `pre` method is invoked before the actual task execution. It initializes the Ray client, connecting to an existing cluster if an `address` is provided, or preparing for a new cluster. When running within a Flyte cluster, it automatically sets up the `runtime_env` to include the current working directory, ensuring code availability across Ray nodes.
- The `custom_config` method translates the `RayJobConfig` into a KubeRay `RayJob` object. This involves constructing `HeadGroupSpec` and `WorkerGroupSpec` from the provided configurations, including resource requests/limits and custom pod templates. It also handles the serialization of the `runtime_env` for KubeRay.
Best Practices and Considerations
- Resource Allocation: Carefully define `requests` and `limits` for both head and worker nodes using `HeadNodeConfig` and `WorkerNodeConfig`. This ensures efficient cluster utilization, prevents resource contention, and helps manage costs.
- Autoscaling: Leverage `enable_autoscaling` in `RayJobConfig` along with `min_replicas` and `max_replicas` in `WorkerNodeConfig` to allow the Ray cluster to scale dynamically with workload demands.
- Runtime Environment: For tasks with specific Python package requirements or custom modules, explicitly define the `runtime_env` in `RayJobConfig`. This guarantees that all Ray actors and tasks have access to the necessary libraries and code.
- Cluster Lifecycle Management: Configure `shutdown_after_job_finishes` and `ttl_seconds_after_finished` to manage the Ray cluster's lifecycle effectively. This helps optimize resource usage and control infrastructure costs by tearing down clusters when no longer needed.
- Debugging: Utilize Ray's built-in dashboard and logging capabilities for monitoring and debugging distributed applications running on the cluster.
- KubeRay Version Compatibility: Be aware that `runtime_env` is deprecated in KubeRay versions 1.1.0 and above, replaced by `runtime_env_yaml`. The integration handles this transition by preferring `runtime_env_yaml` when available, as sketched below.
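Because runtime_env accepts either a dictionary or a YAML string, one way to stay aligned with the newer field is to supply the environment as YAML directly. This is only a sketch under the same import assumptions as above; a plain dictionary also works, with the plugin handling the conversion:

```python
from flytekitplugins.ray import RayJobConfig, WorkerNodeConfig

# Expressed as a YAML string; on KubeRay >= 1.1.0 this populates runtime_env_yaml.
runtime_env_yaml = """
pip:
  - pandas
working_dir: ./src
"""

config = RayJobConfig(
    worker_node_config=[WorkerNodeConfig(group_name="workers", replicas=2)],
    runtime_env=runtime_env_yaml,
)
```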