Source code for airflow.providers.edge3.cli.definition

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.cli.cli_config import ARG_PID, ARG_VERBOSE, ActionCommand, Arg, GroupCommand, lazy_load_command
from airflow.configuration import conf

if TYPE_CHECKING:
    import argparse


# -c / --concurrency: integer number of worker processes. The default is read
# from the ``[edge] worker_concurrency`` config option, falling back to 8.
ARG_CONCURRENCY = Arg(
    ("-c", "--concurrency"),
    default=conf.getint("edge", "worker_concurrency", fallback=8),
    help="The number of worker processes",
    type=int,
)
# -q / --queues: optional comma-delimited queue list for the ``worker`` command.
ARG_QUEUES = Arg(
    ("-q", "--queues"),
    help="Comma delimited list of queues to serve, serve all queues if not provided.",
)
# -H / --edge-hostname: optional variant of the hostname flag.
ARG_EDGE_HOSTNAME = Arg(
    ("-H", "--edge-hostname"),
    help="Set the hostname of worker if you have multiple workers on a single machine",
)
# -H / --edge-hostname: same flags and help text as the optional variant, but
# marked as required.
ARG_REQUIRED_EDGE_HOSTNAME = Arg(
    ("-H", "--edge-hostname"),
    required=True,
    help="Set the hostname of worker if you have multiple workers on a single machine",
)
# Positional ``maintenance`` argument restricted to the values "on" and "off".
ARG_MAINTENANCE = Arg(
    ("maintenance",),
    choices=("on", "off"),
    help="Desired maintenance state",
)
# -c / --comments: free-text maintenance reason. The parser does not mark it as
# required; the help text states when it is expected.
ARG_MAINTENANCE_COMMENT = Arg(
    ("-c", "--comments"),
    help="Maintenance comments to report reason. Required if maintenance is turned on.",
)
# -c / --comments: maintenance reason, marked as required at the parser level.
ARG_REQUIRED_MAINTENANCE_COMMENT = Arg(
    ("-c", "--comments"),
    required=True,
    help="Maintenance comments to report reason. Required if enabling maintenance",
)
# -q / --queues: required comma-delimited queue list for the queue add/remove
# commands.
ARG_QUEUES_MANAGE = Arg(
    ("-q", "--queues"),
    required=True,
    help="Comma delimited list of queues to add or remove.",
)
# -w / --wait: boolean switch (off unless given) for the ``maintenance`` command.
ARG_WAIT_MAINT = Arg(
    ("-w", "--wait"),
    action="store_true",
    default=False,
    help="Wait until edge worker has reached desired state.",
)
# -w / --wait: boolean switch (off unless given) for the ``stop`` command.
ARG_WAIT_STOP = Arg(
    ("-w", "--wait"),
    action="store_true",
    default=False,
    help="Wait until edge worker is shut down.",
)
# -o / --output: rendering format, limited to four choices and defaulting to
# "table".
ARG_OUTPUT = Arg(
    ("-o", "--output"),
    choices=("table", "json", "yaml", "plain"),
    default="table",
    metavar="(table, json, yaml, plain)",
    help="Output format. Allowed values: json, yaml, plain, table (default: table)",
)
# -s / --state: accepts one or more values (nargs="+").
ARG_STATE = Arg(
    ("-s", "--state"),
    help="State of the edge worker",
    nargs="+",
)
# -D / --daemon: boolean switch selecting daemon mode.
ARG_DAEMON = Arg(
    ("-D", "--daemon"),
    action="store_true",
    help="Daemonize instead of running in the foreground",
)
# -u / --umask: umask value applied in daemon mode; no default is set here.
ARG_UMASK = Arg(
    ("-u", "--umask"),
    help="Set the umask of edge worker in daemon mode",
)
# --stderr: target file for stderr in daemon mode (long flag only).
ARG_STDERR = Arg(
    ("--stderr",),
    help="Redirect stderr to this file if run in daemon mode",
)
# --stdout: target file for stdout in daemon mode (long flag only).
ARG_STDOUT = Arg(
    ("--stdout",),
    help="Redirect stdout to this file if run in daemon mode",
)
# -l / --log-file: log file path used in daemon mode.
ARG_LOG_FILE = Arg(
    ("-l", "--log-file"),
    help="Location of the log file if run in daemon mode",
)
# -y / --yes: boolean switch (off unless given) that skips the confirmation
# prompt.
ARG_YES = Arg(
    ("-y", "--yes"),
    default=False,
    action="store_true",
    help="Skip confirmation prompt and proceed with shutdown",
)
# Subcommands exposed under the ``edge`` CLI group. Every handler is referenced
# by dotted path through ``lazy_load_command``.
EDGE_COMMANDS: list[ActionCommand] = [
    # --- Commands for the worker on the local machine ---
    ActionCommand(
        name="worker",
        help="Start Airflow Edge Worker.",
        args=(
            ARG_CONCURRENCY,
            ARG_QUEUES,
            ARG_EDGE_HOSTNAME,
            ARG_PID,
            ARG_VERBOSE,
            ARG_DAEMON,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_LOG_FILE,
            ARG_UMASK,
        ),
        func=lazy_load_command("airflow.providers.edge3.cli.edge_command.worker"),
    ),
    ActionCommand(
        name="status",
        help="Check for Airflow Local Edge Worker status.",
        args=(ARG_PID, ARG_VERBOSE),
        func=lazy_load_command("airflow.providers.edge3.cli.edge_command.status"),
    ),
    ActionCommand(
        name="maintenance",
        help="Set or Unset maintenance mode of local edge worker.",
        args=(
            ARG_MAINTENANCE,
            ARG_MAINTENANCE_COMMENT,
            ARG_WAIT_MAINT,
            ARG_PID,
            ARG_VERBOSE,
        ),
        func=lazy_load_command("airflow.providers.edge3.cli.edge_command.maintenance"),
    ),
    ActionCommand(
        name="stop",
        help="Stop a running local Airflow Edge Worker.",
        args=(ARG_WAIT_STOP, ARG_PID, ARG_VERBOSE),
        func=lazy_load_command("airflow.providers.edge3.cli.edge_command.stop"),
    ),
    # --- Listing of registered workers ---
    ActionCommand(
        name="list-workers",
        help="Query the db to list all registered edge workers.",
        args=(ARG_OUTPUT, ARG_STATE),
        func=lazy_load_command("airflow.providers.edge3.cli.edge_command.list_edge_workers"),
    ),
    # --- Commands addressing a worker by its (required) edge hostname ---
    ActionCommand(
        name="remote-edge-worker-request-maintenance",
        help="Put remote edge worker on maintenance.",
        args=(ARG_REQUIRED_EDGE_HOSTNAME, ARG_REQUIRED_MAINTENANCE_COMMENT),
        func=lazy_load_command("airflow.providers.edge3.cli.edge_command.put_remote_worker_on_maintenance"),
    ),
    ActionCommand(
        name="remote-edge-worker-exit-maintenance",
        help="Remove remote edge worker from maintenance.",
        args=(ARG_REQUIRED_EDGE_HOSTNAME,),
        func=lazy_load_command(
            "airflow.providers.edge3.cli.edge_command.remove_remote_worker_from_maintenance"
        ),
    ),
    ActionCommand(
        name="remote-edge-worker-update-maintenance-comment",
        help="Update maintenance comments of the remote edge worker.",
        args=(ARG_REQUIRED_EDGE_HOSTNAME, ARG_REQUIRED_MAINTENANCE_COMMENT),
        func=lazy_load_command(
            "airflow.providers.edge3.cli.edge_command.remote_worker_update_maintenance_comment"
        ),
    ),
    ActionCommand(
        name="remove-remote-edge-worker",
        help="Remove remote edge worker entry from db.",
        args=(ARG_REQUIRED_EDGE_HOSTNAME,),
        func=lazy_load_command("airflow.providers.edge3.cli.edge_command.remove_remote_worker"),
    ),
    ActionCommand(
        name="shutdown-remote-edge-worker",
        help="Initiate the shutdown of the remote edge worker.",
        args=(ARG_REQUIRED_EDGE_HOSTNAME,),
        func=lazy_load_command("airflow.providers.edge3.cli.edge_command.remote_worker_request_shutdown"),
    ),
    ActionCommand(
        name="add-worker-queues",
        help="Add queues to an edge worker.",
        args=(ARG_REQUIRED_EDGE_HOSTNAME, ARG_QUEUES_MANAGE),
        func=lazy_load_command("airflow.providers.edge3.cli.edge_command.add_worker_queues"),
    ),
    ActionCommand(
        name="remove-worker-queues",
        help="Remove queues from an edge worker.",
        args=(ARG_REQUIRED_EDGE_HOSTNAME, ARG_QUEUES_MANAGE),
        func=lazy_load_command("airflow.providers.edge3.cli.edge_command.remove_worker_queues"),
    ),
    # --- Command addressing all workers at once ---
    ActionCommand(
        name="shutdown-all-workers",
        help="Request graceful shutdown of all edge workers.",
        args=(ARG_YES,),
        func=lazy_load_command("airflow.providers.edge3.cli.edge_command.shutdown_all_workers"),
    ),
]
def get_edge_cli_commands() -> list[GroupCommand]:
    """
    Build the CLI command groups contributed by this module.

    :return: A one-element list holding the ``edge`` group, whose subcommands
        are the entries of ``EDGE_COMMANDS``.
    """
    edge_group = GroupCommand(
        name="edge",
        help="Edge Worker components",
        description=(
            "Start and manage Edge Worker. Works only when using EdgeExecutor. For more information, "
            "see https://airflow.apache.org/docs/apache-airflow-providers-edge3/stable/edge_executor.html"
        ),
        subcommands=EDGE_COMMANDS,
    )
    return [edge_group]
def get_parser() -> argparse.ArgumentParser:
    """
    Build an ``airflow`` argument parser holding only the edge command groups.

    Generates documentation; used by Sphinx.

    :meta private:
    """
    # Imported inside the function, as in the original code, so the parser
    # helpers are only loaded when this function is called.
    from airflow.cli.cli_parser import AirflowHelpFormatter, DefaultHelpParser, _add_command

    root_parser = DefaultHelpParser(prog="airflow", formatter_class=AirflowHelpFormatter)
    command_subparsers = root_parser.add_subparsers(dest="subcommand", metavar="GROUP_OR_COMMAND")

    # Register every group returned by get_edge_cli_commands() on the parser.
    for edge_group in get_edge_cli_commands():
        _add_command(command_subparsers, edge_group)

    return root_parser

Was this entry helpful?