LLMFileAnalysisOperator & @task.llm_file_analysis

Use LLMFileAnalysisOperator or the @task.llm_file_analysis decorator to analyze files from object storage or local storage with a single prompt.

The operator resolves file_path through ObjectStoragePath, reads supported formats in a read-only manner, injects file metadata and normalized content into the prompt, and optionally attaches images or PDFs as multimodal inputs.

Basic Usage

Analyze a text-like file or prefix with one prompt:

airflow/providers/common/ai/example_dags/example_llm_file_analysis.py[source]

@dag
def example_llm_file_analysis_basic():
    LLMFileAnalysisOperator(
        task_id="analyze_error_logs",
        prompt="Find error patterns and correlate them with deployment timestamps.",
        llm_conn_id="pydanticai_default",
        file_path="s3://logs/app/2024-01-15/",
        file_conn_id="aws_default",
    )


example_llm_file_analysis_basic()


Directory / Prefix Analysis

Use a directory or object-storage prefix when you want the operator to analyze multiple files in one request. max_files bounds how many resolved files are included in the request, while max_total_size_bytes and max_text_chars keep the combined content within safe size and prompt-length limits:

airflow/providers/common/ai/example_dags/example_llm_file_analysis.py[source]

@dag
def example_llm_file_analysis_prefix():
    LLMFileAnalysisOperator(
        task_id="summarize_partitioned_logs",
        prompt=(
            "Summarize recurring errors across these partitioned log files and call out "
            "which partition keys appear in the highest-severity findings."
        ),
        llm_conn_id="pydanticai_default",
        file_path="s3://logs/app/dt=2024-01-15/",
        file_conn_id="aws_default",
        max_files=10,
        max_total_size_bytes=10 * 1024 * 1024,
        max_text_chars=20_000,
    )


example_llm_file_analysis_prefix()


Note

Prefix resolution enumerates objects under the supplied path and stats each candidate to identify supported files before max_files is applied. For very large object-store prefixes, prefer a more specific path or a narrower prefix to avoid expensive listing and stat calls.
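The bounded enumeration described above can be sketched as follows. This is illustrative only: it uses a local pathlib walk in place of ObjectStoragePath, and the function name, ordering, and skip-oversized behavior are assumptions, not the provider's actual internals.

```python
# Illustrative sketch of bounded prefix resolution (assumed behavior).
from pathlib import Path
import tempfile

SUPPORTED = {".log", ".json", ".csv", ".parquet", ".avro"}

def resolve_files(prefix: Path, max_files: int, max_file_size_bytes: int) -> list[Path]:
    selected = []
    # Every object under the prefix is listed and stat'ed before the
    # max_files cap applies -- this is why broad prefixes are expensive.
    for candidate in sorted(prefix.rglob("*")):
        if not candidate.is_file() or candidate.suffix not in SUPPORTED:
            continue
        if candidate.stat().st_size > max_file_size_bytes:
            continue  # assumption: oversized files are skipped, not truncated
        selected.append(candidate)
    return selected[:max_files]

# Demo against a throwaway local directory standing in for an S3 prefix.
with tempfile.TemporaryDirectory() as d:
    root = Path(d)
    for name in ("a.log", "b.csv", "c.tmp", "d.json"):
        (root / name).write_text("x" * 10)
    files = resolve_files(root, max_files=2, max_file_size_bytes=1024)
    print([f.name for f in files])  # ['a.log', 'b.csv']
```

Note that the unsupported `.tmp` file is filtered out before the max_files cap is applied, so the cap always counts usable files.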

Multimodal Analysis

Set multi_modal=True for PNG/JPG/PDF inputs so they are sent as binary attachments to a vision-capable model:

airflow/providers/common/ai/example_dags/example_llm_file_analysis.py[source]

@dag
def example_llm_file_analysis_multimodal():
    LLMFileAnalysisOperator(
        task_id="validate_dashboards",
        prompt="Check charts for visual anomalies or stale data indicators.",
        llm_conn_id="pydanticai_default",
        file_path="s3://monitoring/dashboards/latest.png",
        file_conn_id="aws_default",
        multi_modal=True,
    )


example_llm_file_analysis_multimodal()


Structured Output

Set output_type to a Pydantic BaseModel when you want a typed response back from the LLM instead of a plain string:

airflow/providers/common/ai/example_dags/example_llm_file_analysis.py[source]

from pydantic import BaseModel


@dag
def example_llm_file_analysis_structured():

    class FileAnalysisSummary(BaseModel):
        """Structured output schema for the file-analysis examples."""

        findings: list[str]
        highest_severity: str
        truncated_inputs: bool

    LLMFileAnalysisOperator(
        task_id="analyze_parquet_quality",
        prompt=(
            "Return the top data-quality findings from this Parquet dataset. "
            "Include whether any inputs were truncated."
        ),
        llm_conn_id="pydanticai_default",
        file_path="s3://analytics/warehouse/customers/",
        file_conn_id="aws_default",
        output_type=FileAnalysisSummary,
        sample_rows=5,
        max_files=5,
    )


example_llm_file_analysis_structured()


TaskFlow Decorator

The @task.llm_file_analysis decorator wraps the operator. The function returns the prompt string; file settings are passed to the decorator:

airflow/providers/common/ai/example_dags/example_llm_file_analysis.py[source]

@dag
def example_llm_file_analysis_decorator():
    @task.llm_file_analysis(
        llm_conn_id="pydanticai_default",
        file_path="s3://analytics/reports/quarterly.pdf",
        file_conn_id="aws_default",
        multi_modal=True,
    )
    def review_quarterly_report():
        return "Extract the key revenue, risk, and compliance findings from this report."

    review_quarterly_report()


example_llm_file_analysis_decorator()


Parameters

  • prompt: The analysis request to send to the LLM (operator) or the return value of the decorated function (decorator).

  • llm_conn_id: Airflow connection ID for the LLM provider.

  • file_path: File or prefix to analyze.

  • file_conn_id: Optional connection ID for the storage backend. Overrides a connection embedded in file_path.

  • multi_modal: Allow PNG/JPG/PDF inputs as binary attachments. Default False.

  • max_files: Maximum number of files included from a prefix. Extra files are omitted and noted in the prompt. Default 20.

  • max_file_size_bytes: Maximum size of any single input file. Default 5 MiB.

  • max_total_size_bytes: Maximum cumulative size across all resolved files. Default 20 MiB.

  • max_text_chars: Maximum normalized text context sent to the LLM after sampling and truncation. Default 100000.

  • sample_rows: Maximum number of sampled rows or records included for CSV, Parquet, and Avro inputs. This controls structural preview depth, while max_file_size_bytes and max_total_size_bytes are byte-level read guards and max_text_chars is the final prompt-text budget. Default 10.

  • model_id: Model identifier (e.g. "openai:gpt-5"). Overrides the model configured in the connection’s extra field.

  • system_prompt: System-level instructions appended to the operator’s built-in read-only guidance.

  • output_type: Expected output type (default: str). Set to a Pydantic BaseModel for structured output.

  • agent_params: Additional keyword arguments passed to the pydantic-ai Agent constructor (e.g. retries, model_settings).
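The three budgets above compose in sequence: per-file and total byte guards apply while reading, row sampling applies to structured formats, and max_text_chars caps the final prompt text. A minimal sketch of that composition, assuming hypothetical helper names (the provider's real internals may differ):

```python
# Hedged sketch of how the documented limits compose (names are illustrative).
def build_context(files: list[tuple[str, bytes]],
                  max_total_size_bytes: int = 20 * 1024 * 1024,
                  max_text_chars: int = 100_000,
                  sample_rows: int = 10) -> str:
    parts, total = [], 0
    for name, payload in files:
        if total + len(payload) > max_total_size_bytes:
            # Assumption: omitted files are noted in the prompt, as with max_files.
            parts.append(f"[{name}: omitted, total size budget exhausted]")
            continue
        total += len(payload)
        text = payload.decode("utf-8", errors="replace")
        if name.endswith(".csv"):
            # Structured formats are sampled row-wise: header + sample_rows rows.
            text = "\n".join(text.splitlines()[: sample_rows + 1])
        parts.append(f"=== {name} ===\n{text}")
    # max_text_chars is the final prompt-text budget, applied last.
    context = "\n\n".join(parts)
    return context[:max_text_chars]

rows = "\n".join(["id,value"] + [f"{i},{i * i}" for i in range(100)])
ctx = build_context([("metrics.csv", rows.encode())], sample_rows=3)
print(ctx.splitlines()[:5])  # ['=== metrics.csv ===', 'id,value', '0,0', '1,1', '2,4']
```

The ordering matters: sampling first keeps structured previews intact, whereas the final character cap is a blunt truncation that can cut mid-record.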

Supported Formats

  • Text-like: .log, .json, .csv, .parquet, .avro

  • Multimodal: .png, .jpg, .jpeg, .pdf when multi_modal=True

  • Gzip-compressed text inputs are supported for .log.gz, .json.gz, and .csv.gz.

  • Gzip is not supported for .parquet, .avro, image, or PDF inputs.
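The routing rules above can be summarized in a small classifier. This is an assumption about behavior for illustration, not the provider's source; the category names are made up:

```python
# Illustrative routing of inputs by extension, mirroring the rules above.
from pathlib import PurePosixPath

TEXT_EXTS = {".log", ".json", ".csv"}
BINARY_TABLE_EXTS = {".parquet", ".avro"}
IMAGE_PDF_EXTS = {".png", ".jpg", ".jpeg", ".pdf"}

def classify(path: str, multi_modal: bool) -> str:
    suffixes = PurePosixPath(path).suffixes
    if suffixes and suffixes[-1] == ".gz":
        # Gzip is only valid on top of text-like formats (.log/.json/.csv).
        inner = suffixes[-2] if len(suffixes) > 1 else None
        if inner in TEXT_EXTS:
            return "text-gzip"
        raise ValueError(f"gzip not supported for {path}")
    ext = suffixes[-1] if suffixes else ""
    if ext in TEXT_EXTS or ext in BINARY_TABLE_EXTS:
        return "text"
    if ext in IMAGE_PDF_EXTS:
        return "attachment" if multi_modal else "unsupported"
    return "unsupported"

print(classify("s3://logs/app.log.gz", multi_modal=False))       # text-gzip
print(classify("s3://monitoring/latest.png", multi_modal=True))  # attachment
```

Note that image and PDF extensions are only usable when multi_modal=True; without it they fall through to unsupported rather than being read as text.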

Parquet and Avro readers require their corresponding optional extras:

pip install apache-airflow-providers-common-ai[parquet]
pip install apache-airflow-providers-common-ai[avro]
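Optional readers like these are typically guarded at import time so that a missing extra produces a clear error only when the format is actually used. A sketch of that pattern, assuming pyarrow backs the [parquet] extra (an assumption; the provider may use a different library):

```python
# Guarded optional-import pattern for an extras-backed reader (illustrative).
def load_parquet_reader():
    try:
        import pyarrow.parquet as pq  # assumption: installed by the [parquet] extra
    except ImportError:
        return None  # caller reports the missing extra in its error message
    return pq.read_table

reader = load_parquet_reader()
print("parquet support:", "available" if reader else "install the [parquet] extra")
```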
