Source code for airflow.providers.edge3.cli.signalling
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import logging
import os
import signal
import sys
from pathlib import Path
import psutil
from lockfile.pidlockfile import (
read_pid_from_pidfile,
remove_existing_pidfile,
write_pid_to_pidfile as write_pid,
)
from airflow.utils import cli as cli_utils
from airflow.utils.platform import IS_WINDOWS
[docs]
# Process name handed to ``cli_utils.setup_locations`` when resolving the
# edge worker's file locations (PID file, status file, ...).
EDGE_WORKER_PROCESS_NAME = "edge-worker"
[docs]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def _status_signal() -> signal.Signals:
    """
    Pick the signal number used for status signalling on the current platform.

    :return: ``signal.SIGBREAK`` when ``IS_WINDOWS`` is true, ``signal.SIGUSR2`` otherwise.
    """
    # Only the selected branch is evaluated, so the attribute that is missing
    # on the other platform is never looked up.
    return signal.SIGBREAK if IS_WINDOWS else signal.SIGUSR2  # type: ignore[attr-defined]
[docs]
# Platform-specific status signal, resolved once at import time via
# ``_status_signal()`` (SIGBREAK on Windows, SIGUSR2 elsewhere).
SIG_STATUS = _status_signal()
[docs]
def write_pid_to_pidfile(pid_file_path: str):
    """
    Write the PID file for an Edge Worker to disk, handling existing PID files.

    If a PID file already exists at ``pid_file_path`` it is resolved as follows:

    * it contains the PID of the current process: abort, the file was already written;
    * it contains the PID of another running process: abort, two instances share one path;
    * otherwise (the recorded process is gone, or the file content could not be
      read as a PID): the file is considered orphaned and is removed.

    :param pid_file_path: Path of the PID file to create.
    :raises SystemExit: If the existing PID file belongs to this process or to
        another running process.
    """
    if Path(pid_file_path).exists():
        # Handle existing PID files on disk
        logger.info("An existing PID file has been found: %s.", pid_file_path)
        pid_stored_in_pid_file = read_pid_from_pidfile(pid_file_path)
        if os.getpid() == pid_stored_in_pid_file:
            raise SystemExit("A PID file has already been written")
        # PID file was written by dead or already running instance.
        # ``read_pid_from_pidfile`` yields ``None`` for an empty or unparsable file;
        # ``psutil.pid_exists`` only accepts integers, so guard against ``None``
        # and treat such a file as orphaned instead of crashing with a TypeError.
        if pid_stored_in_pid_file is not None and psutil.pid_exists(pid_stored_in_pid_file):
            # case 1: another instance uses the same path for its PID file
            raise SystemExit(
                f"The PID file {pid_file_path} contains the PID of another running process. "
                "Configuration issue: edge worker instance must use different PID file paths!"
            )
        # case 2: previous instance crashed without cleaning up its PID file
        logger.warning("PID file is orphaned. Cleaning up.")
        remove_existing_pidfile(pid_file_path)
    write_pid(pid_file_path)
    # Log only after the write succeeded so the message is never misleading.
    logger.debug("PID file written to %s.", pid_file_path)
[docs]
def pid_file_path(pid_file: str | None) -> str:
    """
    Resolve the PID file location for the edge worker process.

    :param pid_file: Value forwarded as ``pid`` to ``cli_utils.setup_locations``; may be ``None``.
    :return: The first element of the locations returned by ``cli_utils.setup_locations``.
    """
    pid_location, *_ = cli_utils.setup_locations(process=EDGE_WORKER_PROCESS_NAME, pid=pid_file)
    return pid_location
[docs]
def get_pid(pid_file: str | None) -> int:
    """
    Read the worker PID from the resolved PID file.

    Logs a warning and exits the interpreter with status 1 when no (truthy)
    PID could be read.

    :param pid_file: Value forwarded to ``pid_file_path`` to locate the PID file.
    :return: The PID read from the PID file.
    """
    worker_pid = read_pid_from_pidfile(pid_file_path(pid_file))
    if worker_pid:
        return worker_pid
    logger.warning("Could not find PID of worker.")
    sys.exit(1)
[docs]
def status_file_path(pid_file: str | None) -> str:
    """
    Resolve the status file location for the edge worker process.

    :param pid_file: Value forwarded as ``pid`` to ``cli_utils.setup_locations``; may be ``None``.
    :return: The second element of the locations returned by ``cli_utils.setup_locations``.
    """
    locations = cli_utils.setup_locations(process=EDGE_WORKER_PROCESS_NAME, pid=pid_file)
    return locations[1]
[docs]
def maintenance_marker_file_path(pid_file: str | None) -> str:
    """
    Resolve the maintenance marker file location for the edge worker process.

    The path is derived from the second location returned by
    ``cli_utils.setup_locations`` by dropping its last four characters
    (presumably a ``.out`` suffix; confirm against ``setup_locations``)
    and appending ``.in``.

    :param pid_file: Value forwarded as ``pid`` to ``cli_utils.setup_locations``; may be ``None``.
    :return: Path of the maintenance marker file.
    """
    locations = cli_utils.setup_locations(process=EDGE_WORKER_PROCESS_NAME, pid=pid_file)
    stem = locations[1][:-4]
    return f"{stem}.in"