OdbcOperator

Open Database Connectivity (ODBC) is a standard API for accessing database management systems (DBMS).

Prerequisite Tasks

To use this operator you need to:

  • Install the Python module pyodbc:

    pip install apache-airflow[odbc]

  • Have the ODBC driver for your database installed.

  • Configure an ODBC Data Source Name (DSN) if required by your database.

Once these prerequisites are satisfied, you should be able to run the Python snippet below (replacing the variable values with the ones for your driver).

If the pyodbc module is missing or the driver is not available, the resulting error message will say so. A Connection Refused error means that the connection string points to a host where no database is listening for new connections.

import pyodbc

driver = "{ODBC Driver 17 for SQL Server}"
server = "localhost"
database = "testdb"
username = "user"
password = "password"

conn_str = (
    f"DRIVER={driver};"
    f"SERVER={server};"
    f"DATABASE={database};"
    f"UID={username};"
    f"PWD={password};"
)

conn = pyodbc.connect(conn_str)
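
The failure modes described above (missing module, missing driver, unreachable host) can be told apart with a small wrapper around the same snippet. This is only a sketch: build_conn_str and check_connection are hypothetical helper names, not part of pyodbc or Airflow, and the messages are illustrative.

```python
def build_conn_str(driver, server, database, username, password):
    """Assemble an ODBC connection string from its parts."""
    return (
        f"DRIVER={driver};"
        f"SERVER={server};"
        f"DATABASE={database};"
        f"UID={username};"
        f"PWD={password};"
    )


def check_connection(conn_str):
    """Try to connect and return a short diagnostic message (hypothetical helper)."""
    try:
        # Imported lazily so that a missing module is reported, not raised.
        import pyodbc
    except ImportError:
        return "pyodbc is not installed; try: pip install apache-airflow[odbc]"

    try:
        conn = pyodbc.connect(conn_str)
    except pyodbc.Error as exc:
        # Covers a missing driver as well as a refused connection;
        # listing the installed drivers helps tell the two apart.
        return f"connection failed: {exc}; installed drivers: {pyodbc.drivers()}"

    conn.close()
    return "connection OK"


if __name__ == "__main__":
    conn_str = build_conn_str(
        "{ODBC Driver 17 for SQL Server}", "localhost", "testdb", "user", "password"
    )
    print(check_connection(conn_str))
```

If the driver name in the connection string does not appear in the list of installed drivers, the problem is most likely the driver installation rather than the database host.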

Usage

Use the SQLExecuteQueryOperator to execute commands against a database (or data storage) accessible via an ODBC driver.

The ODBC Connection must be passed as conn_id.

odbc/tests/system/odbc/example_odbc.py


    create_table = SQLExecuteQueryOperator(
        task_id="create_table",
        sql="""
        CREATE TABLE IF NOT EXISTS my_table (
            dt VARCHAR(50),
            value VARCHAR(255)
        );
        """,
        conn_id="my_odbc_conn",
        autocommit=True,
    )

The sql parameter accepts a string or a list of strings. Each string can be an SQL statement or a reference to a template file; a template reference is recognized by its ‘.sql’ suffix.
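
The rule above can be made concrete with a few lines of plain Python. This is a sketch of the rule as stated here, not Airflow's own code: classify_sql_argument is a hypothetical helper, and queries/cleanup.sql is a made-up file name.

```python
from typing import List, Tuple, Union


def classify_sql_argument(sql: Union[str, List[str]]) -> List[Tuple[str, str]]:
    """Label each entry of a `sql` value as a template file or an inline statement."""
    # A single string is treated like a one-element list.
    items = [sql] if isinstance(sql, str) else list(sql)
    return [
        (item, "template file" if item.endswith(".sql") else "inline statement")
        for item in items
    ]


if __name__ == "__main__":
    examples = [
        "DELETE FROM my_table WHERE dt < '2020-01-01';",
        "queries/cleanup.sql",  # hypothetical template file
    ]
    for item, kind in classify_sql_argument(examples):
        print(f"{kind}: {item}")
```

Both a bare string and a list are valid values for sql, so a task can mix inline statements and template files in one list.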

If autocommit is set to True, a commit is executed after each command (the default is False).
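
To see what committing after each command changes in practice, here is a minimal sketch that uses the standard-library sqlite3 module as a stand-in for an ODBC connection. It is not Airflow's implementation: run_statements and count_rows_after_failure are hypothetical helpers, and the choice to commit once at the end when autocommit is off is an assumption made for the illustration.

```python
import sqlite3


def run_statements(conn, statements, autocommit):
    """Run statements in order, committing after each one or once at the end."""
    cur = conn.cursor()
    try:
        for stmt in statements:
            cur.execute(stmt)
            if autocommit:
                conn.commit()  # each command is persisted as soon as it succeeds
        if not autocommit:
            conn.commit()  # assumption: a single commit after all commands
    except sqlite3.Error:
        conn.rollback()  # discards only the work that was not yet committed
        raise
    finally:
        cur.close()


def count_rows_after_failure(autocommit):
    """Run one good and one failing statement, then count the surviving rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE my_table (dt VARCHAR(50), value VARCHAR(255))")
    statements = [
        "INSERT INTO my_table (dt, value) VALUES ('2024-01-01', 'test_value')",
        "INSERT INTO missing_table VALUES (1)",  # fails: the table does not exist
    ]
    try:
        run_statements(conn, statements, autocommit)
    except sqlite3.OperationalError:
        pass  # the failure is expected in this demonstration
    rows = conn.execute("SELECT COUNT(*) FROM my_table").fetchone()[0]
    conn.close()
    return rows


if __name__ == "__main__":
    print("autocommit=True :", count_rows_after_failure(True))
    print("autocommit=False:", count_rows_after_failure(False))
```

With a commit after each command, the first insert survives the later failure; with a single commit at the end, the rollback removes it. Which behavior is preferable depends on whether the commands in a task are meant to succeed or fail as a unit.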

Templating

You can use Jinja templates to parameterize sql.

odbc/tests/system/odbc/example_odbc.py


    insert_data = SQLExecuteQueryOperator(
        task_id="insert_data",
        sql="""
        INSERT INTO my_table (dt, value)
        VALUES ('{{ ds }}', 'test_value');
        """,
        conn_id="my_odbc_conn",
        autocommit=True,
    )
