Airflow Summit 2025 is coming October 07-09. Register now for early bird tickets!

Source code for airflow.providers.edge3.cli.signalling

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
import os
import signal
import sys
from pathlib import Path

import psutil
from lockfile.pidlockfile import (
    read_pid_from_pidfile,
    remove_existing_pidfile,
    write_pid_to_pidfile as write_pid,
)

from airflow.utils import cli as cli_utils
from airflow.utils.platform import IS_WINDOWS

# Process name handed to ``cli_utils.setup_locations`` by the path helpers in
# this module.
EDGE_WORKER_PROCESS_NAME = "edge-worker"
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def _status_signal() -> signal.Signals:
    """
    Select the signal matching the current platform.

    :return: ``signal.SIGBREAK`` when ``IS_WINDOWS`` is true, ``signal.SIGUSR2`` otherwise.
    """
    if not IS_WINDOWS:
        return signal.SIGUSR2
    # SIGBREAK is not defined by the ``signal`` module on every platform,
    # hence the type-checker ignore.
    return signal.SIGBREAK  # type: ignore[attr-defined]
# Platform-dependent signal, resolved once at import time by ``_status_signal()``.
# NOTE(review): presumably the signal used to request a status report from a
# running worker -- confirm against the callers.
SIG_STATUS = _status_signal()
def write_pid_to_pidfile(pid_file_path: str) -> None:
    """
    Write PIDs for Edge Workers to disk, handling existing PID files.

    If a file already exists at ``pid_file_path`` its stored PID is inspected:

    * it equals the current process' PID -> abort, the file was already written;
    * it belongs to another live process -> abort, two instances share one path;
    * anything else (dead process, or no readable PID in the file) -> the file
      is treated as orphaned and removed before the new one is written.

    :param pid_file_path: Location of the PID file to create.
    :raises SystemExit: If the PID file already holds the PID of this process
        or of another running process.
    """
    if Path(pid_file_path).exists():
        # Handle existing PID files on disk
        logger.info("An existing PID file has been found: %s.", pid_file_path)
        pid_stored_in_pid_file = read_pid_from_pidfile(pid_file_path)
        if os.getpid() == pid_stored_in_pid_file:
            raise SystemExit("A PID file has already been written")
        # PID file was written by dead or already running instance.
        # The reader yields None for an empty or unparsable file; psutil.pid_exists
        # cannot handle None, so such a file is treated as orphaned instead.
        if pid_stored_in_pid_file is not None and psutil.pid_exists(pid_stored_in_pid_file):
            # case 1: another instance uses the same path for its PID file
            raise SystemExit(
                f"The PID file {pid_file_path} contains the PID of another running process. "
                "Configuration issue: edge worker instance must use different PID file paths!"
            )
        # case 2: previous instance crashed without cleaning up its PID file
        logger.warning("PID file is orphaned. Cleaning up.")
        remove_existing_pidfile(pid_file_path)
    write_pid(pid_file_path)
    # Log only after the write succeeded so the message is never misleading.
    logger.debug("PID file written to %s.", pid_file_path)
def pid_file_path(pid_file: str | None) -> str:
    """
    Return the first location computed by ``cli_utils.setup_locations``.

    :param pid_file: Forwarded to ``setup_locations`` as its ``pid`` argument; may be ``None``.
    :return: Element 0 of the ``setup_locations`` result for the edge worker process
        (presumably the resolved PID file path -- confirm against ``setup_locations``).
    """
    locations = cli_utils.setup_locations(process=EDGE_WORKER_PROCESS_NAME, pid=pid_file)
    return locations[0]
def get_pid(pid_file: str | None) -> int:
    """
    Read the worker PID from the PID file resolved via ``pid_file_path``.

    :param pid_file: Passed through to ``pid_file_path``; may be ``None``.
    :return: The PID read from the file.
    :raises SystemExit: With exit code 1 (after logging a warning) when no
        truthy PID could be read.
    """
    worker_pid = read_pid_from_pidfile(pid_file_path(pid_file))
    if worker_pid:
        return worker_pid
    logger.warning("Could not find PID of worker.")
    sys.exit(1)
def status_file_path(pid_file: str | None) -> str:
    """
    Return the second location computed by ``cli_utils.setup_locations``.

    :param pid_file: Forwarded to ``setup_locations`` as its ``pid`` argument; may be ``None``.
    :return: Element 1 of the ``setup_locations`` result for the edge worker process,
        used by this module as the status file path.
    """
    locations = cli_utils.setup_locations(process=EDGE_WORKER_PROCESS_NAME, pid=pid_file)
    return locations[1]
def maintenance_marker_file_path(pid_file: str | None) -> str:
    """
    Derive the maintenance marker path from the second ``setup_locations`` entry.

    The last four characters of that entry are dropped and ``".in"`` is appended.
    NOTE(review): this assumes the entry ends in a four-character extension
    (presumably ``.out``) -- confirm against ``setup_locations``.

    :param pid_file: Forwarded to ``setup_locations`` as its ``pid`` argument; may be ``None``.
    :return: The derived marker file path.
    """
    locations = cli_utils.setup_locations(process=EDGE_WORKER_PROCESS_NAME, pid=pid_file)
    stem = locations[1][:-4]
    return stem + ".in"

Was this entry helpful?