Skip to main content

Ray Integration

Ray Integration

Ray Integration enables the execution of distributed Python applications on a Ray cluster directly within Flyte tasks. This integration leverages KubeRay for Kubernetes-native Ray cluster management, providing a robust and scalable environment for distributed machine learning, data processing, and other Ray-native workloads.

Defining Ray Tasks

The RayFunctionTask plugin transforms a standard Python function into a Ray job. This plugin is responsible for orchestrating the Ray cluster and executing the user-defined code within it. To configure the Ray cluster's specifications, the RayFunctionTask utilizes a RayJobConfig object.

Ray Cluster Configuration

The RayJobConfig class is central to defining the topology and behavior of the Ray cluster. It allows for detailed specification of both head and worker nodes, as well as overall cluster properties.

Head Node Configuration

The head_node_config field, an instance of HeadNodeConfig, defines the resources and behavior of the Ray head node.

  • ray_start_params: A dictionary of parameters passed directly to the ray start command on the head node.
  • pod_template: An optional PodTemplate object for providing a custom Kubernetes Pod specification for the head node.
  • requests and limits: Optional Resources objects to specify CPU and memory requests and limits for the head node's primary container. These resource specifications take precedence over any defined in pod_template.

Worker Node Configuration

The worker_node_config field, a list of WorkerNodeConfig objects, defines the resources and behavior for one or more groups of Ray worker nodes. Each WorkerNodeConfig specifies:

  • group_name: A unique identifier for the worker group.
  • replicas: The desired number of worker nodes in this group.
  • min_replicas and max_replicas: When autoscaling is enabled, these define the minimum and maximum number of workers for the group.
  • ray_start_params: A dictionary of parameters passed to the ray start command on the worker nodes within this group.
  • pod_template: An optional PodTemplate object for providing a custom Kubernetes Pod specification for worker nodes in this group.
  • requests and limits: Optional Resources objects to specify CPU and memory requests and limits for the worker nodes' primary containers. These resource specifications take precedence over any defined in pod_template.

Cluster-Wide Settings

The RayJobConfig also includes settings that apply to the entire Ray cluster:

  • enable_autoscaling: A boolean flag to enable or disable KubeRay's autoscaling capabilities for the worker groups.
  • runtime_env: An optional dictionary or YAML string that specifies the Python environment for the Ray cluster. This can include dependencies, working directory, and environment variables, ensuring all Ray actors and tasks have the necessary code and libraries.
  • address: An optional Ray cluster address. If provided, the task attempts to connect to an existing Ray cluster instead of provisioning a new one.
  • shutdown_after_job_finishes: A boolean flag that controls whether the Ray cluster is torn down immediately after the job completes.
  • ttl_seconds_after_finished: An optional integer specifying the time-to-live (in seconds) for the Ray cluster after the job finishes. This allows for post-job inspection or cleanup before the cluster is deprovisioned.

Runtime Environment Management

The RayFunctionTask handles the initialization of the Ray environment. In a cluster environment, the pre method automatically configures the Ray runtime_env to include the current working directory and excludes common build artifacts. This ensures that the task's code is available to all Ray nodes.

For more complex dependency management or custom code, explicitly define the runtime_env within the RayJobConfig. This allows for precise control over the Python environment, including specifying pip requirements, conda environments, or custom working_dir settings for the Ray cluster.

Integration Details

The RayFunctionTask acts as the bridge between Flyte and KubeRay.

  • The pre method is invoked before the actual task execution. It initializes the Ray client, connecting to an existing cluster if an address is provided, or preparing for a new cluster. When running within a Flyte cluster, it automatically sets up the runtime_env to include the current working directory, ensuring code availability across Ray nodes.
  • The custom_config method translates the RayJobConfig into a KubeRay RayJob object. This involves constructing HeadGroupSpec and WorkerGroupSpec based on the provided configurations, including resource requests/limits and custom pod templates. It also handles the serialization of the runtime_env for KubeRay.

Best Practices and Considerations

  • Resource Allocation: Carefully define requests and limits for both head and worker nodes using HeadNodeConfig and WorkerNodeConfig. This ensures efficient cluster utilization, prevents resource contention, and helps manage costs.
  • Autoscaling: Leverage enable_autoscaling in RayJobConfig along with min_replicas and max_replicas in WorkerNodeConfig to allow the Ray cluster to dynamically scale based on workload demands.
  • Runtime Environment: For tasks with specific Python package requirements or custom modules, explicitly define the runtime_env in RayJobConfig. This guarantees that all Ray actors and tasks have access to the necessary libraries and code.
  • Cluster Lifecycle Management: Configure shutdown_after_job_finishes and ttl_seconds_after_finished to manage the Ray cluster's lifecycle effectively. This helps optimize resource usage and control infrastructure costs by tearing down clusters when no longer needed.
  • Debugging: Utilize Ray's built-in dashboard and logging capabilities for monitoring and debugging distributed applications running on the cluster.
  • KubeRay Version Compatibility: Be aware that runtime_env in RayJobConfig is deprecated in KubeRay versions 1.1.0 and above, replaced by runtime_env_yaml. The integration handles this transition by preferring runtime_env_yaml if available.