HTTP Event Trigger¶
The HttpEventTrigger is an event-based trigger that monitors whether responses
from an API meet the conditions set by the user in the response_check callable.
It is designed for Airflow 3.0+ to be used in combination with the AssetWatcher system,
enabling event-driven DAGs based on API responses.
How It Works¶
Sends requests to an API.
Uses the callable at
response_check_pathto evaluate the API response.If the callable returns
True, aTriggerEventis emitted. This will trigger DAGs using thisAssetWatcherfor scheduling.
Note
This trigger requires Airflow >= 3.0 due to dependencies on AssetWatcher and event-driven scheduling infrastructure.
Usage Example with AssetWatcher¶
Here’s an example of using the HttpEventTrigger in an AssetWatcher to monitor the GitHub API for new Airflow releases.
import datetime
import os
from asgiref.sync import sync_to_async
from airflow.providers.http.triggers.http import HttpEventTrigger
from airflow.sdk import Asset, AssetWatcher, Variable, dag, task
# This token must be generated through GitHub and added as an environment variable
token = os.getenv("GITHUB_TOKEN")
headers = {
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {token}",
"X-GitHub-Api-Version": "2022-11-28",
}
async def check_github_api_response(response):
data = response.json()
release_id = str(data["id"])
get_variable_sync = sync_to_async(Variable.get)
previous_release_id = await get_variable_sync(key="release_id_var", default=None)
if release_id == previous_release_id:
return False
release_name = data["name"]
release_html_url = data["html_url"]
set_variable_sync = sync_to_async(Variable.set)
await set_variable_sync(key="release_id_var", value=str(release_id))
await set_variable_sync(key="release_name_var", value=release_name)
await set_variable_sync(key="release_html_url_var", value=release_html_url)
return True
trigger = HttpEventTrigger(
endpoint="repos/apache/airflow/releases/latest",
method="GET",
http_conn_id="http_default", # HTTP connection with https://api.github.com/ as the Host
headers=headers,
response_check_path="dags.check_airflow_releases.check_github_api_response", # Path to the check_github_api_response callable
)
asset = Asset(
"airflow_releases_asset", watchers=[AssetWatcher(name="airflow_releases_watcher", trigger=trigger)]
)
@dag(start_date=datetime.datetime(2024, 10, 1), schedule=asset, catchup=False)
def check_airflow_releases():
@task()
def print_airflow_release_info():
release_name = Variable.get("release_name_var")
release_html_url = Variable.get("release_html_url_var")
print(f"{release_name} has been released. Check it out at {release_html_url}")
print_airflow_release_info()
check_airflow_releases()
Parameters¶
http_conn_idhttp connection id that has the base API url i.e https://www.google.com/ and optional authentication credentials. Default headers can also be specified in the Extra field in json format.
auth_typeThe auth type for the service
methodthe API method to be called
endpointEndpoint to be called, i.e.
resource/v1/query?headersAdditional headers to be passed through as a dict
dataPayload to be uploaded or request parameters
extra_optionsAdditional kwargs to pass when creating a request.
response_check_pathPath to callable that evaluates whether the API response passes the conditions set by the user to trigger DAGs
Important Notes¶
A
response_check_pathvalue is required.The
response_check_pathmust contain the path to an asynchronous callable. Synchronous callables will raise an exception.This trigger does not automatically record the previous API response.
The previous response may have to be persisted manually though
Variable.set()in theresponse_check_pathcallable to prevent the trigger from emitting events repeatedly for the same API response.