Apache Spark Operators
Prerequisite
- To use SparkSubmitOperator, you must configure Spark Connection.
- To use SparkJDBCOperator, you must configure both Spark Connection and JDBC connection.
- SparkSqlOperator gets all the configurations from operator parameters.
- To use PySparkOperator, you can configure Spark Connect Connection.
- To use SparkPipelinesOperator, you must configure Spark Connection and have the spark-pipelines CLI available.
SparkJDBCOperator
Launches applications on an Apache Spark server. It uses SparkSubmitOperator to perform data transfers to and from JDBC-based databases.
For parameter definitions, take a look at SparkJDBCOperator.
Using the operator
Using the cmd_type parameter, it is possible to transfer data from Spark to a database (spark_to_jdbc) or from a database to Spark (jdbc_to_spark). In the jdbc_to_spark direction, the table is written using the Spark command saveAsTable.
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator
# Copy the JDBC table "foo" into the metastore table "bar", replacing existing data.
jdbc_to_spark_job = SparkJDBCOperator(
    cmd_type="jdbc_to_spark",
    jdbc_table="foo",
    spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="overwrite",
    save_format="JSON",
    task_id="jdbc_to_spark_job",
)
# Append the contents of the metastore table "bar" to the JDBC table "foo".
spark_to_jdbc_job = SparkJDBCOperator(
    cmd_type="spark_to_jdbc",
    jdbc_table="foo",
    spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="append",
    task_id="spark_to_jdbc_job",
)
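Both examples pass the same jdbc_table and metastore_table values, so cmd_type alone decides which table is read and which one is written. The following is a minimal sketch that spells out that mapping, based only on the directions described above. describe_transfer and Transfer are hypothetical, illustrative names, not provider APIs.

```python
from typing import NamedTuple


class Transfer(NamedTuple):
    """Where a SparkJDBCOperator run reads from and writes to (illustrative only)."""

    source: str
    target: str


def describe_transfer(cmd_type: str, jdbc_table: str, metastore_table: str) -> Transfer:
    """Hypothetical helper: map a cmd_type value to its source and target tables."""
    if cmd_type == "jdbc_to_spark":
        # Read the JDBC table, then save it into the metastore (saveAsTable).
        return Transfer(source=f"jdbc:{jdbc_table}", target=f"metastore:{metastore_table}")
    if cmd_type == "spark_to_jdbc":
        # Read the metastore table, then write its rows to the JDBC table.
        return Transfer(source=f"metastore:{metastore_table}", target=f"jdbc:{jdbc_table}")
    raise ValueError(f"unsupported cmd_type: {cmd_type!r}")


print(describe_transfer("jdbc_to_spark", "foo", "bar"))
# Transfer(source='jdbc:foo', target='metastore:bar')
```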
Reference
For further information, look at the Apache Spark DataFrameWriter documentation.
PySparkOperator
Launches applications on an Apache Spark Connect server or directly in standalone mode.
For parameter definitions, take a look at PySparkOperator.
Using the operator
def my_pyspark_job(spark):
    # spark: the SparkSession that the operator passes to the callable.
    df = spark.range(100).filter("id % 2 = 0")
    print(df.count())
spark_pyspark_job = PySparkOperator(
    python_callable=my_pyspark_job, conn_id="spark_connect", task_id="spark_pyspark_job"
)
Reference
For further information, look at Running the Spark Connect Python.
SparkPipelinesOperator
Executes Spark Declarative Pipelines using the spark-pipelines CLI. The operator wraps the spark-pipelines binary and supports both running a pipeline and validating it with a dry run.
For parameter definitions, take a look at SparkPipelinesOperator.
Using the operator
The operator can be used to run declarative pipelines:
from airflow.providers.apache.spark.operators.spark_pipelines import SparkPipelinesOperator
# Execute the pipeline
run_pipeline = SparkPipelinesOperator(
    task_id="run_pipeline",
    pipeline_spec="/path/to/pipeline.yml",
    pipeline_command="run",
    conn_id="spark_default",
    num_executors=2,
    executor_cores=4,
    executor_memory="2G",
    driver_memory="1G",
)
Pipeline Specification
The pipeline_spec parameter should point to a YAML file defining your declarative pipeline:
name: my_pipeline
storage: file:///path/to/pipeline-storage
libraries:
  - glob:
      include: transformations/**
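To check which files a library entry such as transformations/** would pick up, the following sketch approximates the glob with Python's recursive glob.glob. This is an assumption: spark-pipelines applies its own matching and file-type rules, which may differ. files_matching is a hypothetical helper.

```python
import glob
import os
import tempfile


def files_matching(project_root: str, include: str) -> list[str]:
    """Approximate which files a `glob: include:` entry selects.

    Assumes the pattern behaves like Python's recursive glob, where "**"
    matches any files and zero or more nested directories.
    """
    matches = glob.glob(os.path.join(project_root, include), recursive=True)
    # "**" also yields directories; keep regular files only.
    files = [m for m in matches if os.path.isfile(m)]
    return sorted(os.path.relpath(f, project_root).replace(os.sep, "/") for f in files)


with tempfile.TemporaryDirectory() as root:
    for rel in ("transformations/orders.py", "transformations/sql/daily.sql", "scratch/notes.py"):
        path = os.path.join(root, rel)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, "w").close()  # create an empty file
    selected = files_matching(root, "transformations/**")
print(selected)  # ['transformations/orders.py', 'transformations/sql/daily.sql']
```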
Pipeline Commands
- run: Execute the pipeline (default).
- dry-run: Validate the pipeline without executing it.
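The two commands correspond to spark-pipelines subcommands. The following minimal, hypothetical sketch shows how pipeline_command and pipeline_spec could become a command line. Passing the spec file with --spec is an assumption about the CLI. The operator's real command, including how num_executors and the memory settings are forwarded, is not modeled.

```python
import shlex

PIPELINE_COMMANDS = ("run", "dry-run")


def build_pipelines_argv(pipeline_spec: str, pipeline_command: str = "run") -> list[str]:
    """Hypothetical sketch of a spark-pipelines invocation (not the operator's code)."""
    if pipeline_command not in PIPELINE_COMMANDS:
        raise ValueError(
            f"pipeline_command must be one of {PIPELINE_COMMANDS}, got {pipeline_command!r}"
        )
    # Assumption: the spec file is passed with --spec.
    return ["spark-pipelines", pipeline_command, "--spec", pipeline_spec]


print(shlex.join(build_pipelines_argv("/path/to/pipeline.yml", "dry-run")))
# spark-pipelines dry-run --spec /path/to/pipeline.yml
```

Validating the command up front means a typo such as "dryrun" fails before any process is started.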
Reference
For further information, look at the Spark Declarative Pipelines Programming Guide.
SparkSqlOperator
Launches applications on an Apache Spark server. It requires the spark-sql script to be in the PATH.
The operator runs the SQL query against the Spark Hive metastore service. The sql parameter can be templated, and it can be a .sql or .hql file.
For parameter definitions, take a look at SparkSqlOperator.
Using the operator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
spark_sql_job = SparkSqlOperator(
    sql="SELECT COUNT(1) as cnt FROM temp_table", master="local", task_id="spark_sql_job"
)
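When sql ends in .sql or .hql, Airflow treats it as a template file name rather than as a query. The following is a simplified model of that decision. It assumes a dictionary lookup in place of Airflow's template loader, which searches the DAG folder and template_searchpath, and it skips the Jinja rendering step. resolve_sql is a hypothetical name.

```python
from typing import Callable

# Values ending in these extensions are treated as template files.
TEMPLATE_EXT = (".sql", ".hql")


def resolve_sql(sql: str, read_template: Callable[[str], str]) -> str:
    """Simplified model of how a templated `sql` value is resolved.

    If the value names a .sql/.hql file, its contents are loaded; otherwise the
    value itself is the query. (Jinja rendering of the result is omitted.)
    """
    if sql.endswith(TEMPLATE_EXT):
        return read_template(sql)
    return sql


templates = {"queries/count_temp_table.hql": "SELECT COUNT(1) AS cnt FROM temp_table"}
print(resolve_sql("queries/count_temp_table.hql", templates.__getitem__))
# SELECT COUNT(1) AS cnt FROM temp_table
print(resolve_sql("SELECT 1", templates.__getitem__))
# SELECT 1
```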
Reference
For further information, look at Running the Spark SQL CLI.
SparkSubmitOperator
Launches applications on an Apache Spark server. It uses the spark-submit script, which sets up the classpath with Spark and its dependencies and supports the different cluster managers and deploy modes that Spark supports.
For parameter definitions, take a look at SparkSubmitOperator.
Using the operator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
submit_job = SparkSubmitOperator(
    application="${SPARK_HOME}/examples/src/main/python/pi.py", task_id="submit_job"
)
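spark-submit selects the cluster manager through --master and the deploy mode through --deploy-mode. The following hypothetical builder sketches how a few operator-style arguments could map onto those standard spark-submit options. It is not the hook's actual command construction, which handles many more parameters.

```python
from typing import Optional, Sequence


def build_spark_submit_argv(
    application: str,
    master: Optional[str] = None,
    deploy_mode: Optional[str] = None,
    java_class: Optional[str] = None,
    application_args: Sequence[str] = (),
) -> list[str]:
    """Hypothetical sketch of a spark-submit command line (not the hook's code)."""
    argv = ["spark-submit"]
    if master:
        argv += ["--master", master]  # cluster manager URL, e.g. yarn or spark://host:7077
    if deploy_mode:
        argv += ["--deploy-mode", deploy_mode]  # "client" or "cluster"
    if java_class:
        argv += ["--class", java_class]  # entry point for JVM applications
    argv.append(application)
    argv.extend(application_args)  # passed through to the application itself
    return argv


print(build_spark_submit_argv("pi.py", master="local[2]", application_args=["10"]))
# ['spark-submit', '--master', 'local[2]', 'pi.py', '10']
```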
Reference
For further information, look at the Apache Spark Submitting Applications documentation.
Cluster mode crash recovery (Spark standalone)
When running in Spark standalone cluster mode (--deploy-mode cluster), the Spark driver runs
independently on the cluster. If the Airflow worker dies while the Spark job is running, the driver keeps running,
but Airflow loses track of it. Submitting a brand-new job on retry would waste the compute already done,
and could even cause conflicts if the Spark job itself is not designed to be idempotent.
SparkSubmitOperator addresses this by persisting the driver ID to task_state immediately after
submission. On retry, it reads the ID back and reconnects to the already-running driver instead of
resubmitting.
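The recovery described above follows a submit-once pattern. The ID is persisted right after submission, and any later attempt reuses it instead of submitting again. The following sketch illustrates that pattern with a plain dict standing in for task_state. submit_or_reconnect, the driver_id key, and the fake callables are hypothetical and do not reflect the operator's internals.

```python
from typing import Callable, MutableMapping

DRIVER_ID_KEY = "driver_id"  # hypothetical key name


def submit_or_reconnect(
    state: MutableMapping[str, str],
    submit: Callable[[], str],
    wait_for_driver: Callable[[str], str],
) -> str:
    """Submit once, persist the driver ID, and reuse it on later attempts."""
    driver_id = state.get(DRIVER_ID_KEY)
    if driver_id is None:
        driver_id = submit()
        # Persist before polling, so a worker crash during the wait is recoverable.
        state[DRIVER_ID_KEY] = driver_id
    return wait_for_driver(driver_id)


submissions: list[str] = []


def fake_submit() -> str:
    submissions.append("submitted")
    return "driver-20250101000000-0000"


def crashing_wait(driver_id: str) -> str:
    raise RuntimeError("worker died while polling")


task_state: dict[str, str] = {}  # stands in for Airflow's task_state
try:
    submit_or_reconnect(task_state, fake_submit, crashing_wait)  # attempt 1 crashes mid-poll
except RuntimeError:
    pass
final = submit_or_reconnect(task_state, fake_submit, lambda driver_id: "FINISHED")  # retry
print(final, submissions)  # FINISHED ['submitted']
```

The retry finds the stored ID and only waits, so the job is submitted exactly once.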
This is the synchronous path: the worker holds a slot for the whole duration of polling. It serves as a crash-safety net for teams that run synchronous operators for log observability or organizational constraints, or because no Triggerer is available. Teams with a Triggerer may also consider deferrable operators, which free the worker slot but can add complexity.
Connection requirements for crash recovery
The reconnection polling calls the Spark standalone REST API
(GET /v1/submissions/status/{driverId}). Make sure the Spark connection’s
REST scheme and REST port extras match your cluster’s configuration:
- REST scheme: set to https if your cluster has TLS enabled on the REST port (spark.ssl.standalone.enabled=true). Defaults to http.
- REST port: set to the value of spark.master.rest.port on your cluster. Defaults to 6066.
See Apache Spark Submit Connection for how to configure these fields.
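The following helpers sketch what the reconnection poll targets. They build the status URL from the REST scheme and REST port, using the defaults listed above, and read the driverState field from the JSON response. The bare hostname argument, the response shape, and the set of terminal states (taken from Spark's DriverState values) are assumptions for illustration, not the operator's code.

```python
import json

# Spark DriverState values after which the driver will not change state again.
TERMINAL_DRIVER_STATES = frozenset({"FINISHED", "FAILED", "KILLED", "ERROR"})


def status_url(host: str, driver_id: str, scheme: str = "http", port: int = 6066) -> str:
    """Build the standalone REST status URL from the connection's REST scheme/port."""
    return f"{scheme}://{host}:{port}/v1/submissions/status/{driver_id}"


def driver_state(response_body: str) -> tuple[str, bool]:
    """Return (driverState, is_terminal) from a status response body."""
    state = json.loads(response_body)["driverState"]
    return state, state in TERMINAL_DRIVER_STATES


print(status_url("spark-master.example.com", "driver-20250101000000-0000", scheme="https"))
# https://spark-master.example.com:6066/v1/submissions/status/driver-20250101000000-0000
print(driver_state('{"action": "SubmissionStatusResponse", "driverState": "RUNNING", "success": true}'))
# ('RUNNING', False)
```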
Note
Crash recovery in cluster mode requires Airflow 3.3+ (task_state support). On earlier
versions, the operator falls back to the previous behavior of always submitting a fresh job.
YARN ResourceManager API tracking
When running Spark applications on YARN in cluster deploy mode, the default Spark submit path keeps
the local spark-submit JVM alive on the Airflow worker while the YARN
application runs. For long-running Spark applications this can keep worker memory tied up for the
whole application lifetime.
Set yarn_track_via_rm_api=True to release the local spark-submit JVM once YARN accepts the
application. The operator then polls the YARN ResourceManager REST API until the application reaches a terminal
state. The polling interval is controlled by status_poll_interval, with a
minimum of 10 seconds.
This mode requires yarn_resourcemanager_webapp_address to be set in the Spark connection extra before
the application is submitted:
airflow connections add spark_yarn_rm \
    --conn-type spark \
    --conn-host yarn \
    --conn-extra '{
        "deploy-mode": "cluster",
        "yarn_resourcemanager_webapp_address": "http://rm.example.com:8088"
    }'
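Airflow 2.3 and later also accepts connections as JSON in AIRFLOW_CONN_<CONN_ID> environment variables, which some teams prefer to the CLI. The following sketch builds the same spark_yarn_rm connection that way. Setting it from Python is only for illustration; in a deployment you would normally set the variable in the worker's environment.

```python
import json
import os

# Same connection as the CLI example above, in Airflow's JSON connection format.
spark_yarn_rm = {
    "conn_type": "spark",
    "host": "yarn",
    "extra": {
        "deploy-mode": "cluster",
        "yarn_resourcemanager_webapp_address": "http://rm.example.com:8088",
    },
}
# Airflow resolves conn_id "spark_yarn_rm" from AIRFLOW_CONN_SPARK_YARN_RM.
os.environ["AIRFLOW_CONN_SPARK_YARN_RM"] = json.dumps(spark_yarn_rm)
```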
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
SparkSubmitOperator(
    task_id="spark_pi",
    conn_id="spark_yarn_rm",
    application="/path/to/spark-examples.jar",
    java_class="org.apache.spark.examples.SparkPi",
    deploy_mode="cluster",
    yarn_track_via_rm_api=True,
)
For Kerberized clusters, install requests-kerberos in the Airflow environment. When the
Spark connection has both keytab and principal configured, Airflow automatically uses
HTTPKerberosAuth() for the ResourceManager REST requests.
Use yarn_rm_auth only when the ResourceManager needs a custom requests authentication
object:
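import requests
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
SparkSubmitOperator(
    task_id="spark_pi",
    conn_id="spark_yarn_rm",
    application="/path/to/spark-examples.jar",
    java_class="org.apache.spark.examples.SparkPi",
    deploy_mode="cluster",
    yarn_track_via_rm_api=True,
    # Custom authentication object for the ResourceManager REST requests.
    yarn_rm_auth=requests.auth.HTTPBasicAuth("user", "password"),
)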