Source code for airflow.providers.apache.spark.hooks.spark_pipelines
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import os
import subprocess
import sys
from typing import Any

from airflow.providers.apache.spark.hooks.spark_submit import DEFAULT_SPARK_BINARY, SparkSubmitHook
from airflow.providers.common.compat.sdk import AirflowException, AirflowNotFoundException


class SparkPipelinesException(AirflowException):
    """Exception raised when the spark-pipelines command fails."""


class SparkPipelinesHook(SparkSubmitHook):
    """
    Hook for interacting with Spark Declarative Pipelines via the spark-pipelines CLI.

    Extends :class:`~airflow.providers.apache.spark.hooks.spark_submit.SparkSubmitHook`
    to reuse its connection management while providing pipeline-specific functionality.

    Two connection modes are supported:

    * Legacy spark-submit-style (``spark`` / ``yarn`` / ``k8s`` connection types):
      invokes the ``spark-pipelines`` launcher with ``--master``, ``--deploy-mode``,
      and the rest of the standard cluster-manager flags assembled by
      :class:`~airflow.providers.apache.spark.hooks.spark_submit.SparkSubmitHook`.
    * Spark Connect (``spark_connect`` connection type, Spark 4.x+): sets
      ``SPARK_REMOTE`` from the connection's ``sc://`` URI and invokes the
      Connect-native ``pyspark.pipelines.cli`` Python module directly. The
      cluster-manager flags are *not* emitted: the Connect-native CLI rejects
      them with ``SparkException: Remote cannot be specified with master and/or
      deploy mode``, and the ``spark-pipelines`` bash launcher itself starts a
      JVM ``SparkContext`` that collides with the Connect daemon's gRPC port.

    :param pipeline_spec: Path to the pipeline specification file (YAML)
    :param pipeline_command: The spark-pipelines command to run (``run`` or ``dry-run``)
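
    A minimal usage sketch (the spec path is hypothetical)::

        hook = SparkPipelinesHook(
            conn_id="spark_default",
            pipeline_spec="/opt/pipelines/pipeline.yml",
            pipeline_command="dry-run",
        )
        hook.submit_pipeline()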
"""

    def __init__(
        self,
        pipeline_spec: str | None = None,
        pipeline_command: str = "run",
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.pipeline_spec = pipeline_spec
        self.pipeline_command = pipeline_command
        if pipeline_command not in ["run", "dry-run"]:
            raise ValueError(f"Invalid pipeline command: {pipeline_command}. Must be 'run' or 'dry-run'")

    def _resolve_connection(self) -> dict[str, Any]:
        """
        Resolve the configured connection, branching on Spark Connect.

        For ``spark_connect``-typed connections, populate a ``spark_remote`` key
        with the ``sc://`` URI rendered by
        :class:`~airflow.providers.apache.spark.hooks.spark_connect.SparkConnectHook`
        and zero out the spark-submit cluster-manager fields, since the
        Connect-native CLI doesn't consume them. For every other connection type,
        defer to the parent's resolver so spark-submit-style behaviour is unchanged.
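
        An illustrative Connect-mode return value, assuming the default binary,
        no Kerberos settings, and a hypothetical ``sc://`` host::

            {
                "master": "",
                "queue": None,
                "deploy_mode": None,
                "spark_binary": "spark-submit",
                "namespace": None,
                "principal": None,
                "keytab": None,
                "spark_remote": "sc://connect.example.com:15002",
            }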
"""
        try:
            conn = self.get_connection(self._conn_id)
        except AirflowNotFoundException:
            # No connection configured; fall through to spark-submit defaults.
            return super()._resolve_connection()
        if conn.conn_type != "spark_connect":
            return super()._resolve_connection()
        # Local import: SparkConnectHook lives in the same provider, but loading
        # it eagerly would create an unnecessary cycle for spark-submit-only
        # deployments that never touch Connect.
        from airflow.providers.apache.spark.hooks.spark_connect import SparkConnectHook

        return {
            # ``master`` is consumed by the parent's ``__init__`` (substring
            # checks for ``yarn``/``k8s``/``spark://``); leave it empty so none
            # of those branches match. Connect mode never calls
            # ``_build_spark_common_args``, which is the only thing that would
            # actually emit ``--master`` to the CLI.
            "master": "",
            "queue": None,
            "deploy_mode": None,
            "spark_binary": self.spark_binary or DEFAULT_SPARK_BINARY,
            "namespace": None,
            "principal": self._principal,
            "keytab": self._keytab,
            "spark_remote": SparkConnectHook(conn_id=self._conn_id).get_connection_url(),
        }

    def _get_spark_binary_path(self) -> list[str]:
        """Return the launcher argv prefix, branching on Spark Connect mode."""
        if self._connection.get("spark_remote"):
            # The ``spark-pipelines`` bash launcher routes through
            # ``spark-class`` → ``SparkSubmit`` → JVM ``SparkContext``, which
            # appends cluster-manager flags the Connect-native CLI rejects and
            # binds an in-process Connect server that collides with the
            # long-running daemon. Invoke the underlying Python module directly
            # so ``SparkSession.builder.getOrCreate()`` becomes a Connect client.
            return [sys.executable, "-m", "pyspark.pipelines.cli"]
        return ["spark-pipelines"]

    def _build_spark_pipelines_command(self) -> list[str]:
        """
        Construct the spark-pipelines command to execute.

        :return: full command to be executed
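
        Illustrative output, with a hypothetical spec path (legacy connections
        append further cluster-manager flags)::

            # Legacy spark-submit-style connection
            ["spark-pipelines", "run", "--spec", "pipeline.yml", "--master", "yarn"]

            # Spark Connect connection
            [sys.executable, "-m", "pyspark.pipelines.cli", "run", "--spec", "pipeline.yml"]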
"""
connection_cmd = self._get_spark_binary_path()
connection_cmd.append(self.pipeline_command)
if self.pipeline_spec:
connection_cmd.extend(["--spec", self.pipeline_spec])
if not self._connection.get("spark_remote"):
# Reuse parent's common spark argument building logic. The
# Connect-native CLI rejects --master/--deploy-mode/--name, so only
# emit them for legacy spark-submit-style connections.
connection_cmd.extend(self._build_spark_common_args())
self.log.info("Spark-Pipelines cmd: %s", self._mask_cmd(connection_cmd))
return connection_cmd

    def submit_pipeline(self, **kwargs: Any) -> None:
        """
        Execute the spark-pipelines command.

        :param kwargs: extra arguments to Popen (see subprocess.Popen)
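
        A minimal call sketch, passing an extra ``Popen`` argument (the working
        directory is hypothetical)::

            hook.submit_pipeline(cwd="/opt/pipelines")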
"""
pipelines_cmd = self._build_spark_pipelines_command()
spark_remote = self._connection.get("spark_remote")
# ``self._env`` is only populated by ``_build_spark_common_args`` —
# which the Connect path skips — so fall back to ``self._env_vars``
# (the operator-supplied env_vars kwarg) for that path.
env_overrides: dict[str, str] = dict(self._env or self._env_vars or {})
if spark_remote:
# Don't clobber a SPARK_REMOTE the operator caller already set via
# env_vars; that takes precedence for failover routing.
env_overrides.setdefault("SPARK_REMOTE", spark_remote)
if env_overrides:
env = os.environ.copy()
env.update(env_overrides)
kwargs["env"] = env
self._submit_sp = subprocess.Popen(
pipelines_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=-1,
universal_newlines=True,
**kwargs,
)
if self._submit_sp.stdout:
self._process_spark_submit_log(iter(self._submit_sp.stdout))
returncode = self._submit_sp.wait()
if returncode:
raise SparkPipelinesException(
f"Cannot execute: {self._mask_cmd(pipelines_cmd)}. Error code is: {returncode}."
)

    def submit(self, application: str = "", **kwargs: Any) -> None:
        """Override submit to use pipeline-specific logic."""
        self.submit_pipeline(**kwargs)