
Apache Kafka Message Queue

Apache Kafka Queue Provider

Implemented by KafkaMessageQueueProvider

The Apache Kafka Queue Provider is a BaseMessageQueueProvider that uses Apache Kafka as the underlying message queue system. It lets your Airflow workflows receive messages from Kafka topics through MessageQueueTrigger, the common message queue interface.

  • It uses kafka as the scheme for identifying Kafka queues.

  • For parameter definitions, see AwaitMessageTrigger.

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher

trigger = MessageQueueTrigger(
    scheme="kafka",
    # Additional Kafka AwaitMessageTrigger parameters as needed
    topics=["my_topic"],
    apply_function="module.apply_function",
    bootstrap_servers="localhost:9092",
)

asset = Asset("kafka_queue_asset", watchers=[AssetWatcher(name="kafka_watcher", trigger=trigger)])

For a complete example, see: tests.system.common.messaging.kafka_message_queue_trigger
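
Note that apply_function is passed as a dotted-path string rather than as a callable, so the function has to be importable in the process where the trigger runs. As a rough illustration of what resolving such a path involves, the sketch below uses only the standard library. resolve_dotted_path is a hypothetical helper written for this example; it is not the provider's own code, which may resolve the path differently.

```python
from importlib import import_module


def resolve_dotted_path(path):
    """Hypothetical helper: turn 'package.module.attr' into the named object."""
    module_path, _, attr_name = path.rpartition(".")
    if not module_path:
        raise ImportError(f"{path!r} does not look like a dotted module path")
    # Import the module part, then look up the final name on it.
    module = import_module(module_path)
    return getattr(module, attr_name)


# Resolve a function from the standard library and call it.
loads = resolve_dotted_path("json.loads")
decoded = loads('{"topic": "my_topic"}')
```

If the module in the path cannot be imported, a lookup like this fails before any message is processed, so it is worth checking the path from a Python shell in the same environment.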

Apache Kafka Message Queue Trigger

Implemented by KafkaMessageQueueTrigger

Inherited from MessageQueueTrigger

Wait for a message in a queue

Below is an example of how you can configure an Airflow Dag to be triggered by a message in Apache Kafka.

tests/system/apache/kafka/example_dag_kafka_message_queue_trigger.py

import json

from airflow.providers.apache.kafka.triggers.msg_queue import KafkaMessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher


def apply_function(message):
    val = json.loads(message.value())
    print(f"Value in message is {val}")
    return True


# Define a trigger that listens to an Apache Kafka message queue
trigger = KafkaMessageQueueTrigger(
    topics=["test"],
    apply_function="example_dag_kafka_message_queue_trigger.apply_function",
    kafka_config_id="kafka_default",
    apply_function_args=None,
    apply_function_kwargs=None,
    poll_timeout=1,
    poll_interval=5,
)

# Define an asset that watches for messages on the queue
asset = Asset("kafka_queue_asset_1", watchers=[AssetWatcher(name="kafka_watcher_1", trigger=trigger)])

with DAG(dag_id="example_kafka_watcher_1", schedule=[asset]) as dag:
    EmptyOperator(task_id="task")
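
The apply_function in this example returns True for every message that parses as JSON. A more selective variant might look like the sketch below. It assumes that a truthy return value is what causes the trigger to fire and that a falsy one causes the message to be skipped; check the AwaitMessageTrigger documentation to confirm this for your provider version. FakeMessage and the order_id key are hypothetical and exist only so the example runs without a Kafka broker.

```python
import json


class FakeMessage:
    """Hypothetical stand-in for a Kafka message; only value() is modeled."""

    def __init__(self, payload):
        self._payload = payload

    def value(self):
        return self._payload


def apply_function(message, required_key="order_id"):
    """Return the decoded payload if it is a JSON object with required_key, else None."""
    try:
        val = json.loads(message.value())
    except (TypeError, ValueError):
        # Covers a missing payload, invalid UTF-8, and malformed JSON.
        return None
    if isinstance(val, dict) and required_key in val:
        return val
    return None


matched = apply_function(FakeMessage(b'{"order_id": 7}'))
skipped = apply_function(FakeMessage(b"not json"))
```

Returning None for malformed payloads keeps one bad message from raising inside the function; whether that is the right policy depends on how you want failures to surface.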

How it works

  1. Kafka Message Queue Trigger: The KafkaMessageQueueTrigger listens for messages from one or more Apache Kafka topics.

  2. Asset and Watcher: The Asset represents the external entity, in this example the Kafka queue. The AssetWatcher associates a trigger with a name, which helps you identify which trigger is associated with which asset.

  3. Event-Driven Dag: Instead of running on a fixed schedule, the Dag executes when the asset receives an update (e.g., a new message in the queue).
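
To make the roles of poll_timeout and poll_interval more concrete, the listening step can be pictured as a polling loop. The sketch below is a simplified model under stated assumptions, not the provider's implementation: FakeConsumer, wait_for_match, and max_polls are hypothetical names, and the real trigger also handles message errors and offset commits.

```python
import asyncio
from collections import deque


class FakeConsumer:
    """Hypothetical in-memory consumer; poll() returns the next item or None."""

    def __init__(self, messages):
        self._messages = deque(messages)

    def poll(self, timeout):
        # A real consumer would block for up to `timeout` seconds; this one does not.
        return self._messages.popleft() if self._messages else None


async def wait_for_match(consumer, apply_function, poll_timeout=1, poll_interval=0.0, max_polls=10):
    """Poll until apply_function returns a truthy value, then return that value."""
    for _ in range(max_polls):
        message = consumer.poll(poll_timeout)
        if message is None:
            continue  # nothing arrived within the timeout; poll again
        result = apply_function(message)
        if result:
            return result  # in Airflow, this is the point where an event would fire
        await asyncio.sleep(poll_interval)  # pause before looking at the next message
    return None


def first_even(values):
    """Run the loop over integer 'messages' and return the first even one."""
    consumer = FakeConsumer(values)
    return asyncio.run(wait_for_match(consumer, lambda m: m if m % 2 == 0 else None))
```

In this model, first_even([3, 5, 8]) skips the two odd values and returns 8, while an input with no match returns None once max_polls is exhausted. The real trigger has no such cap and keeps listening.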

For details on how to use the trigger, refer to the Messaging Trigger documentation.
