
Spark Integration

Spark Integration enables the execution of PySpark functions as distributed tasks, leveraging Kubernetes for native Spark job orchestration. This integration transforms standard Python code into a robust, scalable Spark application, managing the underlying infrastructure automatically.

Core Concepts

The integration revolves around two primary components: the PysparkFunctionTask and the Spark configuration object.

PysparkFunctionTask

The PysparkFunctionTask is the core plugin responsible for orchestrating Spark jobs. It acts as an adapter, taking a Python function and preparing it for execution within a Spark context. This involves:

  • Spark Session Management: Automatically initializes and stops a SparkSession for the task.
  • Code Distribution: In a cluster environment, it packages the local Python code and its dependencies into a zip file and distributes it to the Spark executors so that all necessary modules are available (see the sketch after this list).
  • Job Configuration: Translates the user-defined Spark configuration into a SparkJob definition, which is then submitted to the Kubernetes cluster.
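
The code-distribution step above can be pictured with plain PySpark primitives. The following is an illustrative sketch only, not the plugin's implementation; the archive path is a placeholder:

import shutil

import pyspark.sql


def distribute_local_code(spark: pyspark.sql.SparkSession, source_dir: str = ".") -> None:
    """Illustrative sketch: archive local code and register it with the executors."""
    # Zip the working directory so custom modules travel with the job.
    archive = shutil.make_archive("/tmp/job_code", "zip", root_dir=source_dir)
    # addPyFile ships the archive to every executor and puts it on the Python
    # path there, so local imports resolve during distributed execution.
    spark.sparkContext.addPyFile(archive)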

Spark Configuration Object

The Spark object (from flyteplugins.spark.task) provides a declarative way to configure the Spark environment for a task. When defining a task that requires Spark, an instance of this object is passed to specify the desired Spark runtime settings.

The Spark object includes the following attributes:

  • spark_conf (Optional[Dict[str, str]]): A dictionary for setting Spark properties. These properties directly influence the Spark application's behavior, such as spark.executor.memory or spark.driver.cores.
  • hadoop_conf (Optional[Dict[str, str]]): A dictionary for setting Hadoop properties. This is useful for configuring access to distributed file systems like HDFS or S3 (e.g., fs.s3a.access.key).
  • executor_path (Optional[str]): Specifies the path to the Python binary used by PySpark executors. This is crucial for ensuring consistent Python environments across the cluster.
  • applications_path (Optional[str]): Defines the path to the main application file. While often inferred automatically, this allows specifying a custom entry point, especially for pre-compiled JARs or specific Python scripts (see the sketch after this list).
  • driver_pod (Optional[PodTemplate]): A PodTemplate object for customizing the Kubernetes Pod that hosts the Spark driver. This enables fine-grained control over driver resources, environment variables, and node affinities.
  • executor_pod (Optional[PodTemplate]): A PodTemplate object for customizing the Kubernetes Pods that host the Spark executors. This allows tailoring executor resources and configurations to specific workload requirements.
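
For cases where the defaults are not appropriate, the path-related attributes can be set explicitly. A minimal sketch; the binary and script paths below are placeholders, not plugin defaults:

from flyteplugins.spark.task import Spark

# Placeholder paths: both must actually exist in the task's container image.
pinned_paths_config = Spark(
    spark_conf={"spark.executor.instances": "4"},
    executor_path="/opt/venv/bin/python3",            # Python binary used by executors
    applications_path="local:///app/entrypoint.py",   # main application file
)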

Task Execution Lifecycle

When a task is configured for Spark integration, the PysparkFunctionTask manages its lifecycle through distinct phases:

  1. Pre-execution (pre method):

    • A pyspark.sql.SparkSession is initialized with a default application name.
    • If the task executes within a cluster, the current working directory is archived into a zip file. This archive is then distributed to all Spark executors using sparkContext.addPyFile(), making local Python modules and dependencies available across the distributed environment.
    • The initialized SparkSession is made available to the user's Python function.
  2. Configuration Generation (custom_config method):

    • The Spark configuration object associated with the task is translated into a SparkJob definition (see the sketch after this list).
    • sparkConf and hadoopConf are populated directly from the Spark object's attributes.
    • The mainApplicationFile is set, defaulting to the task's entrypoint path if not explicitly provided in the Spark configuration.
    • The executorPath is determined similarly.
    • The applicationType is explicitly set to PYTHON.
    • Custom driver_pod and executor_pod templates, if provided, are converted into Kubernetes Pod specifications for the Spark driver and executor containers.
  3. Post-execution (post method):

    • The SparkSession created during the pre-execution phase is gracefully stopped, releasing cluster resources.
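
The configuration-generation step can be pictured as mapping the Spark object's attributes onto the fields of the SparkJob definition. The sketch below uses a plain dictionary as a stand-in for the real job structure; the fallback values are assumptions:

from typing import Any, Dict

from flyteplugins.spark.task import Spark


def sketch_spark_job(cfg: Spark, default_entrypoint: str) -> Dict[str, Any]:
    """Illustrative stand-in for the SparkJob definition built by custom_config."""
    return {
        "sparkConf": cfg.spark_conf or {},
        "hadoopConf": cfg.hadoop_conf or {},
        # Fall back to the task's entrypoint when no application file is given.
        "mainApplicationFile": cfg.applications_path or default_entrypoint,
        # The fallback binary here is an assumption; the plugin resolves this similarly.
        "executorPath": cfg.executor_path or "python3",
        "applicationType": "PYTHON",
    }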

Key Capabilities and Usage

Spark Integration provides a powerful way to run distributed data processing workloads.

Automated Spark Session Management

Developers do not need to manually create or manage SparkSession objects within their task code. The integration automatically provides a configured SparkSession to the task function, simplifying development.

Dependency Distribution

The automatic packaging and distribution of local code ensure that any custom modules or libraries defined alongside the task are available to all Spark executors, facilitating complex, multi-file Spark applications.
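
For instance, a task can import a sibling module and rely on it inside Spark operations; because the working directory is shipped to the executors, the import also resolves on the workers. The data_cleaning module below is hypothetical:

import pyspark.sql

# Hypothetical local module sitting next to the task file; the zip-based
# distribution makes it importable on every executor as well.
from data_cleaning import normalize_columns


def clean_dataset(spark_session: pyspark.sql.SparkSession, input_path: str) -> int:
    df = spark_session.read.parquet(input_path)
    # normalize_columns is applied as part of the Spark plan, so it must be
    # importable on the executors, which the code distribution guarantees.
    cleaned = normalize_columns(df)
    return cleaned.count()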

Granular Resource Control

The driver_pod and executor_pod attributes of the Spark object offer extensive control over the underlying Kubernetes resources. This allows for precise tuning of CPU, memory, storage, and other pod-level configurations, optimizing performance and cost for diverse Spark workloads.
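
A sketch of pod-level tuning, written out from the commented-out portion of the example below; the PodTemplate, Resources, and Toleration names and their import location are assumptions that depend on your deployment:

# Import location is an assumption; adjust to the packages in your deployment.
from flyteplugins.spark.task import Spark, PodTemplate, Resources, Toleration

tuned_spark_config = Spark(
    spark_conf={"spark.executor.instances": "8"},
    # A larger driver pinned to a dedicated node group for planning-heavy jobs.
    driver_pod=PodTemplate(
        resources=Resources(cpu="2", mem="8Gi"),
        node_selector={"node-group": "spark-drivers"},
    ),
    # Executors tolerate the taint on the dedicated Spark node pool.
    executor_pod=PodTemplate(
        resources=Resources(cpu="4", mem="16Gi"),
        tolerations=[Toleration(key="dedicated", value="spark", effect="NoSchedule")],
    ),
)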

Flexible Configuration

The spark_conf and hadoop_conf dictionaries allow overriding any default Spark or Hadoop settings, enabling tasks to adapt to specific cluster environments or performance requirements.

Example Implementation

To integrate Spark into a task, define a Spark configuration object and associate it with your Python function.

import pyspark.sql
# Assuming Spark is imported from the relevant plugin package
from flyteplugins.spark.task import Spark # This is the configuration object

# Define a Spark configuration
my_spark_config = Spark(
    spark_conf={
        "spark.executor.memory": "2g",
        "spark.driver.memory": "4g",
        "spark.sql.shuffle.partitions": "200",
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": "2",
        "spark.dynamicAllocation.maxExecutors": "10",
    },
    hadoop_conf={
        "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "fs.s3a.endpoint": "s3.amazonaws.com",
    },
    # Optional: Customize driver and executor pods
    # driver_pod=PodTemplate(
    #     resources=Resources(cpu="1", mem="4Gi"),
    #     node_selector={"node-group": "spark-drivers"},
    # ),
    # executor_pod=PodTemplate(
    #     resources=Resources(cpu="2", mem="8Gi"),
    #     tolerations=[Toleration(key="dedicated", value="spark", effect="NoSchedule")],
    # ),
)

# Define a Python function that will run as a Spark task
# The system automatically injects the configured SparkSession
# @task(task_config=my_spark_config) # This is how the configuration would be applied
def process_data_with_spark(spark_session: pyspark.sql.SparkSession, input_path: str) -> float:
"""
Reads data from an S3 path, processes it using Spark, and returns a result.
"""
df = spark_session.read.parquet(input_path)
processed_df = df.groupBy("category").count()
total_count = processed_df.count()
return float(total_count)

# When this task executes, it will launch a distributed Spark job
# on Kubernetes using the settings defined in my_spark_config.

Performance Considerations

  • Resource Allocation: Carefully configure spark_conf, driver_pod, and executor_pod to match the computational and memory requirements of your Spark workload. Over-provisioning wastes resources, while under-provisioning leads to slow execution or failures.
  • Data Locality: Optimize data access patterns. When reading data from distributed storage (e.g., S3), ensure Spark executors are co-located with the data or that network bandwidth is sufficient.
  • Shuffle Operations: Minimize data shuffling across the network by optimizing Spark transformations. Tune spark.sql.shuffle.partitions to prevent excessive small partitions or too few large ones.
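
As a rough starting point, the shuffle partition count is often sized relative to the total number of executor cores. The sketch below expresses that through spark_conf; the numbers are illustrative:

from flyteplugins.spark.task import Spark

# Illustrative sizing: roughly 2-3 shuffle partitions per executor core is a
# common starting point; the right value depends on data volume and skew.
executors = 10
cores_per_executor = 4

shuffle_tuned_config = Spark(
    spark_conf={
        "spark.executor.instances": str(executors),
        "spark.executor.cores": str(cores_per_executor),
        "spark.sql.shuffle.partitions": str(executors * cores_per_executor * 3),
    },
)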

Limitations and Considerations

  • External Dependencies: While local Python code is automatically distributed, complex external dependencies (e.g., specific versions of non-PyPI libraries, native binaries) might require building custom Docker images for the Spark driver and executors.
  • Spark Version Compatibility: Ensure that the PySpark version used in your local development environment is compatible with the Spark version deployed in the cluster.
  • Kubernetes Resource Quotas: Spark jobs consume significant Kubernetes resources. Be aware of cluster resource quotas and limits to prevent job failures or resource contention.