Apache Kafka Message Queue¶
Apache Kafka Queue Provider¶
Implemented by KafkaMessageQueueProvider
The Apache Kafka Queue Provider is a message queue provider that uses Apache Kafka as the underlying message queue system. It allows you to send and receive messages using Kafka topics in your Airflow workflows, and it supports consuming and processing messages from one or more topics on Kafka brokers.
The queue URI must match this regular expression:
QUEUE_REGEXP = r"^kafka://"
Queue URI Format:
kafka://<broker>/<topic_list>
Where:
broker
: Kafka brokers (hostname:port)
topic_list
: Comma-separated list of Kafka topics to consume messages from
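As an illustration, a URI can be checked against the pattern before it is handed to Airflow. The sketch below reuses the QUEUE_REGEXP value quoted above; the helper name is_kafka_queue and the sample URIs are hypothetical and not part of the provider.

```python
import re

# Pattern quoted from the documentation above.
QUEUE_REGEXP = r"^kafka://"


def is_kafka_queue(queue: str) -> bool:
    """Return True if the URI starts with the kafka:// scheme (hypothetical helper)."""
    return re.match(QUEUE_REGEXP, queue) is not None


if __name__ == "__main__":
    # Sample URIs are made up for illustration.
    print(is_kafka_queue("kafka://localhost:9092/orders,payments"))
    print(is_kafka_queue("sqs://example/queue"))
```

Note that the pattern only checks the scheme prefix, so a URI that passes this check can still be missing a broker or a topic list.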
The queue parameter is used to configure the underlying AwaitMessageTrigger class. Any additional kwargs, if provided, are passed directly to the trigger constructor. The apply_function kwarg is required.
Topics can also be specified via the queue URI instead of the topics kwarg. The provider extracts topics from the URI as follows:
from urllib.parse import urlparse

# Parse the queue URI
parsed = urlparse(queue)
# Extract topics (after host list)
# parsed.path starts with a '/', so strip it
raw_topics = parsed.path.lstrip("/")
topics = raw_topics.split(",") if raw_topics else []
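The same parsing steps can be wrapped in a small, runnable sketch to see what they yield for a concrete URI. The function names split_kafka_queue and resolve_topics are hypothetical, and the rule that an explicit topics kwarg takes precedence over the URI is an assumption made for illustration, not a statement about the provider's actual behavior.

```python
from urllib.parse import urlparse


def split_kafka_queue(queue: str) -> tuple[str, list[str]]:
    """Split a kafka:// URI into its broker string and topic list."""
    parsed = urlparse(queue)
    # netloc holds everything between "kafka://" and the first "/"
    brokers = parsed.netloc
    # parsed.path starts with a '/', so strip it
    raw_topics = parsed.path.lstrip("/")
    topics = raw_topics.split(",") if raw_topics else []
    return brokers, topics


def resolve_topics(queue: str, **kwargs) -> list[str]:
    """Use an explicit ``topics`` kwarg if given, else fall back to the URI.

    Assumption: the precedence shown here is illustrative only.
    """
    if "topics" in kwargs:
        return list(kwargs["topics"])
    _, topics = split_kafka_queue(queue)
    return topics


if __name__ == "__main__":
    print(split_kafka_queue("kafka://localhost:9092/orders,payments"))
    print(resolve_topics("kafka://localhost:9092", topics=["audit"]))
```

A URI without a path, such as kafka://localhost:9092, produces an empty topic list, so the topics would then have to come from the topics kwarg.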
Apache Kafka Message Queue Trigger¶
Implemented by KafkaMessageQueueTrigger
Inherited from MessageQueueTrigger
Wait for a message in a queue¶
Below is an example of how you can configure an Airflow DAG to be triggered by a message in Apache Kafka.
tests/system/apache/kafka/example_dag_kafka_message_queue_trigger.py
import json

from airflow.providers.apache.kafka.triggers.msg_queue import KafkaMessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher


def apply_function(message):
    # Decode the JSON payload of the Kafka message
    val = json.loads(message.value())
    print(f"Value in message is {val}")
    return True


# Define a trigger that listens to an Apache Kafka message queue
trigger = KafkaMessageQueueTrigger(
    topics=["test"],
    apply_function="example_dag_kafka_message_queue_trigger.apply_function",
    kafka_config_id="kafka_default",
    apply_function_args=None,
    apply_function_kwargs=None,
    poll_timeout=1,
    poll_interval=5,
)

# Define an asset that watches for messages on the queue
asset = Asset("kafka_queue_asset_1", watchers=[AssetWatcher(name="kafka_watcher_1", trigger=trigger)])

with DAG(dag_id="example_kafka_watcher_1", schedule=[asset]) as dag:
    EmptyOperator(task_id="task")
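The apply_function in the example above accepts every message. A more selective variant might look like the sketch below, which can be exercised locally without a broker. The FakeMessage class is a hypothetical stand-in that models only the value() method, the "type" field is an invented payload convention, and the sketch assumes that a falsy return value means the message does not trigger an event.

```python
import json


class FakeMessage:
    """Hypothetical stand-in for a Kafka message; only value() is modelled."""

    def __init__(self, payload: bytes):
        self._payload = payload

    def value(self) -> bytes:
        return self._payload


def apply_function(message):
    """Return the decoded payload for 'order' events, otherwise None."""
    val = json.loads(message.value())
    # "type" is an assumed field name used only for this illustration.
    if val.get("type") == "order":
        return val
    return None


if __name__ == "__main__":
    print(apply_function(FakeMessage(b'{"type": "order", "id": 7}')))
    print(apply_function(FakeMessage(b'{"type": "heartbeat"}')))
```

Keeping the function free of Airflow imports makes it easy to unit test before referencing it by its dotted path in the trigger.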
How it works¶
1. Kafka Message Queue Trigger: The KafkaMessageQueueTrigger listens for messages from Apache Kafka topic(s).
2. Asset and Watcher: The Asset abstracts the external entity, the Kafka queue in this example. The AssetWatcher associates a trigger with a name. This name helps you identify which trigger is associated with which asset.
3. Event-Driven DAG: Instead of running on a fixed schedule, the DAG executes when the asset receives an update (e.g., a new message in the queue).
For details on how to use the trigger, refer to the documentation of the Messaging Trigger.