BteqOperator¶
The BteqOperator
enables execution of SQL statements or BTEQ (Basic Teradata Query) scripts using the Teradata BTEQ utility, which can be installed either locally or accessed remotely via SSH.
This is useful for executing administrative operations, batch queries, or ETL tasks in Teradata environments using the Teradata BTEQ utility.
Note
This operator requires the Teradata Tools and Utilities (TTU) including the bteq
binary to be installed
and accessible via the system’s PATH
(either locally or on the remote SSH host).
Use the BteqOperator
when you want to:
Run parameterized or templated SQL/BTEQ scripts
Connect securely to Teradata with Airflow connections
Execute queries via SSH on remote systems with BTEQ installed
Prerequisite¶
Make sure your Teradata Airflow connection is defined with the required fields:
host
login
password
Optional:
database
, etc.
You can define a remote host with a separate SSH connection using the ssh_conn_id
.
Note
For improved security, it is highly recommended to use private key-based SSH authentication (SSH key pairs) instead of username/password for the SSH connection.
This avoids password exposure, enables seamless automated execution, and enhances security.
See the Airflow SSH Connection documentation for details on configuring SSH keys: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection/ssh.html
To execute arbitrary SQL or BTEQ commands in a Teradata database, use the
BteqOperator
.
Common Database Operations with BteqOperator when BTEQ is installed on local machine¶
Creating a Teradata database table¶
You can use the BteqOperator to create tables in a Teradata database. The following example demonstrates how to create a simple employee table:
create_table = BteqOperator(
task_id="create_table",
sql=r"""
CREATE SET TABLE {{params.DB_TABLE_NAME}} (
emp_id INT,
emp_name VARCHAR(100),
dept VARCHAR(50)
) PRIMARY INDEX (emp_id);
""",
bteq_quit_rc=[0, 4],
timeout=20,
bteq_session_encoding="UTF8",
bteq_script_encoding="UTF8",
params=params,
)
The BTEQ script within this operator handles the table creation, including defining columns, data types, and constraints.
Inserting data into a Teradata database table¶
The following example demonstrates how to populate the my_employees
table with sample employee records:
populate_table = BteqOperator(
task_id="populate_table",
sql=r"""
INSERT INTO {{params.DB_TABLE_NAME}} VALUES (1, 'John Doe', 'IT');
INSERT INTO {{params.DB_TABLE_NAME}} VALUES (2, 'Jane Smith', 'HR');
""",
params=params,
bteq_session_encoding="UTF8",
bteq_quit_rc=0,
)
This BTEQ script inserts multiple rows into the table in a single operation, making it efficient for batch data loading.
Exporting data from a Teradata database table to a file¶
The BteqOperator makes it straightforward to export query results to a file. This capability is valuable for data extraction, backups, and transferring data between systems. The following example demonstrates how to query the employee table and export the results:
export_to_a_file = BteqOperator(
task_id="export_to_a_file",
sql=r"""
.EXPORT FILE = employees_output.txt;
SELECT * FROM {{params.DB_TABLE_NAME}};
.EXPORT RESET;
""",
bteq_session_encoding="UTF16",
)
The BTEQ script above handles the data export with options for formatting, file location specification, and error handling during the export process.
Fetching and processing records from your Teradata database¶
You can use BteqOperator to query and retrieve data from your Teradata tables. The following example demonstrates how to fetch specific records from the employee table with filtering and formatting:
get_it_employees = BteqOperator(
task_id="get_it_employees",
sql=r"""
SELECT * FROM {{params.DB_TABLE_NAME}} WHERE dept = 'IT';
""",
bteq_session_encoding="ASCII",
)
Executing a BTEQ script with the BteqOperator¶
You can use BteqOperator to execute a BTEQ script directly. This is useful for running complex queries or scripts that require multiple SQL statements or specific BTEQ commands.
execute_bteq_file = BteqOperator(
task_id="execute_bteq_file",
file_path="providers/teradata/tests/system/teradata/script.bteq",
params=params,
)
Common Database Operations with BteqOperator when BTEQ is installed on remote machine¶
Make sure SSH connection is defined with the required fields to connect to remote machine:
remote_host
username
password
Optional:
key_file
,private_key
,conn_timeout
, etc.
Creating a Teradata database table¶
You can use the BteqOperator to create tables in a Teradata database. The following example demonstrates how to create a simple employee table:
create_table = BteqOperator(
task_id="create_table",
sql=r"""
CREATE SET TABLE {{params.DB_TABLE_NAME}} (
emp_id INT,
emp_name VARCHAR(100),
dept VARCHAR(50)
) PRIMARY INDEX (emp_id);
""",
bteq_quit_rc=[0, 4],
timeout=20,
bteq_session_encoding="UTF8",
bteq_script_encoding="UTF8",
params=params,
)
The BTEQ script within this operator handles the table creation, including defining columns, data types, and constraints.
Inserting data into a Teradata database table¶
The following example demonstrates how to populate the my_employees
table with sample employee records:
populate_table = BteqOperator(
task_id="populate_table",
sql=r"""
INSERT INTO {{params.DB_TABLE_NAME}} VALUES (1, 'John Doe', 'IT');
INSERT INTO {{params.DB_TABLE_NAME}} VALUES (2, 'Jane Smith', 'HR');
""",
params=params,
bteq_session_encoding="UTF8",
bteq_quit_rc=0,
)
This BTEQ script inserts multiple rows into the table in a single operation, making it efficient for batch data loading.
Exporting data from a Teradata database table to a file¶
The BteqOperator makes it straightforward to export query results to a file. This capability is valuable for data extraction, backups, and transferring data between systems. The following example demonstrates how to query the employee table and export the results:
export_to_a_file = BteqOperator(
task_id="export_to_a_file",
sql=r"""
.EXPORT FILE = employees_output.txt;
SELECT * FROM {{params.DB_TABLE_NAME}};
.EXPORT RESET;
""",
bteq_session_encoding="UTF16",
)
The BTEQ script above handles the data export with options for formatting, file location specification, and error handling during the export process.
Fetching and processing records from your Teradata database¶
You can use BteqOperator to query and retrieve data from your Teradata tables. The following example demonstrates how to fetch specific records from the employee table with filtering and formatting:
get_it_employees = BteqOperator(
task_id="get_it_employees",
sql=r"""
SELECT * FROM {{params.DB_TABLE_NAME}} WHERE dept = 'IT';
""",
bteq_session_encoding="ASCII",
)
This example shows how to: - Execute a SELECT query with WHERE clause filtering - Format the output for better readability - Process the result set within the BTEQ script - Handle empty result sets appropriately
Executing a BTEQ script with the BteqOperator when BTEQ script file is on remote machine¶
You can use BteqOperator to execute a BTEQ script directly when file is on remote machine.
execute_bteq_file = BteqOperator(
task_id="execute_bteq_file",
file_path="/home/devtools/satish/airflow/script.bteq",
params=params,
)
Using Conditional Logic with BteqOperator¶
The BteqOperator supports executing conditional logic within your BTEQ scripts. This powerful feature lets you create dynamic, decision-based workflows that respond to data conditions or processing results:
cond_logic = BteqOperator(
task_id="cond_logic",
sql=r"""
.IF ERRORCODE <> 0 THEN .GOTO handle_error;
SELECT COUNT(*) FROM {{params.DB_TABLE_NAME}};
.LABEL handle_error;
""",
bteq_script_encoding="UTF8",
)
Conditional execution enables more intelligent data pipelines that can adapt to different scenarios without requiring separate DAG branches.
Error Handling in BTEQ Scripts¶
The BteqOperator allows you to implement comprehensive error handling within your BTEQ scripts:
error_handling = BteqOperator(
task_id="error_handling",
sql=r"""
DROP TABLE my_temp;
.IF ERRORCODE = 3807 THEN .GOTO table_not_found;
SELECT 'Table dropped successfully.';
.GOTO end;
.LABEL table_not_found;
SELECT 'Table not found - continuing execution';
.LABEL end;
.LOGOFF;
.QUIT 0;
""",
bteq_script_encoding="UTF16",
)
This approach lets you catch and respond to errors at the BTEQ script level, providing more granular control over error conditions and enabling appropriate recovery actions.
Dropping a Teradata Database Table¶
When your workflow completes or requires cleanup, you can use the BteqOperator to drop database objects. The following example demonstrates how to drop the my_employees
table:
drop_table = BteqOperator(
task_id="drop_table",
sql=r"""
DROP TABLE {{params.DB_TABLE_NAME}};
.IF ERRORCODE = 3807 THEN .GOTO end;
.LABEL end;
.LOGOFF;
.QUIT 0;
""",
bteq_script_encoding="ASCII",
)
The complete Teradata Operator DAG¶
When we put everything together, our DAG should look like this:
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_bteq"
CONN_ID = "teradata_default"
SSH_CONN_ID = "ssh_default"
host = os.environ.get("host", "localhost")
username = os.environ.get("username", "temp")
password = os.environ.get("password", "temp")
params = {
"host": host,
"username": username,
"password": password,
"DATABASE_NAME": "airflow",
"TABLE_NAME": "my_employees",
"DB_TABLE_NAME": "airflow.my_employees",
}
with DAG(
dag_id=DAG_ID,
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
default_args={"teradata_conn_id": CONN_ID, "params": params},
) as dag:
create_table = BteqOperator(
task_id="create_table",
sql=r"""
CREATE SET TABLE {{params.DB_TABLE_NAME}} (
emp_id INT,
emp_name VARCHAR(100),
dept VARCHAR(50)
) PRIMARY INDEX (emp_id);
""",
bteq_quit_rc=[0, 4],
timeout=20,
bteq_session_encoding="UTF8",
bteq_script_encoding="UTF8",
params=params,
)
populate_table = BteqOperator(
task_id="populate_table",
sql=r"""
INSERT INTO {{params.DB_TABLE_NAME}} VALUES (1, 'John Doe', 'IT');
INSERT INTO {{params.DB_TABLE_NAME}} VALUES (2, 'Jane Smith', 'HR');
""",
params=params,
bteq_session_encoding="UTF8",
bteq_quit_rc=0,
)
export_to_a_file = BteqOperator(
task_id="export_to_a_file",
sql=r"""
.EXPORT FILE = employees_output.txt;
SELECT * FROM {{params.DB_TABLE_NAME}};
.EXPORT RESET;
""",
bteq_session_encoding="UTF16",
)
get_it_employees = BteqOperator(
task_id="get_it_employees",
sql=r"""
SELECT * FROM {{params.DB_TABLE_NAME}} WHERE dept = 'IT';
""",
bteq_session_encoding="ASCII",
)
cond_logic = BteqOperator(
task_id="cond_logic",
sql=r"""
.IF ERRORCODE <> 0 THEN .GOTO handle_error;
SELECT COUNT(*) FROM {{params.DB_TABLE_NAME}};
.LABEL handle_error;
""",
bteq_script_encoding="UTF8",
)
error_handling = BteqOperator(
task_id="error_handling",
sql=r"""
DROP TABLE my_temp;
.IF ERRORCODE = 3807 THEN .GOTO table_not_found;
SELECT 'Table dropped successfully.';
.GOTO end;
.LABEL table_not_found;
SELECT 'Table not found - continuing execution';
.LABEL end;
.LOGOFF;
.QUIT 0;
""",
bteq_script_encoding="UTF16",
)
drop_table = BteqOperator(
task_id="drop_table",
sql=r"""
DROP TABLE {{params.DB_TABLE_NAME}};
.IF ERRORCODE = 3807 THEN .GOTO end;
.LABEL end;
.LOGOFF;
.QUIT 0;
""",
bteq_script_encoding="ASCII",
)
execute_bteq_file = BteqOperator(
task_id="execute_bteq_file",
file_path="providers/teradata/tests/system/teradata/script.bteq",
params=params,
)
# [START bteq_operator_howto_guide_bteq_file_utf8_input]
execute_bteq_utf8_file = BteqOperator(
task_id="execute_bteq_utf8_file",
file_path="providers/teradata/tests/system/teradata/script.bteq",
params=params,
bteq_script_encoding="UTF8",
)
# [END bteq_operator_howto_guide_bteq_file_utf8_input]
# [START bteq_operator_howto_guide_bteq_file_utf8_session_ascii_input]
execute_bteq_utf8_session_ascii_file = BteqOperator(
task_id="execute_bteq_utf8_session_ascii_file",
file_path="providers/teradata/tests/system/teradata/script.bteq",
params=params,
bteq_script_encoding="UTF8",
bteq_session_encoding="ASCII",
)
# [END bteq_operator_howto_guide_bteq_file_utf8_session_ascii_input]
# [START bteq_operator_howto_guide_bteq_file_utf8_session_utf8_input]
execute_bteq_utf8_session_utf8_file = BteqOperator(
task_id="execute_bteq_utf8_session_utf8_file",
file_path="providers/teradata/tests/system/teradata/script.bteq",
params=params,
bteq_script_encoding="UTF8",
bteq_session_encoding="UTF8",
)
# [END bteq_operator_howto_guide_bteq_file_utf8_session_utf8_input]
# [START bteq_operator_howto_guide_bteq_file_utf8_session_utf16_input]
execute_bteq_utf8_session_utf16_file = BteqOperator(
task_id="execute_bteq_utf8_session_utf16_file",
file_path="providers/teradata/tests/system/teradata/script.bteq",
params=params,
bteq_script_encoding="UTF8",
bteq_session_encoding="UTF16",
)
# [END bteq_operator_howto_guide_bteq_file_utf8_session_utf16_input]
# [START bteq_operator_howto_guide_bteq_file_utf16_input]
execute_bteq_utf16_file = BteqOperator(
task_id="execute_bteq_utf16_file",
file_path="providers/teradata/tests/system/teradata/script_utf16.bteq",
params=params,
bteq_script_encoding="UTF16",
)
# [END bteq_operator_howto_guide_bteq_file_utf16_input]
# [START bteq_operator_howto_guide_bteq_file_utf16_input]
execute_bteq_utf16_session_ascii_file = BteqOperator(
task_id="execute_bteq_utf16_session_ascii_file",
file_path="providers/teradata/tests/system/teradata/script_utf16.bteq",
params=params,
bteq_script_encoding="UTF16",
bteq_session_encoding="ASCII",
)
# [END bteq_operator_howto_guide_bteq_file_utf16_input]
# [START bteq_operator_howto_guide_bteq_file_utf16_session_utf8_input]
execute_bteq_utf16_session_utf8_file = BteqOperator(
task_id="execute_bteq_utf16_session_utf8_file",
file_path="providers/teradata/tests/system/teradata/script_utf16.bteq",
params=params,
bteq_script_encoding="UTF16",
bteq_session_encoding="UTF8",
)
# [END bteq_operator_howto_guide_bteq_file_utf16_session_utf8_input]
# [START bteq_operator_howto_guide_bteq_file_utf16_session_utf8_input]
execute_bteq_utf16_session_utf16_file = BteqOperator(
task_id="execute_bteq_utf16_session_utf16_file",
file_path="providers/teradata/tests/system/teradata/script_utf16.bteq",
params=params,
bteq_script_encoding="UTF16",
bteq_session_encoding="UTF16",
)
# [END bteq_operator_howto_guide_bteq_file_utf16_session_utf8_input]
(
create_table
>> populate_table
>> export_to_a_file
>> get_it_employees
>> cond_logic
>> error_handling
>> drop_table
>> execute_bteq_file
>> execute_bteq_utf8_file
>> execute_bteq_utf8_session_ascii_file
>> execute_bteq_utf8_session_utf8_file
>> execute_bteq_utf8_session_utf16_file
>> execute_bteq_utf16_file
>> execute_bteq_utf16_session_ascii_file
>> execute_bteq_utf16_session_utf8_file
>> execute_bteq_utf16_session_utf16_file
)