Source code for airflow.providers.standard.triggers.file

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import asyncio
import datetime
import logging
import os
from collections.abc import AsyncIterator, Hashable
from glob import glob
from typing import Any

import anyio

from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
    from airflow.triggers.base import BaseEventTrigger, BaseTrigger, TriggerEvent
else:
    from airflow.triggers.base import (  # type: ignore
        BaseTrigger,
        BaseTrigger as BaseEventTrigger,
        TriggerEvent,
    )

[docs] log = logging.getLogger(__name__)
[docs] class FileTrigger(BaseTrigger): """ A trigger that fires exactly once after it finds the requested file or folder. :param filepath: File or folder name (relative to the base path set within the connection), can be a glob. :param recursive: when set to ``True``, enables recursive directory matching behavior of ``**`` in glob filepath parameter. Defaults to ``False``. :param poke_interval: Time that the job should wait in between each try """ def __init__( self, filepath: str, recursive: bool = False, poke_interval: float = 5.0, **kwargs, ): super().__init__()
[docs] self.filepath = filepath
[docs] self.recursive = recursive
[docs] self.poke_interval = poke_interval
[docs] def serialize(self) -> tuple[str, dict[str, Any]]: """Serialize FileTrigger arguments and classpath.""" return ( "airflow.providers.standard.triggers.file.FileTrigger", { "filepath": self.filepath, "recursive": self.recursive, "poke_interval": self.poke_interval, }, )
[docs] async def run(self) -> AsyncIterator[TriggerEvent]: """Loop until the relevant files are found.""" while True: for path in glob(self.filepath, recursive=self.recursive): if await anyio.Path(path).is_file(): mod_time_f = (await anyio.Path(path).stat()).st_mtime mod_time = datetime.datetime.fromtimestamp(mod_time_f).strftime("%Y%m%d%H%M%S") self.log.info("Found File %s last modified: %s", path, mod_time) yield TriggerEvent(True) return for _, _, files in await anyio.to_thread.run_sync(lambda: list(os.walk(path))): if files: yield TriggerEvent(True) return await asyncio.sleep(self.poke_interval)
[docs] class FileDeleteTrigger(BaseEventTrigger): """ A trigger that fires exactly once after it finds the requested file and then delete the file. The difference between ``FileTrigger`` and ``FileDeleteTrigger`` is ``FileDeleteTrigger`` can only find a specific file. :param filepath: File (relative to the base path set within the connection). :param poke_interval: Time that the job should wait in between each try """ def __init__( self, filepath: str, poke_interval: float = 5.0, **kwargs, ): super().__init__()
[docs] self.filepath = filepath
[docs] self.poke_interval = poke_interval
[docs] def serialize(self) -> tuple[str, dict[str, Any]]: """Serialize FileDeleteTrigger arguments and classpath.""" return ( "airflow.providers.standard.triggers.file.FileDeleteTrigger", { "filepath": self.filepath, "poke_interval": self.poke_interval, }, )
[docs] async def run(self) -> AsyncIterator[TriggerEvent]: """Loop until the relevant file is found.""" while True: filepath = anyio.Path(self.filepath) if await filepath.is_file(): mod_time_f = (await filepath.stat()).st_mtime mod_time = datetime.datetime.fromtimestamp(mod_time_f).strftime("%Y%m%d%H%M%S") self.log.info("Found file %s last modified: %s", self.filepath, mod_time) await filepath.unlink() self.log.info("File %s has been deleted", self.filepath) yield TriggerEvent(True) return await asyncio.sleep(self.poke_interval)
[docs] class DirectoryFileDeleteTrigger(BaseEventTrigger): """ Fire once when ``filename`` appears in ``directory``, then delete it. Functionally equivalent to ``FileDeleteTrigger`` for a single file, but sibling triggers that point at the same ``directory`` and ``poke_interval`` share a single underlying directory scan in the triggerer; each instance only fires for its own ``filename``. This is useful when many assets are driven by per-flag-file events landing in a shared inbox directory. :param directory: Directory to scan. :param filename: File name (without directory) whose appearance fires this trigger. The matched file is deleted before the event is yielded. :param poke_interval: Time to wait between scans. """ def __init__(self, *, directory: str, filename: str, poke_interval: float = 5.0) -> None: super().__init__()
[docs] self.directory = directory
[docs] self.filename = filename
[docs] self.poke_interval = poke_interval
[docs] def serialize(self) -> tuple[str, dict[str, Any]]: """Serialize DirectoryFileDeleteTrigger arguments and classpath.""" return ( "airflow.providers.standard.triggers.file.DirectoryFileDeleteTrigger", { "directory": self.directory, "filename": self.filename, "poke_interval": self.poke_interval, }, )
[docs] def shared_stream_key(self) -> Hashable | None: """All triggers on the same directory + cadence share one scan.""" # Use realpath so trivial path variants all resolve to the same canonical # path: trailing slashes (``/tmp/flags`` vs ``/tmp/flags/``), relative vs # absolute paths (``./flags`` vs ``/tmp/flags``), and symlinks vs their # targets all key to the same group instead of running N independent scans. return ("directory-scan", os.path.realpath(self.directory), self.poke_interval)
@classmethod
[docs] async def open_shared_stream(cls, kwargs: dict[str, Any]) -> AsyncIterator[Any]: """ Drive one directory-listing loop and broadcast each snapshot. Missing directories yield an empty snapshot so subscribers keep polling for the file to appear. Configuration-class failures (``PermissionError``, ``NotADirectoryError``, ``IsADirectoryError``) propagate — these are almost always permanent (wrong mount, wrong mode, path points at a file), so silently retrying just hides the misconfiguration from the operator; surfacing them as a ``_PollFailure`` makes the trigger visibly fail in the UI, where it can be diagnosed and restarted after the operator corrects the config. Other ``OSError`` subclasses (transient I/O blips, NFS hiccups, etc.) are logged at warning and the snapshot is skipped for this cadence, since those may self-heal. """ directory = anyio.Path(kwargs["directory"]) poke_interval: float = kwargs["poke_interval"] while True: try: names = {p.name async for p in directory.iterdir()} except FileNotFoundError: names = set() except (PermissionError, NotADirectoryError, IsADirectoryError): raise except OSError: log.warning( "Failed to list %s; retrying after %ss", directory, poke_interval, exc_info=True, ) await asyncio.sleep(poke_interval) continue yield {"directory": str(directory), "names": names} await asyncio.sleep(poke_interval)
[docs] async def filter_shared_stream(self, shared_stream: AsyncIterator[Any]) -> AsyncIterator[TriggerEvent]: """Fire once for this instance's own filename and delete the file.""" async for snapshot in shared_stream: if self.filename not in snapshot["names"]: continue filepath = anyio.Path(snapshot["directory"]) / self.filename try: await filepath.unlink() except FileNotFoundError: # Lost a race with a sibling, or the file disappeared between # snapshot and unlink. Wait for the next scan. continue self.log.info("File %s has been deleted", filepath) yield TriggerEvent({"filepath": str(filepath)}) return
[docs] async def run(self) -> AsyncIterator[TriggerEvent]: """ Standalone fallback when the shared-stream manager is unavailable. Mirrors the shared path so the trigger remains usable in unit tests and on Airflow versions without the manager wired in. It does not deduplicate I/O — that requires the triggerer to drive the shared stream. """ kwargs = self.serialize()[1] async for event in self.filter_shared_stream(type(self).open_shared_stream(kwargs)): yield event

Was this entry helpful?