airflow.providers.apache.kafka.triggers.msg_queue
Classes
KafkaMessageQueueTrigger: A dedicated trigger for Kafka message queues that extends MessageQueueTrigger.
Module Contents
- class airflow.providers.apache.kafka.triggers.msg_queue.KafkaMessageQueueTrigger(*, topics, kafka_config_id='kafka_default', apply_function, apply_function_args=None, apply_function_kwargs=None, poll_timeout=1, poll_interval=5, **kwargs)
Bases: airflow.providers.common.messaging.triggers.msg_queue.MessageQueueTrigger
A dedicated trigger for Kafka message queues. It extends the common MessageQueueTrigger and is designed to work with the KafkaMessageQueueProvider, providing a Kafka-specific interface for message queue operations while leveraging the unified messaging framework.
- Parameters:
topics (collections.abc.Sequence[str]) – The topics (or topic regexes) that should be searched for messages
kafka_config_id (str) – The Kafka connection configuration ID, from which the queue URI (format kafka://<broker>/<topic_list>) is derived, defaults to kafka_default
apply_function (str) – The location of the function to apply to each message to determine whether it matches the criteria, given as a string in Python dot notation
apply_function_args (list[Any] | None) – Positional arguments to pass to the callable, defaults to None
apply_function_kwargs (dict[Any, Any] | None) – Keyword arguments to pass to the callable, defaults to None
poll_timeout (float) – How long the Kafka client should wait before returning from a poll request to Kafka (seconds), defaults to 1
poll_interval (float) – How long the trigger should sleep after reaching the end of the Kafka log (seconds), defaults to 5
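The apply_function parameter can be illustrated with a minimal sketch of a matching function. It assumes the trigger passes each Kafka message as the first positional argument, followed by apply_function_args and apply_function_kwargs, and treats a truthy return value as a match; that calling convention is an assumption here, not something this page states. FakeMessage, order_is_large and threshold are hypothetical names used only for illustration.

```python
import json


class FakeMessage:
    """Hypothetical stand-in for a Kafka message.

    It only mimics the value() accessor that confluent_kafka.Message exposes,
    so the function below can be exercised without a broker.
    """

    def __init__(self, payload: bytes) -> None:
        self._payload = payload

    def value(self) -> bytes:
        return self._payload


def order_is_large(message, threshold=100):
    """Return the decoded order if its amount exceeds threshold, else None.

    Assumption: a truthy return value marks the message as a match.
    """
    order = json.loads(message.value().decode("utf-8"))
    if order.get("amount", 0) > threshold:
        return order
    return None


# Exercise the function locally with fake messages.
match = order_is_large(FakeMessage(b'{"amount": 150}'))
no_match = order_is_large(FakeMessage(b'{"amount": 50}'))
```

Under these assumptions, such a function would be referenced by its import path, for example apply_function="my_package.filters.order_is_large" (a hypothetical module path), with apply_function_kwargs={"threshold": 500} to override the default.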
- classmethod get_kafka_queue_uri(kafka_config_id, topics)
Generate a Kafka queue URI string from a Kafka configuration ID and a list of topics.
- Parameters:
kafka_config_id (str) – The Kafka connection configuration ID.
topics (collections.abc.Sequence[str]) – A sequence of topic names to include in the URI.
- Returns:
A formatted Kafka URI string in the format “kafka://brokers/topics”.
- Return type:
str
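The URI format described for get_kafka_queue_uri can be sketched as plain string formatting. This is not the provider's implementation: the real classmethod takes a connection configuration ID, whereas the hypothetical helper build_kafka_queue_uri below takes the broker string directly. Joining the topics with commas is also an assumption.

```python
from collections.abc import Sequence


def build_kafka_queue_uri(brokers: str, topics: Sequence[str]) -> str:
    """Format a URI of the shape kafka://brokers/topics.

    Hypothetical helper; assumes topics are comma-separated in the path.
    """
    topic_list = ",".join(topics)
    return f"kafka://{brokers}/{topic_list}"


uri = build_kafka_queue_uri("localhost:9092", ["orders", "payments"])
```

Here uri evaluates to kafka://localhost:9092/orders,payments, with localhost:9092 standing in for whatever brokers the connection would supply.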