airflow.providers.apache.spark.operators.spark_submit

Classes

ResumableJobMixin

Airflow 2 stub: task_state is unavailable, so the job is always submitted fresh.

SparkSubmitOperator

Wrap the spark-submit binary to kick off a spark-submit job; requires the "spark-submit" binary to be on the PATH.

Module Contents

class airflow.providers.apache.spark.operators.spark_submit.ResumableJobMixin

Airflow 2 stub: task_state is unavailable, so the job is always submitted fresh.

external_id_key: str = 'remote_job_id'
execute_resumable(context)
class airflow.providers.apache.spark.operators.spark_submit.SparkSubmitOperator(*, application='', conf=None, conn_id='spark_default', files=None, py_files=None, archives=None, driver_class_path=None, jars=None, java_class=None, packages=None, exclude_packages=None, repositories=None, total_executor_cores=None, executor_cores=None, executor_memory=None, driver_memory=None, keytab=None, principal=None, proxy_user=None, name='arrow-spark', num_executors=None, status_poll_interval=1, application_args=None, env_vars=None, verbose=False, spark_binary=None, properties_file=None, yarn_queue=None, deploy_mode=None, use_krb5ccache=False, post_submit_commands=None, reconnect_on_retry=True, yarn_track_via_rm_api=False, yarn_rm_auth=None, openlineage_inject_parent_job_info=conf.getboolean('openlineage', 'spark_inject_parent_job_info', fallback=False), openlineage_inject_transport_info=conf.getboolean('openlineage', 'spark_inject_transport_info', fallback=False), **kwargs)

Bases: airflow.sdk.bases.resumablemixin.ResumableJobMixin, airflow.providers.common.compat.sdk.BaseOperator

Wrap the spark-submit binary to kick off a spark-submit job; requires the “spark-submit” binary to be on the PATH.

See also

For more information on how to use this operator, take a look at the guide: SparkSubmitOperator

Parameters:
  • application (str) – The application submitted as a job, either a jar or a py file. (templated)

  • conf (dict[Any, Any] | None) – Arbitrary Spark configuration properties (templated)

  • conn_id (str) – The Spark connection ID as configured in Airflow administration. If an invalid conn_id is supplied, the master defaults to yarn.

  • files (str | None) – Comma-separated list of additional files to upload to the executors running the job. Files are placed in the working directory of each executor, for example serialized objects. (templated)

  • py_files (str | None) – Additional Python files used by the job; can be .zip, .egg, or .py files. (templated)

  • jars (str | None) – Additional jars to upload and place on the driver and executor classpaths. (templated)

  • driver_class_path (str | None) – Additional driver-specific classpath settings. (templated)

  • java_class (str | None) – The main class of the Java application

  • packages (str | None) – Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths. (templated)

  • exclude_packages (str | None) – Comma-separated list of Maven coordinates of jars to exclude while resolving the dependencies provided in ‘packages’ (templated)

  • repositories (str | None) – Comma-separated list of additional remote repositories to search for the Maven coordinates given with ‘packages’

  • total_executor_cores (int | None) – (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)

  • executor_cores (int | None) – (Standalone & YARN only) Number of cores per executor (Default: 2)

  • executor_memory (str | None) – Memory per executor (e.g. 1000M, 2G) (Default: 1G)

  • driver_memory (str | None) – Memory allocated to the driver (e.g. 1000M, 2G) (Default: 1G)

  • keytab (str | None) – Full path to the file that contains the keytab (templated) (will overwrite any keytab defined in the connection’s extra JSON)

  • principal (str | None) – The name of the kerberos principal used for keytab (templated) (will overwrite any principal defined in the connection’s extra JSON)

  • proxy_user (str | None) – User to impersonate when submitting the application (templated)

  • name (str) – Name of the job (Default: arrow-spark). (templated)

  • num_executors (int | None) – Number of executors to launch

  • status_poll_interval (int) – Seconds to wait between polls of driver status in cluster mode. Used both by the Spark standalone driver-status tracker and, when yarn_track_via_rm_api=True, by the YARN ResourceManager REST API polling loop. The YARN polling loop waits at least 10 seconds between polls to avoid flooding the ResourceManager on long-running jobs (Default: 1).

  • application_args (list[Any] | None) – Arguments for the application being submitted (templated)

  • env_vars (dict[str, Any] | None) – Environment variables for spark-submit. Also supported in yarn and k8s modes. (templated)

  • verbose (bool) – Whether to pass the verbose flag to the spark-submit process for debugging

  • spark_binary (str | None) – The command to use for spark-submit. Some distros may use spark2-submit or spark3-submit. (will overwrite any spark_binary defined in the connection’s extra JSON)

  • properties_file (str | None) – Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf.

  • yarn_queue (str | None) – The name of the YARN queue to which the application is submitted. (will overwrite any yarn queue defined in the connection’s extra JSON)

  • deploy_mode (str | None) – Whether to deploy your driver on the worker nodes (cluster) or locally as a client. (will overwrite any deployment mode defined in the connection’s extra JSON)

  • use_krb5ccache (bool) – If True, configure Spark to use the Kerberos ticket cache instead of relying on a keytab for Kerberos login

  • post_submit_commands (list[str] | None) – Optional list of shell commands to run after the Spark job finishes. Useful for cleaning up sidecars such as Istio. Failures produce a warning but do not fail the task.

  • yarn_track_via_rm_api (bool) – If True (when the master is YARN and deploy_mode is cluster), release the spark-submit JVM once the application has been submitted to YARN, then poll the YARN ResourceManager REST API (GET /ws/v1/cluster/apps/{appId}) until the application reaches a final state. The polling interval is controlled by status_poll_interval, with a 10-second minimum. This frees the worker from holding the long-lived submit JVM. Requires the Spark connection’s extra JSON to set yarn_resourcemanager_webapp_address (e.g. http://rm:8088). After the switch to polling, use the cluster-side driver logs to follow the job. Defaults to False.

  • yarn_rm_auth (requests.auth.AuthBase | None) – Optional requests.auth.AuthBase instance used for every call to the YARN ResourceManager REST API (status polling and kill). When omitted, Kerberos-enabled Spark connections with both keytab and principal configured use requests-kerberos automatically. Defaults to None (no auth for non-Kerberos connections).
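Most of the parameters above correspond to spark-submit command-line flags (--conf, --files, --py-files, --jars, --packages, --executor-memory, --queue, --deploy-mode, and so on). The sketch below illustrates that mapping for a subset of parameters using a hypothetical helper, build_spark_submit_args; it is not the command builder inside SparkSubmitHook, whose argument order and connection handling may differ. The master default of "yarn" is an assumption that mirrors the conn_id fallback described above.

```python
from typing import Any, Optional


def build_spark_submit_args(
    application: str,
    *,
    master: str = "yarn",  # assumption: mirrors the invalid-conn_id fallback
    conf: Optional[dict[Any, Any]] = None,
    files: Optional[str] = None,
    py_files: Optional[str] = None,
    jars: Optional[str] = None,
    packages: Optional[str] = None,
    java_class: Optional[str] = None,
    executor_cores: Optional[int] = None,
    executor_memory: Optional[str] = None,
    driver_memory: Optional[str] = None,
    num_executors: Optional[int] = None,
    name: str = "arrow-spark",
    yarn_queue: Optional[str] = None,
    deploy_mode: Optional[str] = None,
    verbose: bool = False,
    spark_binary: str = "spark-submit",
    application_args: Optional[list[Any]] = None,
) -> list[str]:
    """Hypothetical helper: map some operator parameters to spark-submit flags."""
    args = [spark_binary, "--master", master]
    # Each conf entry becomes its own --conf key=value pair.
    for key, value in (conf or {}).items():
        args += ["--conf", f"{key}={value}"]
    # Optional flags are emitted only when a value is set.
    optional_flags = [
        ("--files", files),
        ("--py-files", py_files),
        ("--jars", jars),
        ("--packages", packages),
        ("--num-executors", num_executors),
        ("--executor-cores", executor_cores),
        ("--executor-memory", executor_memory),
        ("--driver-memory", driver_memory),
        ("--name", name),
        ("--class", java_class),
        ("--queue", yarn_queue),
        ("--deploy-mode", deploy_mode),
    ]
    for flag, value in optional_flags:
        if value is not None:
            args += [flag, str(value)]
    if verbose:
        args.append("--verbose")
    # spark-submit expects the application after all options, then its own arguments.
    args.append(application)
    args += [str(arg) for arg in (application_args or [])]
    return args
```

For example, build_spark_submit_args("app.py", conf={"spark.sql.shuffle.partitions": 200}, executor_memory="2G") yields ["spark-submit", "--master", "yarn", "--conf", "spark.sql.shuffle.partitions=200", "--executor-memory", "2G", "--name", "arrow-spark", "app.py"].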

external_id_key = 'spark_job_id'
template_fields: collections.abc.Sequence[str] = ('application', 'conf', 'files', 'py_files', 'jars', 'driver_class_path', 'packages',...
application = ''
conf = None
files = None
py_files = None
driver_class_path = None
jars = None
packages = None
exclude_packages = None
keytab = None
principal = None
proxy_user = None
name = 'arrow-spark'
application_args = None
env_vars = None
properties_file = None
post_submit_commands = None
reconnect_on_retry = True
execute(context)

Call the SparkSubmitHook to run the provided spark job.
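The post_submit_commands parameter runs shell commands after the Spark job finishes, and a failing command only produces a warning. The sketch below illustrates that contract with a hypothetical helper, run_post_submit_commands; it is not the operator's code, and it assumes each command is a shell string executed through subprocess.

```python
import logging
import subprocess

log = logging.getLogger(__name__)


def run_post_submit_commands(commands):
    """Hypothetical helper: run each shell command, warning (never raising) on failure."""
    results = []
    for cmd in commands or []:
        try:
            completed = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        except OSError as exc:
            # The command could not even be started; warn and move on.
            log.warning("Post-submit command %r could not start: %s", cmd, exc)
            results.append((cmd, None))
            continue
        if completed.returncode != 0:
            # A non-zero exit is reported but does not fail the caller.
            log.warning(
                "Post-submit command %r exited with %d: %s",
                cmd, completed.returncode, completed.stderr.strip(),
            )
        results.append((cmd, completed.returncode))
    return results
```

For the Istio case, a command such as curl -fsS -X POST http://localhost:15020/quitquitquit is commonly used to stop the sidecar, but check the endpoint against your Istio version.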

submit_job(context)

Submit the job to the external system. Return its external ID.

The returned ID must not be None: a None return is treated as “no ID available”, and no ID is persisted to task state.

get_job_status(external_id)

Query the external system for the current job status.

is_job_active(status)

Return True if the job is still running and can be reconnected to.

status is a raw string returned by the external system — not an Airflow enum. Its values are backend-specific (e.g. "RUNNING", "Pending", "ContainerCreating").

is_job_succeeded(status)

Return True if the job completed successfully.

status is a raw string returned by the external system — not an Airflow enum. Its values are backend-specific (e.g. "FINISHED", "Succeeded").
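Because status is a raw backend string, each predicate is essentially a membership test against that backend's state names. The sketch below assumes the Spark standalone driver states (SUBMITTED, RUNNING, RELAUNCHING, FINISHED, FAILED, KILLED, ERROR, UNKNOWN); the grouping into ACTIVE_STATES and SUCCEEDED_STATES is a hypothetical illustration, not the operator's actual mapping.

```python
# Hypothetical classification of Spark standalone driver states.
ACTIVE_STATES = frozenset({"SUBMITTED", "RUNNING", "RELAUNCHING"})
SUCCEEDED_STATES = frozenset({"FINISHED"})


def is_job_active(status: str) -> bool:
    """Return True while the driver is still running and can be reconnected to."""
    # Upper-casing guards against backends that vary the case of state names.
    return status.upper() in ACTIVE_STATES


def is_job_succeeded(status: str) -> bool:
    """Return True only for a successful terminal state."""
    return status.upper() in SUCCEEDED_STATES
```

UNKNOWN is deliberately neither active nor succeeded here, so a caller would treat it as "cannot reconnect"; a real backend may prefer to retry the status query instead.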

poll_until_complete(external_id, context)

Block until the job reaches a terminal state. Raise on failure.
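With yarn_track_via_rm_api=True, blocking until a terminal state means polling GET /ws/v1/cluster/apps/{appId} with at least 10 seconds between polls. The sketch below is a hypothetical, dependency-free version of such a loop: fetch_app stands in for the HTTP GET (injected so the loop can be tested), and treating finalStatus SUCCEEDED as the only success is an assumption about how YARN's state and finalStatus fields are interpreted.

```python
import time
from typing import Callable

YARN_FINAL_STATES = frozenset({"FINISHED", "FAILED", "KILLED"})
MIN_YARN_POLL_SECONDS = 10


def app_status_url(rm_webapp_address: str, app_id: str) -> str:
    """Build the ResourceManager REST URL for one application."""
    return f"{rm_webapp_address.rstrip('/')}/ws/v1/cluster/apps/{app_id}"


def poll_yarn_app(
    app_id: str,
    fetch_app: Callable[[str], dict],
    rm_webapp_address: str,
    status_poll_interval: float = 1,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Hypothetical loop: poll until a final state, raise unless finalStatus is SUCCEEDED."""
    # Enforce the 10-second floor regardless of the configured interval.
    interval = max(status_poll_interval, MIN_YARN_POLL_SECONDS)
    url = app_status_url(rm_webapp_address, app_id)
    while True:
        app = fetch_app(url)["app"]
        if app["state"] in YARN_FINAL_STATES:
            if app.get("finalStatus") != "SUCCEEDED":
                raise RuntimeError(
                    f"{app_id} ended with state={app['state']} "
                    f"finalStatus={app.get('finalStatus')}"
                )
            return app["finalStatus"]
        sleep(interval)
```

In a real deployment fetch_app could be something like lambda url: requests.get(url, auth=yarn_rm_auth).json(), which is also where a yarn_rm_auth instance would plug in.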

get_job_result(external_id, context)

Return the job result after completion. Return None if not applicable.
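Together, submit_job, get_job_status, is_job_active, is_job_succeeded, poll_until_complete, and get_job_result describe a reconnect-or-submit protocol. ResumableJobSketch below is a hypothetical illustration of how execute_resumable could combine them when task state is available (the Airflow 2 stub documented above always submits fresh instead). The task_state dict, the reconnect rules, and FakeJob are assumptions for illustration, not the provider's implementation.

```python
from abc import ABC, abstractmethod
from typing import Any, Optional


class ResumableJobSketch(ABC):
    """Hypothetical reconnect-or-submit flow built on the methods listed above."""

    external_id_key = "spark_job_id"

    def __init__(self, task_state: dict) -> None:
        self.task_state = task_state  # plain dict standing in for persisted task state

    @abstractmethod
    def submit_job(self, context: Any) -> Optional[str]: ...
    @abstractmethod
    def get_job_status(self, external_id: str) -> str: ...
    @abstractmethod
    def is_job_active(self, status: str) -> bool: ...
    @abstractmethod
    def is_job_succeeded(self, status: str) -> bool: ...
    @abstractmethod
    def poll_until_complete(self, external_id: Optional[str], context: Any) -> None: ...
    @abstractmethod
    def get_job_result(self, external_id: Optional[str], context: Any) -> Any: ...

    def execute_resumable(self, context: Any) -> Any:
        external_id = self.task_state.get(self.external_id_key)
        if external_id is not None:
            status = self.get_job_status(external_id)
            if self.is_job_succeeded(status):
                # The earlier attempt already finished; just collect its result.
                return self.get_job_result(external_id, context)
            if not self.is_job_active(status):
                # The earlier job cannot be reconnected to; start over.
                external_id = None
        if external_id is None:
            external_id = self.submit_job(context)
            if external_id is not None:
                # Only a non-None ID is persisted, per the submit_job description.
                self.task_state[self.external_id_key] = external_id
        self.poll_until_complete(external_id, context)
        return self.get_job_result(external_id, context)


class FakeJob(ResumableJobSketch):
    """In-memory backend used only to exercise the flow."""

    def __init__(self, task_state: dict, statuses: dict) -> None:
        super().__init__(task_state)
        self.statuses = statuses  # external_id -> raw status string
        self.submitted = 0

    def submit_job(self, context):
        self.submitted += 1
        job_id = f"driver-{self.submitted}"
        self.statuses[job_id] = "RUNNING"
        return job_id

    def get_job_status(self, external_id):
        return self.statuses.get(external_id, "UNKNOWN")

    def is_job_active(self, status):
        return status in {"SUBMITTED", "RUNNING"}

    def is_job_succeeded(self, status):
        return status == "FINISHED"

    def poll_until_complete(self, external_id, context):
        self.statuses[external_id] = "FINISHED"  # pretend the job completes at once

    def get_job_result(self, external_id, context):
        return external_id
```

On a retry whose persisted ID points at a still-running job, this flow reconnects instead of submitting a duplicate; if the old job has failed, it submits a new one and overwrites the stored ID.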

on_kill()

Override this method to clean up subprocesses when a task instance gets killed.

Any use of the threading, subprocess or multiprocessing module within an operator needs to be cleaned up, or it will leave ghost processes behind.
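A minimal sketch of the cleanup pattern on_kill calls for: keep a handle to the child process, terminate it, and escalate to kill if it does not exit in time. SubprocessRunner is a hypothetical class, not part of Airflow; for this operator the child would be the spark-submit process, and the real on_kill may also need to stop remote work such as the YARN application (the yarn_rm_auth description mentions kill calls to the ResourceManager).

```python
import subprocess
import sys
from typing import Optional


class SubprocessRunner:
    """Hypothetical operator-like wrapper that owns one child process."""

    def __init__(self) -> None:
        self._proc: Optional[subprocess.Popen] = None

    def start(self, argv: list[str]) -> subprocess.Popen:
        self._proc = subprocess.Popen(argv)
        return self._proc

    def on_kill(self, timeout: float = 5.0) -> None:
        proc = self._proc
        # Nothing to do if no child was started or it already exited.
        if proc is None or proc.poll() is not None:
            return
        proc.terminate()
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # The child ignored the polite request; force it down.
            proc.kill()
            proc.wait()
```

Calling on_kill twice is safe: the second call sees that the child has already exited and returns immediately.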
