Edge Executor¶
Note
The Edge Provider package is an experimental preview. Features and stability are limited and will be improved over time. The target is to have full support in Airflow 3. Once Airflow 3 support includes the Edge provider, maintenance of the Airflow 2 package will be discontinued.
Note
As of Airflow 2.10.3, the Edge provider package is not included in the normal release cycle. Thus you cannot install it directly via pip install 'apache-airflow[edge]', as the dependency cannot be downloaded. While the provider is in a not-ready state, a wheel release package must be built manually from the source tree via breeze release-management prepare-provider-packages --include-not-ready-providers edge and then installed via pip or uv from the generated wheel file, for example: pip install apache_airflow_providers_edge-<version>-py3-none-any.whl
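For reference, the two steps combined look as follows; this sketch assumes you run it from an Airflow source checkout with breeze available and that the wheel is written to the dist/ folder:

# build the not-ready provider wheel from the source tree
breeze release-management prepare-provider-packages --include-not-ready-providers edge
# install the generated wheel (version placeholder as above)
pip install dist/apache_airflow_providers_edge-<version>-py3-none-any.whl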
EdgeExecutor is an option if you want to distribute tasks to workers in different locations. You can also use it in parallel with other executors if needed. Change your airflow.cfg to point the executor parameter to EdgeExecutor and provide the related settings.
The configuration parameters of the Edge Executor can be found in the Edge provider’s Configuration Reference.
Here are a few imperative requirements for your workers:

- airflow needs to be installed, and the airflow CLI needs to be in the path
- Airflow configuration settings should be homogeneous across the cluster
- Operators that are executed on the Edge Worker need to have their dependencies met in that context. Please take a look at the respective provider package documentation
- The worker needs to have access to its DAGS_FOLDER, and you need to synchronize the filesystems by your own means. A common setup would be to store your DAGS_FOLDER in a Git repository and sync it across machines using Chef, Puppet, Ansible, or whatever you use to configure machines in your environment (a minimal sketch follows after this list). If all your boxes have a common mount point, having your pipeline files shared there should work as well
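For illustration only, the Git-based approach can be as simple as a cron job on every worker host that pulls the DAG repository periodically; the path and interval below are hypothetical placeholders:

# hypothetical crontab entry on each Edge Worker host:
# pull the DAGS_FOLDER repository every 5 minutes
*/5 * * * * cd /opt/airflow/dags && git pull --ff-only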
The minimum configuration to get the Edge Worker running is listed below; a combined airflow.cfg sketch follows after the list.

Section [core]

- executor: The executor must be set to (or extended to include) airflow.providers.edge.executors.EdgeExecutor
- internal_api_secret_key: An encryption key must be set on the webserver and the Edge Worker component as a shared secret to authenticate traffic. It should be a random string like the Fernet key (but preferably not the same).

Section [edge]

- api_enabled: Must be set to true. It is disabled intentionally to not expose the endpoint by default. This is the endpoint the worker connects to. In a future release a dedicated API server can be started.
- api_url: Must be set to the URL which exposes the web endpoint
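Putting these settings together, a minimal airflow.cfg sketch could look like the following; the secret key and the api_url (including its endpoint path) are placeholders you must adapt to your deployment:

[core]
# use the Edge Executor (it can also run in parallel with other executors)
executor = airflow.providers.edge.executors.EdgeExecutor
# shared secret between webserver and Edge Workers (placeholder, generate your own)
internal_api_secret_key = <random-secret-string>

[edge]
# the endpoint is disabled by default and must be enabled explicitly
api_enabled = true
# placeholder URL: point this at the web endpoint exposed by your webserver
api_url = https://your-webserver:8080/edge_worker/v1/rpcapi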
To kick off a worker, you need to set up Airflow and start the worker subcommand:
airflow edge worker
Your worker should start picking up tasks as soon as they get fired in its direction. To stop a worker running on a machine you can use:
airflow edge stop
It will try to stop the worker gracefully by sending a SIGINT signal to the main process and waiting until all running tasks are completed.
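A typical start/stop sequence on a worker host, using only the commands described on this page (the queue names are examples, see the Queues section below):

# start a worker that serves two queues
airflow edge worker -q remote,wisconsin_site
# later, on the same host, request a graceful shutdown
airflow edge stop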
If you want to monitor the remote activity and workers, use the UI plugin which is included in the provider package: install it on the webserver and use the "Admin" - "Edge Worker Hosts" and "Edge Worker Jobs" pages.
Some caveats:

- Tasks can consume resources. Make sure your worker has enough resources to run worker_concurrency tasks
- Make sure that the pool_slots of a task matches the worker_concurrency of the worker
- Queue names are limited to 256 characters
See Modules Management for details on how Python and Airflow manage modules.
Limitations of Pre-Release¶
As this provider package is an experimental preview, not all functions are supported or fully covered. If you plan to use the Edge Executor / Worker at the current stage, you need to ensure you test properly before use. The following features have been initially tested and are working:
- Some core operators

  - BashOperator
  - PythonOperator
  - @task decorator
  - @task.branch decorator
  - @task.virtualenv decorator
  - @task.bash decorator

- Dynamic Mapped Tasks
- XCom read/write
- Variable and Connection access
- Setup and Teardown tasks
Some known limitations:

- Tasks that require DB access will fail - no DB connection from the remote site is possible. This also means that some direct Airflow API usage via Python is not possible (e.g. airflow.models.*)
- Log upload will only work if you use a single webserver instance, or the webservers need to share one log file volume
- Performance: No performance assessment and scaling tests have been made. The Edge Executor package is not optimized for scalability; this will need to be considered in future releases. A dedicated performance assessment is to be completed before version 1.0.0 is released, ensuring that other executors are not impacted in a hybrid setup
- Stuck tasks in the queue are not explicitly handled, as cleanup_stuck_queued_tasks() is not implemented
Architecture¶
Airflow consists of several components:

- Workers - Execute the assigned tasks; most standard setups have local or centralized workers, e.g. via Celery
- Edge Workers - Special workers which pull tasks via HTTP, provided as a feature of this provider package
- Scheduler - Responsible for adding the necessary tasks to the queue
- Web server - HTTP server that provides access to DAG/task status information
- Database - Contains information about the status of tasks, DAGs, Variables, connections, etc.
Queues¶
When using the EdgeExecutor, the workers that tasks are sent to can be specified. queue is an attribute of BaseOperator, so any task can be assigned to any queue. The default queue for the environment is defined in the airflow.cfg's operators -> default_queue. This defines the queue that tasks get assigned to when not specified, as well as which queue Airflow workers listen to when started.
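The relevant setting lives in the [operators] section of airflow.cfg; the queue name below is only an example:

[operators]
# tasks without an explicit queue are assigned here, and workers
# started without -q listen on this queue
default_queue = edge_default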
Workers can listen to one or multiple queues of tasks. When a worker is started (using the command airflow edge worker), a set of comma-delimited queue names (with no whitespace) can be given (e.g. airflow edge worker -q remote,wisconsin_site). This worker will then only pick up tasks wired to the specified queue(s).
This can be useful if you need specialized workers, either from a resource perspective (for say very lightweight tasks where one worker could take thousands of tasks without a problem), or from an environment perspective (you want a worker running from a specific location where required infrastructure is available).
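As an illustration, the following DAG pins a single task to a specific queue via the queue argument; the DAG id, task, and bash command are made up for this sketch, and the queue name must match one passed to airflow edge worker -q:

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="example_edge_queue",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    # only workers started with "-q wisconsin_site" will pick up this task
    BashOperator(
        task_id="run_at_edge_site",
        bash_command="echo 'running close to the local infrastructure'",
        queue="wisconsin_site",
    )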
Concurrency slot handling¶
Some tasks may need more resources than others. To handle these use cases, the Edge Worker supports concurrency slot handling. The logic behind this is the same as the pool slot feature, see Pools. The Edge Worker reuses the pool_slots of the task instance to keep the number of task instance parameters as low as possible. The pool_slots value works together with the worker_concurrency value which is defined during startup of the worker. If a task needs more resources, its pool_slots value can be increased to reduce the number of tasks running in parallel; the value can thus be used to block other tasks from being executed in parallel on the same worker. For example, a pool_slots of 2 with a worker_concurrency of 3 means that a worker which executes this task can only execute one more job with a pool_slots of 1 in parallel. If no pool_slots is defined for a task, the default value is 1. The pool_slots value only supports integer values.
Here is an example setting pool_slots for a task:
import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.example_dags.libs.helper import print_stuff

with DAG(
    dag_id="example_edge_pool_slots",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    # occupies 2 of the worker's concurrency slots while running
    @task(executor="EdgeExecutor", pool_slots=2)
    def task_with_template():
        print_stuff()

    task_with_template()
Feature Backlog of MVP to Release Readiness¶
As noted above, the current version of the EdgeExecutor is an MVP (Minimum Viable Product). It can be used, but care must be taken if you want to use it productively. Only the bare minimum of functions is provided currently, and missing features will be added over time.
The target implementation is sketched in AIP-69 (Airflow Improvement Proposal for Edge Executor); this AIP will be completed when the open features are implemented and it has production-grade stability.
The following features are known to be missing and will be implemented in increments:
- API token per worker: Today there is only a global API token available
- Edge Worker Plugin

  - Overview about queues / jobs per queue
  - Allow starting the Edge Worker REST API separate from the webserver
  - Administrative maintenance / temporarily disable jobs on a worker

- Edge Worker CLI

  - Use WebSockets instead of HTTP calls for communication
  - Send logs also to TaskFileHandler if external logging services are used
  - Integration into telemetry to send metrics from the remote site
  - Allow airflow edge stop to wait until all jobs are completed before terminating
  - Publish system metrics with heartbeats (CPU, disk space, RAM, load)
  - Be more liberal, e.g. on patch versions. The MVP requires an exact version match (in the current state, if versions do not match, the worker will gracefully shut down when jobs are completed; no new jobs will be started)

- Tests

  - Integration tests in GitHub
  - Test/support on Windows for Edge Worker
  - Scaling tests - check and define boundaries of workers/jobs
  - Load tests - impact of scaled execution and code optimization

- Airflow 3 / AIP-72 Migration

  - Thin deployment based on Task SDK
  - DAG code push (no need for Git sync)
  - Implicit with AIP-72: Move task context generation from remote site to executor

- Documentation

  - Describe more details on deployment options and tuning
  - Provide scripts and guides to install edge components as a service (systemd)
  - Extend the Helm chart for needed support