Airflow Summit 2025 is coming October 07-09. Register now for early bird ticket!

airflow.providers.apache.kafka.triggers.msg_queue

Classes

KafkaMessageQueueTrigger

A dedicated trigger for Kafka message queues that extends MessageQueueTrigger.

Module Contents

class airflow.providers.apache.kafka.triggers.msg_queue.KafkaMessageQueueTrigger(*, topics, kafka_config_id='kafka_default', apply_function, apply_function_args=None, apply_function_kwargs=None, poll_timeout=1, poll_interval=5, **kwargs)[source]

Bases: airflow.providers.common.messaging.triggers.msg_queue.MessageQueueTrigger

A dedicated trigger for Kafka message queues that extends MessageQueueTrigger.

This trigger extends the common MessageQueueTrigger and is designed to work with the KafkaMessageQueueProvider. It provides a more specific interface for Kafka message queue operations while leveraging the unified messaging framework.

Parameters:
  • topics (collections.abc.Sequence[str]) – The topic (or topic regex) that should be searched for messages

  • kafka_config_id (str) – The Kafka queue identifier in the format kafka://<broker>/<topic_list>

  • apply_function (str) – the location of the function to apply to messages for determination of matching criteria. (In python dot notation as a string)

  • apply_function_args (list[Any] | None) – A set of arguments to apply to the callable, defaults to None

  • apply_function_kwargs (dict[Any, Any] | None) – A set of key word arguments to apply to the callable, defaults to None, defaults to None

  • poll_timeout (float) – How long the Kafka client should wait before returning from a poll request to Kafka (seconds), defaults to 1

  • poll_interval (float) – How long the trigger should sleep after reaching the end of the Kafka log (seconds), defaults to 5

classmethod get_kafka_queue_uri(kafka_config_id, topics)[source]

Generate a Kafka queue URI string from a Kafka configuration ID and a list of topics.

Parameters:
  • kafka_config_id (str) – The Kafka connection configuration ID.

  • topics (collections.abc.Sequence[str]) – A sequence of topic names to include in the URI.

Returns:

A formatted Kafka URI string in the format “kafka://brokers/topics”.

Return type:

str

Was this entry helpful?