Connect to MSSQL using SQLExecuteQueryOperator

This guide shows how to define tasks that interact with an MSSQL database using SQLExecuteQueryOperator.

Use the SQLExecuteQueryOperator to execute SQL commands in an MSSQL database.

Note

Previously, MsSqlOperator was used to perform this kind of operation. Please use SQLExecuteQueryOperator instead.

Common Database Operations with SQLExecuteQueryOperator

To execute SQL queries against an MSSQL database with SQLExecuteQueryOperator, two parameters are required: sql and conn_id. Both are passed to the MSSQL hook object, which interacts directly with the database.

Creating an MSSQL database table

The code snippets below are based on Airflow-2.2

An example usage of the SQLExecuteQueryOperator to connect to MSSQL is as follows:

tests/system/microsoft/mssql/example_mssql.py


    # Example of creating a task to create a table in MsSql

    create_table_mssql_task = SQLExecuteQueryOperator(
        task_id="create_country_table",
        conn_id="airflow_mssql",
        sql=r"""
        CREATE TABLE Country (
            country_id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
            name TEXT,
            continent TEXT
        );
        """,
        dag=dag,
    )

You can also execute SQL commands from an external file. The script folder must be at the same level as the DAG.py file. This lets you maintain the SQL queries separately from the DAG code.

tests/system/microsoft/mssql/example_mssql.py

    # Example of creating a task that calls an sql command from an external file.
    create_table_mssql_from_external_file = SQLExecuteQueryOperator(
        task_id="create_table_from_external_file",
        conn_id="airflow_mssql",
        sql="create_table.sql",
        dag=dag,
    )

Your dags/create_table.sql should contain the CREATE TABLE statement for the Users table that later tasks in this guide populate and query.
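The contents of create_table.sql are not reproduced in this guide. As a minimal sketch, assuming the Users table has an identity key plus the username and description columns that the INSERT statements in this guide rely on, the file could be generated as follows. The user_id column and the TEXT types are assumptions modeled on the Country table, and the helper name write_create_table_sql is hypothetical.

```python
import tempfile
from pathlib import Path

# Assumed schema: only username and description are confirmed by the
# INSERT statements in this guide. The user_id column and the TEXT types
# are guesses modeled on the Country table.
CREATE_USERS_SQL = """\
-- create Users table
CREATE TABLE Users (
    user_id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
    username TEXT,
    description TEXT
);
"""


def write_create_table_sql(dags_folder: Path) -> Path:
    """Write create_table.sql into the given folder and return its path."""
    target = dags_folder / "create_table.sql"
    target.write_text(CREATE_USERS_SQL, encoding="utf-8")
    return target


if __name__ == "__main__":
    # A temporary directory stands in for the real dags/ folder.
    with tempfile.TemporaryDirectory() as tmp:
        path = write_create_table_sql(Path(tmp))
        print(path.read_text(encoding="utf-8"))
```

In practice you would usually create the file by hand; the point of the sketch is only to show a plausible shape for its contents.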

Inserting data into an MSSQL database table

We can then create a SQLExecuteQueryOperator task that populates the Users table.

tests/system/microsoft/mssql/example_mssql.py

    populate_user_table = SQLExecuteQueryOperator(
        task_id="populate_user_table",
        conn_id="airflow_mssql",
        sql=r"""
                INSERT INTO Users (username, description)
                VALUES ( 'Danny', 'Musician');
                INSERT INTO Users (username, description)
                VALUES ( 'Simone', 'Chef');
                INSERT INTO Users (username, description)
                VALUES ( 'Lily', 'Florist');
                INSERT INTO Users (username, description)
                VALUES ( 'Tim', 'Pet shop owner');
                """,
    )

Fetching records from your MSSQL database table

Fetching records from your MSSQL database table can be as simple as:

tests/system/microsoft/mssql/example_mssql.py

    get_all_countries = SQLExecuteQueryOperator(
        task_id="get_all_countries",
        conn_id="airflow_mssql",
        sql=r"""SELECT * FROM Country;""",
    )
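Downstream code usually needs to do something with the fetched rows. The sketch below assumes the rows arrive as (country_id, name, continent) tuples, matching the column order of the Country table. The function name group_by_continent and the sample rows are illustrative only, and how the rows reach your code (for example through XCom) depends on your Airflow setup.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Assumed row shape: (country_id, name, continent).
Row = Tuple[int, str, str]


def group_by_continent(rows: Iterable[Row]) -> Dict[str, List[str]]:
    """Group country names by continent, preserving input order."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for _country_id, name, continent in rows:
        grouped[continent].append(name)
    return dict(grouped)


# Sample data only; real rows would come from the SELECT query.
sample_rows: List[Row] = [
    (1, "India", "Asia"),
    (2, "Germany", "Europe"),
    (3, "Japan", "Asia"),
]

if __name__ == "__main__":
    print(group_by_continent(sample_rows))
```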

Passing Parameters into SQLExecuteQueryOperator

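SQLExecuteQueryOperator accepts a params argument whose values are rendered into the SQL text through Jinja templating at runtime. It also accepts a parameters argument, which passes values to the database driver for binding instead of rendering them into the SQL text.

To find the countries on the Asian continent using params: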

tests/system/microsoft/mssql/example_mssql.py

    get_countries_from_continent = SQLExecuteQueryOperator(
        task_id="get_countries_from_continent",
        conn_id="airflow_mssql",
        sql=r"""SELECT * FROM Country where {{ params.column }}='{{ params.value }}';""",
        params={"column": "CONVERT(VARCHAR, continent)", "value": "Asia"},
    )
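Because params values are rendered into the SQL text, the database driver does not escape them. When a value comes from outside the DAG, binding it as a query parameter is generally the safer choice. The sketch below illustrates the difference using Python's built-in sqlite3 module as a stand-in for MSSQL, so that it runs without a server. The placeholder syntax (? here) differs between drivers, the f-string only mimics template rendering, and the rows are sample data taken from this guide.

```python
import sqlite3

# In-memory SQLite database standing in for the MSSQL Country table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Country (country_id INTEGER PRIMARY KEY, name TEXT, continent TEXT)"
)
conn.executemany(
    "INSERT INTO Country (name, continent) VALUES (?, ?)",
    [
        ("India", "Asia"),
        ("Germany", "Europe"),
        ("Argentina", "South America"),
        ("Ghana", "Africa"),
        ("Japan", "Asia"),
        ("Namibia", "Africa"),
    ],
)


def countries_templated(value: str) -> list:
    """Mimic template rendering: the value becomes part of the SQL text."""
    sql = f"SELECT name FROM Country WHERE continent = '{value}' ORDER BY name"
    return [row[0] for row in conn.execute(sql)]


def countries_bound(value: str) -> list:
    """Bind the value as a parameter: the driver treats it purely as data."""
    sql = "SELECT name FROM Country WHERE continent = ? ORDER BY name"
    return [row[0] for row in conn.execute(sql, (value,))]


if __name__ == "__main__":
    print(countries_templated("Asia"))
    print(countries_bound("Asia"))

    # A crafted value changes the meaning of the templated query,
    # while the bound query simply finds no matching continent.
    crafted = "Asia' OR '1'='1"
    print(len(countries_templated(crafted)))
    print(countries_bound(crafted))
```

With SQLExecuteQueryOperator, bound values are supplied through the parameters argument; the placeholder style to use in the SQL depends on the driver behind the connection.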

The complete SQLExecuteQueryOperator DAG to connect to MSSQL

When we put everything together, our DAG should look like this:

tests/system/microsoft/mssql/example_mssql.py

import os
from datetime import datetime

import pytest

from airflow import DAG

try:
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
except ImportError:
    pytest.skip("MSSQL provider not available", allow_module_level=True)

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_mssql"


with DAG(
    DAG_ID,
    schedule="@daily",
    start_date=datetime(2021, 10, 1),
    tags=["example"],
    catchup=False,
) as dag:

    # Example of creating a task to create a table in MsSql

    create_table_mssql_task = SQLExecuteQueryOperator(
        task_id="create_country_table",
        conn_id="airflow_mssql",
        sql=r"""
        CREATE TABLE Country (
            country_id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
            name TEXT,
            continent TEXT
        );
        """,
        dag=dag,
    )

    @dag.task(task_id="insert_mssql_task")
    def insert_mssql_hook():
        mssql_hook = MsSqlHook(mssql_conn_id="airflow_mssql", schema="airflow")

        rows = [
            ("India", "Asia"),
            ("Germany", "Europe"),
            ("Argentina", "South America"),
            ("Ghana", "Africa"),
            ("Japan", "Asia"),
            ("Namibia", "Africa"),
        ]
        target_fields = ["name", "continent"]
        mssql_hook.insert_rows(table="Country", rows=rows, target_fields=target_fields)

    # Example of creating a task that calls an sql command from an external file.
    create_table_mssql_from_external_file = SQLExecuteQueryOperator(
        task_id="create_table_from_external_file",
        conn_id="airflow_mssql",
        sql="create_table.sql",
        dag=dag,
    )

    populate_user_table = SQLExecuteQueryOperator(
        task_id="populate_user_table",
        conn_id="airflow_mssql",
        sql=r"""
                INSERT INTO Users (username, description)
                VALUES ( 'Danny', 'Musician');
                INSERT INTO Users (username, description)
                VALUES ( 'Simone', 'Chef');
                INSERT INTO Users (username, description)
                VALUES ( 'Lily', 'Florist');
                INSERT INTO Users (username, description)
                VALUES ( 'Tim', 'Pet shop owner');
                """,
    )

    get_all_countries = SQLExecuteQueryOperator(
        task_id="get_all_countries",
        conn_id="airflow_mssql",
        sql=r"""SELECT * FROM Country;""",
    )

    get_all_description = SQLExecuteQueryOperator(
        task_id="get_all_description",
        conn_id="airflow_mssql",
        sql=r"""SELECT description FROM Users;""",
    )

    get_countries_from_continent = SQLExecuteQueryOperator(
        task_id="get_countries_from_continent",
        conn_id="airflow_mssql",
        sql=r"""SELECT * FROM Country where {{ params.column }}='{{ params.value }}';""",
        params={"column": "CONVERT(VARCHAR, continent)", "value": "Asia"},
    )

    (
        create_table_mssql_task
        >> insert_mssql_hook()
        >> create_table_mssql_from_external_file
        >> populate_user_table
        >> get_all_countries
        >> get_all_description
        >> get_countries_from_continent
    )
