Source code for airflow.providers.apache.spark.hooks.spark_pipelines
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import subprocess
from typing import Any
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook
from airflow.providers.common.compat.sdk import AirflowException
[docs]
class SparkPipelinesException(AirflowException):
"""Exception raised when spark-pipelines command fails."""
[docs]
class SparkPipelinesHook(SparkSubmitHook):
"""
Hook for interacting with Spark Declarative Pipelines via the spark-pipelines CLI.
Extends SparkSubmitHook to leverage existing connection management while providing
pipeline-specific functionality.
:param pipeline_spec: Path to the pipeline specification file (YAML)
:param pipeline_command: The spark-pipelines command to run ('run', 'dry-run')
"""
def __init__(
self,
pipeline_spec: str | None = None,
pipeline_command: str = "run",
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
[docs]
self.pipeline_spec = pipeline_spec
[docs]
self.pipeline_command = pipeline_command
if pipeline_command not in ["run", "dry-run"]:
raise ValueError(f"Invalid pipeline command: {pipeline_command}. Must be 'run' or 'dry-run'")
def _get_spark_binary_path(self) -> list[str]:
return ["spark-pipelines"]
def _build_spark_pipelines_command(self) -> list[str]:
"""
Construct the spark-pipelines command to execute.
:return: full command to be executed
"""
# Start with spark-pipelines binary and command
connection_cmd = self._get_spark_binary_path()
connection_cmd.append(self.pipeline_command)
# Add pipeline spec if provided
if self.pipeline_spec:
connection_cmd.extend(["--spec", self.pipeline_spec])
# Reuse parent's common spark argument building logic
connection_cmd.extend(self._build_spark_common_args())
self.log.info("Spark-Pipelines cmd: %s", self._mask_cmd(connection_cmd))
return connection_cmd
[docs]
def submit_pipeline(self, **kwargs: Any) -> None:
"""
Execute the spark-pipelines command.
:param kwargs: extra arguments to Popen (see subprocess.Popen)
"""
pipelines_cmd = self._build_spark_pipelines_command()
if self._env:
import os
env = os.environ.copy()
env.update(self._env)
kwargs["env"] = env
self._submit_sp = subprocess.Popen(
pipelines_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=-1,
universal_newlines=True,
**kwargs,
)
if self._submit_sp.stdout:
self._process_spark_submit_log(iter(self._submit_sp.stdout))
returncode = self._submit_sp.wait()
if returncode:
raise SparkPipelinesException(
f"Cannot execute: {self._mask_cmd(pipelines_cmd)}. Error code is: {returncode}."
)
[docs]
def submit(self, application: str = "", **kwargs: Any) -> None:
"""Override submit to use pipeline-specific logic."""
self.submit_pipeline(**kwargs)