Spark Integration
Spark Integration enables the execution of PySpark functions as distributed tasks, leveraging Kubernetes for native Spark job orchestration. This integration transforms standard Python code into a robust, scalable Spark application, managing the underlying infrastructure automatically.
Core Concepts
The integration revolves around two primary components: the PysparkFunctionTask and the Spark configuration object.
PysparkFunctionTask
The PysparkFunctionTask is the core plugin responsible for orchestrating Spark jobs. It acts as an adapter, taking a Python function and preparing it for execution within a Spark context. This involves:
- Spark Session Management: Automatically initializes and stops a SparkSession for the task.
- Code Distribution: In a cluster environment, it packages the local Python code and its dependencies into a zip file and distributes it to the Spark executors, ensuring all necessary modules are available.
- Job Configuration: Translates the user-defined Spark configuration into a SparkJob definition, which is then submitted to the Kubernetes cluster.
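Conceptually, the code-distribution step can be pictured with the short sketch below. It uses only the standard library and public PySpark APIs (shutil.make_archive and SparkContext.addPyFile) and illustrates the idea rather than the plugin's actual implementation.

import os
import shutil

import pyspark.sql

def distribute_local_code(spark_session: pyspark.sql.SparkSession) -> None:
    # Archive the current working directory so local modules travel with the job...
    archive_path = shutil.make_archive("/tmp/user_code", "zip", os.getcwd())
    # ...and register the archive so every executor can import from it.
    spark_session.sparkContext.addPyFile(archive_path)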
Spark Configuration Object
The Spark object (from flyteplugins.spark.task) provides a declarative way to configure the Spark environment for a task. When defining a task that requires Spark, an instance of this object is passed to specify the desired Spark runtime settings.
The Spark object includes the following attributes:
- spark_conf (Optional[Dict[str, str]]): A dictionary of Spark properties. These directly influence the Spark application's behavior, such as spark.executor.memory or spark.driver.cores.
- hadoop_conf (Optional[Dict[str, str]]): A dictionary of Hadoop properties. This is useful for configuring access to distributed file systems like HDFS or S3 (e.g., fs.s3a.access.key).
- executor_path (Optional[str]): The path to the Python binary used by the PySpark executors. This is crucial for ensuring a consistent Python environment across the cluster.
- applications_path (Optional[str]): The path to the main application file. While often inferred automatically, this allows specifying a custom entry point, especially for pre-compiled JARs or specific Python scripts.
- driver_pod (Optional[PodTemplate]): A PodTemplate object for customizing the Kubernetes Pod that hosts the Spark driver. This enables fine-grained control over driver resources, environment variables, and node affinities.
- executor_pod (Optional[PodTemplate]): A PodTemplate object for customizing the Kubernetes Pods that host the Spark executors. This allows tailoring executor resources and configurations to specific workload requirements.
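As a minimal sketch, a configuration might look like the following. The executor_path and applications_path values are illustrative and depend on the container image in use; in most cases they can be omitted and inferred automatically.

from flyteplugins.spark.task import Spark

etl_spark_config = Spark(
    spark_conf={
        "spark.executor.instances": "4",
        "spark.executor.memory": "2g",
    },
    hadoop_conf={
        "fs.s3a.endpoint": "s3.amazonaws.com",
    },
    # Illustrative only: Python binary used by the PySpark executors.
    executor_path="/usr/bin/python3",
    # Illustrative only: explicit entry point; usually inferred automatically.
    applications_path="local:///usr/local/bin/entrypoint.py",
)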
Task Execution Lifecycle
When a task is configured for Spark integration, the PysparkFunctionTask manages its lifecycle through distinct phases:
- Pre-execution (pre method):
  - A pyspark.sql.SparkSession is initialized with a default application name.
  - If the task executes within a cluster, the current working directory is archived into a zip file. This archive is then distributed to all Spark executors using sparkContext.addPyFile(), making local Python modules and dependencies available across the distributed environment.
  - The initialized SparkSession is made available to the user's Python function.
- Configuration Generation (custom_config method):
  - The Spark configuration object associated with the task is translated into a SparkJob definition.
  - sparkConf and hadoopConf are populated directly from the Spark object's attributes.
  - The mainApplicationFile is set, defaulting to the task's entrypoint path if not explicitly provided in the Spark configuration.
  - The executorPath is determined similarly.
  - The applicationType is explicitly set to PYTHON.
  - Custom driver_pod and executor_pod templates, if provided, are converted into Kubernetes Pod specifications for the Spark driver and executor containers.
- Post-execution (post method):
  - The SparkSession created during the pre-execution phase is gracefully stopped, releasing cluster resources.
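The configuration-generation step can be illustrated with a rough sketch of the field mapping. The dictionary below mirrors the SparkJob fields described above; it is a conceptual illustration, not the plugin's actual code, and the helper name spark_config_to_job is hypothetical.

from typing import Any, Dict

def spark_config_to_job(spark_conf: Dict[str, str],
                        hadoop_conf: Dict[str, str],
                        entrypoint: str,
                        executor_path: str) -> Dict[str, Any]:
    # Mirrors the mapping described above: user-supplied dictionaries are copied
    # verbatim, paths fall back to task defaults, and the type is always PYTHON.
    return {
        "sparkConf": dict(spark_conf),
        "hadoopConf": dict(hadoop_conf),
        "mainApplicationFile": entrypoint,
        "executorPath": executor_path,
        "applicationType": "PYTHON",
    }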
Key Capabilities and Usage
Spark Integration provides a powerful way to run distributed data processing workloads.
Automated Spark Session Management
Developers do not need to manually create or manage SparkSession objects within their task code. The integration automatically provides a configured SparkSession to the task function, simplifying development.
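In practice this means the task body contains no session boilerplate; the sketch below (with a hypothetical summarize task) simply uses the injected session.

import pyspark.sql

# No SparkSession.builder...getOrCreate() call is needed inside the task body;
# the integration hands the session in as a parameter.
def summarize(spark_session: pyspark.sql.SparkSession, n: int) -> int:
    # Use the injected session directly, e.g. to build and count a range DataFrame.
    return spark_session.range(n).count()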
Dependency Distribution
The automatic packaging and distribution of local code ensure that any custom modules or libraries defined alongside the task are available to all Spark executors, facilitating complex, multi-file Spark applications.
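For example, a helper module that sits next to the task file can be used inside executor-side code such as a UDF, because the zipped project is shipped to every executor. The my_project.cleaning module and the category column below are hypothetical.

import pyspark.sql
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Hypothetical local module, e.g. my_project/cleaning.py, defining normalize(value: str) -> str.
from my_project.cleaning import normalize

def clean_categories(spark_session: pyspark.sql.SparkSession, input_path: str) -> int:
    # The UDF executes on the executors, which can import my_project because the
    # zipped working directory was distributed to them.
    normalize_udf = F.udf(normalize, StringType())
    df = spark_session.read.parquet(input_path)
    return df.withColumn("category", normalize_udf("category")).count()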
Granular Resource Control
The driver_pod and executor_pod attributes of the Spark object offer extensive control over the underlying Kubernetes resources. This allows for precise tuning of CPU, memory, storage, and other pod-level configurations, optimizing performance and cost for diverse Spark workloads.
Flexible Configuration
The spark_conf and hadoop_conf dictionaries allow overriding any default Spark or Hadoop settings, enabling tasks to adapt to specific cluster environments or performance requirements.
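One common pattern is to keep a base dictionary of settings and layer environment-specific overrides on top before constructing the Spark object; the values below are illustrative.

from flyteplugins.spark.task import Spark

# Base settings shared by every environment.
base_spark_conf = {
    "spark.sql.shuffle.partitions": "200",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
}

# Hypothetical per-environment overrides merged over the base settings.
production_overrides = {"spark.executor.instances": "20"}

production_config = Spark(
    spark_conf={**base_spark_conf, **production_overrides},
    hadoop_conf={"fs.s3a.endpoint": "s3.amazonaws.com"},
)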
Example Implementation
To integrate Spark into a task, define a Spark configuration object and associate it with your Python function.
import pyspark.sql

# Assuming Spark is imported from the relevant plugin package
from flyteplugins.spark.task import Spark  # This is the configuration object

# Define a Spark configuration
my_spark_config = Spark(
    spark_conf={
        "spark.executor.memory": "2g",
        "spark.driver.memory": "4g",
        "spark.sql.shuffle.partitions": "200",
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": "2",
        "spark.dynamicAllocation.maxExecutors": "10",
    },
    hadoop_conf={
        "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "fs.s3a.endpoint": "s3.amazonaws.com",
    },
    # Optional: Customize driver and executor pods
    # driver_pod=PodTemplate(
    #     resources=Resources(cpu="1", mem="4Gi"),
    #     node_selector={"node-group": "spark-drivers"},
    # ),
    # executor_pod=PodTemplate(
    #     resources=Resources(cpu="2", mem="8Gi"),
    #     tolerations=[Toleration(key="dedicated", value="spark", effect="NoSchedule")],
    # ),
)

# Define a Python function that will run as a Spark task.
# The system automatically injects the configured SparkSession.
# @task(task_config=my_spark_config)  # This is how the configuration would be applied
def process_data_with_spark(spark_session: pyspark.sql.SparkSession, input_path: str) -> float:
    """
    Reads data from an S3 path, processes it using Spark, and returns a result.
    """
    df = spark_session.read.parquet(input_path)
    processed_df = df.groupBy("category").count()
    total_count = processed_df.count()
    return float(total_count)

# When this task executes, it will launch a distributed Spark job
# on Kubernetes using the settings defined in my_spark_config.
Performance Considerations
- Resource Allocation: Carefully configure spark_conf, driver_pod, and executor_pod to match the computational and memory requirements of your Spark workload. Over-provisioning wastes resources, while under-provisioning leads to slow execution or failures.
- Data Locality: Optimize data access patterns. When reading data from distributed storage (e.g., S3), ensure Spark executors are co-located with the data or that network bandwidth is sufficient.
- Shuffle Operations: Minimize data shuffling across the network by optimizing Spark transformations. Tune spark.sql.shuffle.partitions to prevent excessive small partitions or too few large ones.
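As a hedged illustration of shuffle tuning, the sketch below lowers the shuffle partition count at runtime and repartitions by the aggregation key before a wide transformation; the partition count, column name, and output path are illustrative, not recommendations.

import pyspark.sql

def count_by_category(spark_session: pyspark.sql.SparkSession, input_path: str) -> None:
    # Illustrative runtime override: size shuffle parallelism to the data volume
    # instead of relying on the default of 200 partitions.
    spark_session.conf.set("spark.sql.shuffle.partitions", "64")

    df = spark_session.read.parquet(input_path)
    # Repartitioning by the aggregation key before the groupBy reduces skew and
    # avoids producing many tiny shuffle partitions.
    counts = df.repartition(64, "category").groupBy("category").count()
    counts.write.mode("overwrite").parquet(input_path + "_counts")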
Limitations and Considerations
- External Dependencies: While local Python code is automatically distributed, complex external dependencies (e.g., specific versions of non-PyPI libraries, native binaries) might require building custom Docker images for the Spark driver and executors.
- Spark Version Compatibility: Ensure that the PySpark version used in your local development environment is compatible with the Spark version deployed in the cluster.
- Kubernetes Resource Quotas: Spark jobs consume significant Kubernetes resources. Be aware of cluster resource quotas and limits to prevent job failures or resource contention.