
Redis Message Queue

Redis Queue Provider

Implemented by RedisPubSubMessageQueueProvider

The Redis Queue Provider is a message queue provider that uses Redis as the underlying message queue system, allowing you to send and receive messages through Redis Pub/Sub channels in your Airflow workflows.

The queue URI must match the following format:

redis+pubsub://<host>:<port>/<channel_list>

Where:

  • host: Redis server hostname

  • port: Redis server port

  • channel_list: Comma-separated list of Redis channels to subscribe to
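
As an illustration, a concrete URI decomposes into these components via the standard library's urlparse (a stdlib-only sketch; the host, port, and channel values below are hypothetical):

```python
from urllib.parse import urlparse

# A hypothetical queue URI naming two channels
queue = "redis+pubsub://redis-host:6379/orders,alerts"

parsed = urlparse(queue)
print(parsed.hostname)  # host component: "redis-host"
print(parsed.port)      # port component: 6379
print(parsed.path)      # channel list, with a leading "/": "/orders,alerts"
```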

The queue parameter configures the underlying AwaitMessageTrigger class; any additional kwargs, if provided, are passed directly to the trigger constructor.

Channels can also be specified via the Queue URI instead of the channels kwarg. The provider will extract channels from the URI as follows:

airflow/providers/redis/queues/redis.py[source]

        # Parse the queue URI
        parsed = urlparse(queue)
        # Extract channels (after host and port)
        # parsed.path starts with a '/', so strip it
        raw_channels = parsed.path.lstrip("/")
        channels = raw_channels.split(",") if raw_channels else []
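
The extraction logic above can be exercised standalone. Here extract_channels is an illustrative wrapper around the same parsing steps, not part of the provider API:

```python
from urllib.parse import urlparse


def extract_channels(queue: str) -> list[str]:
    # Mirrors the provider snippet: channels are the comma-separated
    # path segment that follows the host and port
    parsed = urlparse(queue)
    raw_channels = parsed.path.lstrip("/")
    return raw_channels.split(",") if raw_channels else []


print(extract_channels("redis+pubsub://localhost:6379/ch1,ch2,ch3"))
# ['ch1', 'ch2', 'ch3']
print(extract_channels("redis+pubsub://localhost:6379/"))
# []
```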

Below is an example of how you can configure an Airflow DAG to be triggered by a message in Redis.

tests/system/redis/example_dag_message_queue_trigger.py[source]

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher

# Define a trigger that listens to an external message queue (Redis in this case)
trigger = MessageQueueTrigger(queue="redis+pubsub://localhost:6379/test")

# Define an asset that watches for messages on the queue
asset = Asset("redis_queue_asset_1", watchers=[AssetWatcher(name="redis_watcher_1", trigger=trigger)])

with DAG(dag_id="example_redis_watcher_1", schedule=[asset]) as dag:
    EmptyOperator(task_id="task_1")
