Source code for airflow.providers.edge3.cli.worker

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
import os
import signal
import subprocess
import sys
import tempfile
import time
import traceback
from asyncio import Task, create_task, gather, get_running_loop, sleep
from collections.abc import Awaitable, Callable
from contextlib import suppress
from datetime import datetime
from functools import cached_property
from http import HTTPStatus
from multiprocessing import Process
from pathlib import Path
from typing import IO, TYPE_CHECKING

import anyio
from aiofiles import open as aio_open
from aiohttp import ClientResponseError
from lockfile.pidlockfile import remove_existing_pidfile

from airflow import __version__ as airflow_version
from airflow.providers.common.compat.sdk import conf, timezone
from airflow.providers.edge3 import __version__ as edge_provider_version
from airflow.providers.edge3.cli.api_client import (
    jobs_fetch,
    jobs_set_state,
    logs_push,
    worker_register,
    worker_set_state,
)
from airflow.providers.edge3.cli.dataclasses import Job, MaintenanceMarker, WorkerStatus
from airflow.providers.edge3.cli.signalling import (
    SIG_STATUS,
    maintenance_marker_file_path,
    status_file_path,
    write_pid_to_pidfile,
)
from airflow.providers.edge3.models.edge_worker import (
    EdgeWorkerDuplicateException,
    EdgeWorkerState,
    EdgeWorkerVersionException,
)
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS, AIRFLOW_V_3_3_PLUS
from airflow.utils.net import getfqdn
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
    from airflow.configuration import AirflowConfigParser
    from airflow.executors.workloads import ExecuteTask
    from airflow.providers.edge3.worker_api.datamodels import EdgeJobFetched

[docs] logger = logging.getLogger(__name__)
if sys.platform == "darwin":
[docs] setproctitle = lambda title: logger.debug("Mac OS detected, skipping setproctitle")
else: from setproctitle import setproctitle def _make_task_temp_file(prefix: str) -> tuple[IO[bytes], Path]: """Create a named temporary file for task output capture and return the open file and its path.""" f = tempfile.NamedTemporaryFile(prefix=prefix, suffix=".log", delete=False) return f, Path(f.name) def _edge_hostname() -> str: """Get the hostname of the edge worker that should be reported by tasks.""" return os.environ.get("HOSTNAME", getfqdn()) def _reset_parent_signal_state() -> None: """ Detach a forked child from the parent's asyncio signal plumbing. The parent installs asyncio signal handlers for SIGTERM/SIGINT/SIG_STATUS via ``loop.add_signal_handler``, which internally uses ``signal.set_wakeup_fd`` on one end of a shared socketpair. On Linux ``fork()`` duplicates that fd into the child; signals delivered to the child then write bytes into the socketpair the parent is reading from, re-firing the parent's handlers. Reset the inherited state before the child runs any supervised code. """ signal.set_wakeup_fd(-1) signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_DFL) with suppress(ValueError, OSError): signal.signal(SIG_STATUS, signal.SIG_DFL)
[docs] class EdgeWorker: """Runner instance which executes the Edge Worker."""
[docs] jobs: list[Job] = []
"""List of jobs that the worker is running currently."""
[docs] drain: bool = False
"""Flag if job processing should be completed and no new jobs fetched for a graceful stop/shutdown."""
[docs] drain_started_at: float | None = None
"""``time.monotonic()`` timestamp of when drain was first requested."""
[docs] drain_timed_out: bool = False
[docs] drain_kill_deadline: float | None = None
[docs] maintenance_mode: bool = False
"""Flag if job processing should be completed and no new jobs fetched for maintenance mode. """
[docs] maintenance_comments: str | None = None
"""Comments for maintenance mode."""
[docs] versions_match: bool = True
"""Whether the worker and the server have matching versions of Airflow and the Edge Provider."""
[docs] background_tasks: set[Task] = set()
def __init__( self, pid_file_path: str, hostname: str, queues: list[str] | None, concurrency: int, daemon: bool = False, team_name: str | None = None, ):
[docs] self.pid_file_path = pid_file_path
[docs] self.hostname = hostname
[docs] self.queues = queues
[docs] self.concurrency = concurrency
[docs] self.daemon = daemon
[docs] self.team_name = team_name
[docs] self.worker_start_time: datetime = datetime.now()
if TYPE_CHECKING: self.conf: ExecutorConf | AirflowConfigParser if AIRFLOW_V_3_2_PLUS: from airflow.executors.base_executor import ExecutorConf self.conf = ExecutorConf(team_name) else: self.conf = conf
[docs] self.job_poll_interval = self.conf.getint("edge", "job_poll_interval")
[docs] self.hb_interval = self.conf.getint("edge", "heartbeat_interval")
[docs] self.drain_timeout_sec = self.conf.getint("edge", "drain_timeout_sec")
[docs] self.drain_kill_grace_sec = self.conf.getint("edge", "drain_kill_grace_sec")
[docs] self.base_log_folder: str = ( self.conf.get("logging", "base_log_folder", fallback="NOT AVAILABLE") or "" )
[docs] self.push_logs = self.conf.getboolean("edge", "push_logs")
[docs] self.push_log_chunk_size = self.conf.getint("edge", "push_log_chunk_size")
[docs] self.automatic_maintenance_on = ( self.conf.get("edge", "automatic_maintenance_on", fallback="Off") or "Off" ).lower()
[docs] self.automatic_maintenance_exit = ( self.conf.get("edge", "automatic_maintenance_exit", fallback="Off") or "Off" ).lower()
[docs] self.extended_sysinfo: Callable[[], Awaitable[dict[str, str | int | float | datetime]]] | None = None
extended_sysinfo_func_path = self.conf.get("edge", "extended_system_info_function", fallback=None) if extended_sysinfo_func_path: module_path, func_name = extended_sysinfo_func_path.rsplit(".", 1) try: module = __import__(module_path, fromlist=[func_name]) self.extended_sysinfo = getattr(module, func_name) logger.info("Using extended sysinfo function: %s", extended_sysinfo_func_path) except Exception: logger.exception( "Failed to import extended sysinfo function %s, skipping it.", extended_sysinfo_func_path, ) @cached_property def _execution_api_server_url(self) -> str | None: """Get the execution api server url from config or environment.""" execution_api_server_url = self.conf.get("core", "execution_api_server_url", fallback="") if not execution_api_server_url: # Derive execution api url from edge api url as fallback api_url = self.conf.get("edge", "api_url") execution_api_server_url = ( api_url.replace("edge_worker/v1/rpcapi", "execution") if api_url is not None else None ) logger.info("Using execution api server url: %s", execution_api_server_url) return execution_api_server_url @property
[docs] def free_concurrency(self) -> int: """Calculate the free concurrency of the worker.""" used_concurrency = sum(job.edge_job.concurrency_slots for job in self.jobs) return self.concurrency - used_concurrency
[docs] def signal_status(self): marker_path = Path(maintenance_marker_file_path(None)) if marker_path.exists(): request = MaintenanceMarker.from_json(marker_path.read_text()) logger.info("Requested to set maintenance mode to %s", request.maintenance) self.maintenance_mode = request.maintenance == "on" if self.maintenance_mode and request.comments: logger.info("Comments: %s", request.comments) self.maintenance_comments = request.comments marker_path.unlink() # send heartbeat immediately to update state task = get_running_loop().create_task(self.heartbeat(self.maintenance_comments)) self.background_tasks.add(task) task.add_done_callback(self.background_tasks.discard) else: logger.info("Request to get status of Edge Worker received.") status_path = Path(status_file_path(None)) status_path.write_text( WorkerStatus( job_count=len(self.jobs), jobs=[job.edge_job.key for job in self.jobs], state=self._get_state(), maintenance=self.maintenance_mode, maintenance_comments=self.maintenance_comments, drain=self.drain, ).json )
[docs] def signal_drain(self): if self._start_draining(): logger.info( "Request to shut down Edge Worker received, waiting for jobs to complete. %s", self._drain_policy_description(), )
[docs] def shutdown_handler(self): if not self._start_draining(): return logger.info("SIGTERM received. Sending SIGTERM to all jobs and quit") try: loop = get_running_loop() except RuntimeError: pass else: task = loop.create_task( self._push_drain_notice_to_all_jobs( "Edge worker received external SIGTERM; terminating task supervisor." ) ) self.background_tasks.add(task) task.add_done_callback(self.background_tasks.discard) self._terminate_jobs(signal.SIGTERM)
def _start_draining(self) -> bool: """Mark drain start. Returns ``True`` on the first call, ``False`` on subsequent calls.""" if self.drain: return False self.drain = True self.drain_started_at = time.monotonic() return True def _drain_policy_description(self) -> str: """One-line description of the configured drain-timeout policy for logging.""" if self.drain_timeout_sec <= 0: return "Drain timeout disabled; will wait indefinitely." return ( f"Drain timeout: {self.drain_timeout_sec}s (then SIGTERM), " f"kill grace: {self.drain_kill_grace_sec}s (then SIGKILL)." ) def _terminate_jobs(self, sig: int) -> None: """Send ``sig`` to every running job process. Safe to call repeatedly.""" for job in self.jobs: if job.process.pid: with suppress(ProcessLookupError, PermissionError): os.kill(job.process.pid, sig) async def _push_drain_notice_to_all_jobs(self, message: str) -> None: """Best-effort push of ``message`` into each running job's task log stream.""" async def push_one(job: Job) -> None: try: await logs_push( task=job.edge_job.key, log_chunk_time=timezone.utcnow(), log_chunk_data=f"{message}\n", ) except Exception: logger.exception("Failed to push drain notice to task log for %s", job.edge_job.identifier) await gather(*(push_one(job) for job in list(self.jobs))) async def _enforce_drain_timeout(self) -> bool: """ Apply drain-timeout policy when configured. Two-phase escalation: once ``drain_timeout_sec`` elapses, SIGTERM remaining jobs; after ``drain_kill_grace_sec`` more, SIGKILL and return ``True`` so the loop exits. Returns ``False`` otherwise (not configured, not draining, deadline not hit, or waiting out grace). """ if self.drain_timeout_sec <= 0: return False if not self.drain or not self.jobs or self.drain_started_at is None: return False now = time.monotonic() if now - self.drain_started_at < self.drain_timeout_sec: return False if not self.drain_timed_out: self.drain_timed_out = True self.drain_kill_deadline = now + self.drain_kill_grace_sec logger.warning( "Drain timeout of %ds exceeded with %d job(s) still running. Sending SIGTERM.", self.drain_timeout_sec, len(self.jobs), ) await self._push_drain_notice_to_all_jobs( f"Edge worker drain timeout of {self.drain_timeout_sec}s expired; " f"sending SIGTERM to task supervisor. " f"Will escalate to SIGKILL after {self.drain_kill_grace_sec}s grace." ) self._terminate_jobs(signal.SIGTERM) return False if self.drain_kill_deadline is not None and now >= self.drain_kill_deadline: logger.warning( "Drain kill grace of %ds exceeded with %d job(s) still running. Sending SIGKILL and exiting.", self.drain_kill_grace_sec, len(self.jobs), ) await self._push_drain_notice_to_all_jobs( f"Edge worker drain kill-grace of {self.drain_kill_grace_sec}s expired; " f"sending SIGKILL and exiting worker." ) self._terminate_jobs(signal.SIGKILL) return True return False def _adjust_maintenance_mode_based_on_sysinfo( self, sysinfo: dict[str, str | int | float | datetime] ) -> None: """Adjust maintenance mode based on sysinfo status and config.""" status: int = sysinfo.get("status") # type: ignore if not self.maintenance_mode and ( (status >= logging.WARNING and self.automatic_maintenance_on == "warning") or (status >= logging.ERROR and self.automatic_maintenance_on == "error") ): logger.info( "Entering maintenance mode due to status %s in sysinfo.", logging.getLevelName(status) ) self.maintenance_mode = True self.maintenance_comments = f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - Automatic maintenance mode entered due to status {logging.getLevelName(status)} in sysinfo." elif ( self.maintenance_mode and self.maintenance_comments and "] - Automatic maintenance mode entered due to status " in self.maintenance_comments and ( (status < logging.WARNING and self.automatic_maintenance_exit == "info") or (status < logging.ERROR and self.automatic_maintenance_exit == "warning") ) ): logger.info("Exiting maintenance mode due to status %s in sysinfo.", logging.getLevelName(status)) self.maintenance_mode = False self.maintenance_comments = None async def _get_sysinfo(self) -> dict[str, str | int | float | datetime]: """Produce the sysinfo from worker to post to central site.""" sysinfo: dict[str, str | int | float | datetime] = { **( { "status": logging.INFO, } if self.versions_match else { "status": logging.WARNING, "status_text": "Healthy but version mismatch", "version_mismatch_description": "The version between the Edge Worker and the " "Airflow Core is not matching for either the edge or airflow package version. " "Please check if the Edge Provider version is compatible with your Airflow " "version. The worker will still operate but you might miss some features or " "have issues. Please consider upgrading the Edge Provider to a compatible " "version.", } ), "airflow_version": airflow_version, "edge_provider_version": edge_provider_version, "python_version": sys.version, "worker_start_time": self.worker_start_time, "concurrency": self.concurrency, "free_concurrency": self.free_concurrency, } if self.extended_sysinfo: try: sysinfo.update(await self.extended_sysinfo()) except Exception: logger.exception("Failed to get extended sysinfo, skipping it.") # After grabbing status, check if we need to enter/exit maintenance mode self._adjust_maintenance_mode_based_on_sysinfo(sysinfo) return sysinfo def _get_state(self) -> EdgeWorkerState: """State of the Edge Worker.""" if self.jobs: if self.drain: return EdgeWorkerState.TERMINATING if self.maintenance_mode: return EdgeWorkerState.MAINTENANCE_PENDING return EdgeWorkerState.RUNNING if self.drain: if self.maintenance_mode: return EdgeWorkerState.OFFLINE_MAINTENANCE return EdgeWorkerState.OFFLINE if self.maintenance_mode: return EdgeWorkerState.MAINTENANCE_MODE return EdgeWorkerState.IDLE def _run_job_via_supervisor(self, workload: ExecuteTask, error_file_path: Path) -> int: """Run a task by calling the supervisor directly (executes inside a forked child process).""" _reset_parent_signal_state() # Ignore ctrl-c in this process -- we don't want to kill _this_ one. we let tasks run to completion os.setpgrp() logger.info("Worker starting up pid=%d", os.getpid()) try: if AIRFLOW_V_3_3_PLUS: from airflow.executors.base_executor import BaseExecutor BaseExecutor.run_workload( workload=workload, server=self._execution_api_server_url, ) else: from airflow.sdk.execution_time.supervisor import supervise ti = workload.ti setproctitle( "airflow edge supervisor: " f"dag_id={ti.dag_id} task_id={ti.task_id} run_id={ti.run_id} map_index={ti.map_index} " f"try_number={ti.try_number}" ) supervise( # This is the "wrong" ti type, but it duck types the same. TODO: Create a protocol for this. # Same like in airflow/executors/local_executor.py:_execute_workload() ti=ti, # type: ignore[arg-type] dag_rel_path=workload.dag_rel_path, bundle_info=workload.bundle_info, token=workload.token, server=self._execution_api_server_url, log_path=workload.log_path, ) return 0 except Exception: logger.exception("Task execution failed") with suppress(Exception): error_file_path.write_text(traceback.format_exc()) return 1 def _launch_job_subprocess(self, workload: ExecuteTask) -> tuple[subprocess.Popen, Path]: """Launch workload via a fresh Python interpreter (subprocess.Popen).""" env = os.environ.copy() if self._execution_api_server_url: env["AIRFLOW__CORE__EXECUTION_API_SERVER_URL"] = self._execution_api_server_url # Keep stderr off a PIPE: the worker only inspects stderr after the task finishes, # so a verbose child could otherwise fill the pipe buffer and block forever. Also keep # it task-scoped instead of inheriting the worker's stderr/stdout; supervisor startup # failures should be pushed to the task log, not only the worker/container log. stderr_file, stderr_file_path = _make_task_temp_file("airflow-edge-task-stderr-") try: process = subprocess.Popen( [ sys.executable, "-m", "airflow.sdk.execution_time.execute_workload", "--json-string", workload.model_dump_json(), ], env=env, start_new_session=True, stderr=stderr_file, ) except Exception: stderr_file_path.unlink(missing_ok=True) raise finally: # Close the parent's copy of the fd. Popen already dup2()'d it into the child, # so the child's stderr remains open and writable. The parent reads the output # later via stderr_file_path (the Path) once the child has exited. stderr_file.close() logger.info( "Launched task subprocess pid=%d for %s", process.pid, workload.ti.id, ) return process, stderr_file_path def _launch_job_fork(self, workload: ExecuteTask) -> tuple[Process, Path]: """Launch workload by forking the current process (multiprocessing.Process).""" # Improvement: Use frozen GC to prevent child process from copying unnecessary memory # See _spawn_workers_with_gc_freeze() in airflow-core/src/airflow/executors/local_executor.py error_file, error_file_path = _make_task_temp_file("airflow-edge-task-error-") error_file.close() # child writes to the file by path; parent only reads it after exit process = Process( target=self._run_job_via_supervisor, kwargs={"workload": workload, "error_file_path": error_file_path}, ) process.start() logger.info("Launched task fork pid=%d for %s", process.pid, workload.ti.id) return process, error_file_path def _launch_job(self, edge_job: EdgeJobFetched, workload: ExecuteTask, logfile: Path) -> Job: """ Launch a task process. Uses ``subprocess.Popen`` (fresh interpreter) when ``core.execute_tasks_new_python_interpreter`` is ``True`` or when ``os.fork`` is unavailable (e.g. Windows). Falls back to ``multiprocessing.Process`` (fork) otherwise — preserving the original behaviour for existing deployments. """ use_new_interpreter = not hasattr(os, "fork") or self.conf.getboolean( "core", "execute_tasks_new_python_interpreter", fallback=False, ) if use_new_interpreter: # Fresh subprocess path: spawn a new Python interpreter; no shared memory with parent # Technically safer and more robust, but with more overhead subprocess_process, stderr_file_path = self._launch_job_subprocess(workload) return Job(edge_job, subprocess_process, logfile, stderr_file_path=stderr_file_path) # Fork path: clone the current process; child inherits parent memory fork_process, error_file_path = self._launch_job_fork(workload) return Job(edge_job, fork_process, logfile, stderr_file_path=error_file_path) async def _push_logs_in_chunks(self, job: Job): aio_logfile = anyio.Path(job.logfile) try: if ( self.push_logs and await aio_logfile.exists() and (await aio_logfile.stat()).st_size > job.logsize ): async with aio_open(job.logfile, mode="rb") as logf: await logf.seek(job.logsize, os.SEEK_SET) read_data = await logf.read() job.logsize += len(read_data) # backslashreplace to keep not decoded characters and not raising exception # replace null with question mark to fix issue during DB push log_data = read_data.decode(errors="backslashreplace").replace("\x00", "\ufffd") while True: chunk_data = log_data[: self.push_log_chunk_size] log_data = log_data[self.push_log_chunk_size :] if not chunk_data: break await logs_push( task=job.edge_job.key, log_chunk_time=timezone.utcnow(), log_chunk_data=chunk_data, ) except (FileNotFoundError, OSError): logger.exception("Log file %s vanished while reading, ignoring.", job.logfile) # Swallow the exception; the file may have been removed by log rotation or cleanup while we were reading it. # We'll catch up on the next heartbeat/log push or file was uploaded by log integration in parallel.
[docs] async def start(self): """Start the execution in a loop until terminated.""" try: sysinfo = await self._get_sysinfo() register_result = await worker_register( self.hostname, EdgeWorkerState.MAINTENANCE_MODE if self.maintenance_mode else EdgeWorkerState.STARTING, self.queues, sysinfo, self.team_name, ) self.versions_match = register_result.versions_match except EdgeWorkerVersionException as e: logger.info("Version mismatch of Edge worker and Core. Shutting down worker.") raise SystemExit(str(e)) except EdgeWorkerDuplicateException as e: logger.error(str(e)) raise SystemExit(str(e)) except ClientResponseError as e: # Note: Method not allowed is raised by FastAPI if the API is not enabled (not 404) if e.status in {HTTPStatus.NOT_FOUND, HTTPStatus.METHOD_NOT_ALLOWED}: raise SystemExit( "Error: API endpoint is not ready, please set [edge] api_enabled=True. Or check if the URL is correct to your deployment." ) raise SystemExit(str(e)) if not self.daemon: write_pid_to_pidfile(self.pid_file_path) loop = get_running_loop() loop.add_signal_handler(signal.SIGINT, self.signal_drain) loop.add_signal_handler(SIG_STATUS, self.signal_status) loop.add_signal_handler(signal.SIGTERM, self.shutdown_handler) setproctitle(f"airflow edge worker: {self.hostname}") os.environ["HOSTNAME"] = self.hostname os.environ["AIRFLOW__CORE__HOSTNAME_CALLABLE"] = f"{_edge_hostname.__module__}._edge_hostname" try: await self.loop() logger.info("Quitting worker, signal being offline.") try: sysinfo = await self._get_sysinfo() sysinfo["status"] = logging.NOTSET if "status_text" in sysinfo: del sysinfo["status_text"] # Remove old status text if exists await worker_set_state( self.hostname, EdgeWorkerState.OFFLINE_MAINTENANCE if self.maintenance_mode else EdgeWorkerState.OFFLINE, 0, self.queues, sysinfo, team_name=self.team_name, ) except EdgeWorkerVersionException: logger.info("Version mismatch of Edge worker and Core. Quitting worker anyway.") finally: if not self.daemon: remove_existing_pidfile(self.pid_file_path)
[docs] async def loop(self): """Run a loop of scheduling and monitoring tasks.""" last_hb = datetime.now() worker_state_changed = True # force heartbeat at start previous_jobs = 0 while not self.drain or self.jobs: if ( self.drain or datetime.now().timestamp() - last_hb.timestamp() > self.hb_interval or worker_state_changed # send heartbeat immediately if the state is different in db or previous_jobs != len(self.jobs) # when number of jobs changes ): worker_state_changed = await self.heartbeat() last_hb = datetime.now() previous_jobs = len(self.jobs) if self.maintenance_mode: logger.info("in maintenance mode%s", f", {len(self.jobs)} draining jobs" if self.jobs else "") elif not self.drain and self.free_concurrency > 0: task = create_task(self.fetch_and_run_job()) self.background_tasks.add(task) task.add_done_callback(self.background_tasks.discard) else: logger.info("%i %s running", len(self.jobs), "job is" if len(self.jobs) == 1 else "jobs are") if await self._enforce_drain_timeout(): break await self.interruptible_sleep()
[docs] async def fetch_and_run_job(self) -> None: """Fetch, start and monitor a new job.""" logger.debug("Attempting to fetch a new job...") edge_job = await jobs_fetch(self.hostname, self.queues, self.free_concurrency, self.team_name) if not edge_job: logger.info( "No new job to process%s", f", {len(self.jobs)} still running" if self.jobs else "", ) return logger.info("Received job: %s", edge_job.identifier) workload: ExecuteTask = edge_job.command if TYPE_CHECKING: assert workload.log_path # We need to assume this is defined in here logfile = Path(self.base_log_folder, workload.log_path) job = self._launch_job(edge_job, workload, logfile) self.jobs.append(job) try: await jobs_set_state(edge_job.key, TaskInstanceState.RUNNING) # As we got one job, directly fetch another one if possible if self.free_concurrency > 0: task = create_task(self.fetch_and_run_job()) self.background_tasks.add(task) task.add_done_callback(self.background_tasks.discard) while job.is_running: await self._push_logs_in_chunks(job) for _ in range(0, self.job_poll_interval * 10): await sleep(0.1) if not job.is_running: break await self._push_logs_in_chunks(job) if job.is_success: logger.info("Job completed: %s", job.edge_job.identifier) await jobs_set_state(job.edge_job.key, TaskInstanceState.SUCCESS) else: ex_txt = job.failure_details() logger.error("Job failed: %s with:\n%s", job.edge_job.identifier, ex_txt) # Push it upwards to logs for better diagnostic as well await logs_push( task=job.edge_job.key, log_chunk_time=timezone.utcnow(), log_chunk_data=f"Error executing job:\n{ex_txt}", ) await jobs_set_state(job.edge_job.key, TaskInstanceState.FAILED) finally: self.jobs.remove(job) # Cleanup temp files used for the job job.cleanup()
[docs] async def heartbeat(self, new_maintenance_comments: str | None = None) -> bool: """Report liveness state of worker to central site with stats.""" sysinfo = await self._get_sysinfo() state = self._get_state() worker_state_changed: bool = False try: worker_info = await worker_set_state( self.hostname, state, len(self.jobs), self.queues, sysinfo, new_maintenance_comments or self.maintenance_comments, team_name=self.team_name, ) self.versions_match = worker_info.versions_match self.queues = worker_info.queues if worker_info.concurrency is not None and worker_info.concurrency != self.concurrency: logger.info( "Concurrency updated from %d to %d by remote request.", self.concurrency, worker_info.concurrency, ) self.concurrency = worker_info.concurrency if worker_info.state == EdgeWorkerState.MAINTENANCE_REQUEST: logger.info("Maintenance mode requested!") self.maintenance_mode = True elif ( worker_info.state in [EdgeWorkerState.IDLE, EdgeWorkerState.RUNNING] and self.maintenance_mode ): logger.info("Exit Maintenance mode requested!") self.maintenance_mode = False if self.maintenance_mode: self.maintenance_comments = worker_info.maintenance_comments else: self.maintenance_comments = None if worker_info.state == EdgeWorkerState.SHUTDOWN_REQUEST and self._start_draining(): logger.info("Shutdown requested!") worker_state_changed = worker_info.state != state except EdgeWorkerVersionException: logger.info("Version mismatch of Edge worker and Core. Shutting down worker.") self._start_draining() return worker_state_changed
[docs] async def interruptible_sleep(self): """Sleeps but stops sleeping if drain is made or some job completed.""" drain_before_sleep = self.drain jobcount_before_sleep = len(self.jobs) for _ in range(0, self.job_poll_interval * 10): await sleep(0.1) if drain_before_sleep != self.drain or len(self.jobs) < jobcount_before_sleep: return

Was this entry helpful?