airflow.providers.teradata.operators.tpt

Classes

DdlOperator

Operator to execute one or more DDL (Data Definition Language) statements on a Teradata Database.

TdLoadOperator

Operator to handle data transfers using the Teradata Parallel Transporter (TPT) tdload utility.

Module Contents

class airflow.providers.teradata.operators.tpt.DdlOperator(*, ddl, error_list=None, teradata_conn_id=TeradataHook.default_conn_name, ssh_conn_id=None, remote_working_dir=None, ddl_job_name=None, **kwargs)[source]

Bases: airflow.models.BaseOperator

Operator to execute one or more DDL (Data Definition Language) statements on a Teradata Database.

This operator is designed to facilitate DDL operations such as creating, altering, or dropping tables, indexes, views, or other database objects in a scalable and efficient manner.

It leverages the TPT (Teradata Parallel Transporter) utility to perform the operations and supports templating for SQL statements, allowing dynamic generation of SQL at runtime.

Key Features:
  • Executes one or more DDL statements sequentially on Teradata using TPT
  • Supports error handling with a customizable error code list
  • Supports XCom push to share execution results with downstream tasks
  • Integrates with Airflow's templating engine for dynamic SQL generation
  • Can execute statements via an SSH connection if needed

Parameters:
  • ddl (list[str]) – A list of DDL statements to be executed. Each item should be a valid SQL DDL command supported by Teradata.

  • error_list (int | list[int] | None) – Optional integer or list of error codes to ignore during execution. If provided, the operator will not fail when these specific error codes occur. Example: error_list=3803 or error_list=[3803, 3807]

  • teradata_conn_id (str) – The connection ID for the Teradata database. Defaults to TeradataHook.default_conn_name.

  • ssh_conn_id (str | None) – Optional SSH connection ID if the commands need to be executed through SSH.

  • remote_working_dir (str | None) – Directory on the remote server where temporary files will be stored.

  • ddl_job_name (str | None) – Optional name for the DDL job.

Raises:
  • ValueError – If the ddl parameter or error_list is invalid.

  • RuntimeError – If underlying TPT execution (tbuild) fails with non-zero exit status.

  • ConnectionError – If remote SSH connection cannot be established.

  • TimeoutError – If SSH connection attempt times out.

  • FileNotFoundError – If required TPT utility (tbuild) is missing locally or on remote host.
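The error_list parameter accepts either a single error code or a list of codes. A minimal sketch of how such a value might be validated and normalized (normalize_error_list is a hypothetical helper for illustration, not part of the provider):

```python
def normalize_error_list(error_list):
    """Normalize an int-or-list error_list into a list of int error codes.

    Hypothetical helper illustrating the accepted shapes; the provider's
    actual implementation may differ.
    """
    if error_list is None:
        return []
    if isinstance(error_list, int):
        return [error_list]
    if isinstance(error_list, (list, tuple)) and all(
        isinstance(code, int) for code in error_list
    ):
        return list(error_list)
    raise ValueError("error_list must be an int or a list of ints")

# Both accepted forms normalize to a plain list of codes:
print(normalize_error_list(3803))          # [3803]
print(normalize_error_list([3803, 3807]))  # [3803, 3807]
```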

Example usage:

# Example of creating tables using DdlOperator
create_tables = DdlOperator(
    task_id="create_tables_task",
    ddl=[
        "CREATE TABLE my_database.my_table1 (id INT, name VARCHAR(100))",
        "CREATE TABLE my_database.my_table2 (id INT, value FLOAT)",
    ],
    teradata_conn_id="my_teradata_conn",
    error_list=[3803],  # Ignore "Table already exists" errors
    ddl_job_name="create_tables_job",
)

# Example of dropping tables using DdlOperator
drop_tables = DdlOperator(
    task_id="drop_tables_task",
    ddl=["DROP TABLE my_database.my_table1", "DROP TABLE my_database.my_table2"],
    teradata_conn_id="my_teradata_conn",
    error_list=3807,  # Ignore "Object does not exist" errors
    ddl_job_name="drop_tables_job",
)

# Example using templated SQL file
alter_table = DdlOperator(
    task_id="alter_table_task",
    ddl="{{ var.value.get('ddl_directory') }}/alter_table.sql",
    teradata_conn_id="my_teradata_conn",
    ssh_conn_id="my_ssh_conn",
    ddl_job_name="alter_table_job",
)
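Because template_ext is ('.sql',), a string value for ddl that ends in .sql (as in the templated example above) is treated by Airflow as a file path whose rendered contents become the DDL. A minimal sketch of that extension check (illustrative only; Airflow's actual template resolution is more involved):

```python
TEMPLATE_EXT = (".sql",)  # mirrors DdlOperator.template_ext

def is_templated_sql_file(value) -> bool:
    """Return True when a templated field value should be read as a SQL file."""
    # str.endswith accepts a tuple of suffixes, matching any of them
    return isinstance(value, str) and value.endswith(TEMPLATE_EXT)

print(is_templated_sql_file("ddl/alter_table.sql"))      # True
print(is_templated_sql_file("CREATE TABLE t (id INT)"))  # False
```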
template_fields = ('ddl', 'ddl_job_name')[source]
template_ext = ('.sql',)[source]
ui_color = '#a8e4b1'[source]
ddl[source]
error_list = None[source]
teradata_conn_id = 'teradata_default'[source]
ssh_conn_id = None[source]
remote_working_dir = None[source]
ddl_job_name = None[source]
execute(context)[source]

Execute the DDL operations using the TptHook.

on_kill()[source]

Handle termination signals and ensure the hook is properly cleaned up.

class airflow.providers.teradata.operators.tpt.TdLoadOperator(*, teradata_conn_id=TeradataHook.default_conn_name, target_teradata_conn_id=None, ssh_conn_id=None, source_table=None, select_stmt=None, insert_stmt=None, target_table=None, source_file_name=None, target_file_name=None, source_format='Delimited', target_format='Delimited', source_text_delimiter=',', target_text_delimiter=',', tdload_options=None, tdload_job_name=None, tdload_job_var_file=None, remote_working_dir=None, **kwargs)[source]

Bases: airflow.models.BaseOperator

Operator to handle data transfers using the Teradata Parallel Transporter (TPT) tdload utility.

This operator supports three main scenarios:
  1. Load data from a file to a Teradata table
  2. Export data from a Teradata table to a file
  3. Transfer data between two Teradata tables (potentially across different databases)
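Which scenario applies follows from the combination of source/target parameters supplied. A hypothetical sketch of that selection logic (infer_tdload_mode is illustrative only, not the provider's actual code):

```python
def infer_tdload_mode(source_table=None, target_table=None,
                      source_file_name=None, target_file_name=None):
    """Pick one of the three tdload scenarios from the supplied parameters."""
    if source_file_name and target_table:
        return "file_to_table"
    if source_table and target_file_name:
        return "table_to_file"
    if source_table and target_table:
        return "table_to_table"
    # mirrors the documented ValueError for invalid parameter combinations
    raise ValueError("Invalid parameter combination for TdLoadOperator")

print(infer_tdload_mode(source_file_name="/data/in.csv", target_table="db.t"))   # file_to_table
print(infer_tdload_mode(source_table="db.s", target_file_name="/data/out.csv"))  # table_to_file
print(infer_tdload_mode(source_table="db.s", target_table="db.t"))               # table_to_table
```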

For all scenarios:
  • teradata_conn_id – Connection ID for the Teradata database (source for table operations)

For file to table loading:
  • source_file_name – Path to the source file (required for file to table)
  • select_stmt – SQL SELECT statement to filter data (optional)
  • insert_stmt – SQL INSERT statement to use for loading data (optional)
  • target_table – Name of the target table (required for file to table)
  • target_teradata_conn_id – Connection ID for the target Teradata database (defaults to teradata_conn_id)

For table to file export:
  • source_table – Name of the source table (required for table to file)
  • target_file_name – Path to the target file (required for table to file)

For table to table transfer:
  • source_table – Name of the source table (required for table to table)
  • select_stmt – SQL SELECT statement to filter data (optional)
  • insert_stmt – SQL INSERT statement to use for loading data (optional)
  • target_table – Name of the target table (required for table to table)
  • target_teradata_conn_id – Connection ID for the target Teradata database (required for table to table)

Optional configuration parameters:
  • source_format – Format of the source data (default: 'Delimited')
  • target_format – Format of the target data (default: 'Delimited')
  • source_text_delimiter – Source text delimiter (default: ',')
  • target_text_delimiter – Target text delimiter (default: ',')
  • tdload_options – Additional options for tdload (optional)
  • tdload_job_name – Name for the tdload job (optional)
  • tdload_job_var_file – Path to a tdload job variable file (optional)
  • ssh_conn_id – SSH connection ID for secure file transfer (optional, used for file operations)

Raises:
  • ValueError – If parameter combinations are invalid or required files are missing.

  • RuntimeError – If underlying TPT execution (tdload) fails with non-zero exit status.

  • ConnectionError – If remote SSH connection cannot be established.

  • TimeoutError – If SSH connection attempt times out.

  • FileNotFoundError – If required TPT utility (tdload) is missing locally or on remote host.

Example usage:

# Example usage for file to table:
load_file = TdLoadOperator(
    task_id="load_from_file",
    source_file_name="/path/to/data.csv",
    target_table="my_database.my_table",
    target_teradata_conn_id="teradata_target_conn",
    insert_stmt="INSERT INTO my_database.my_table (col1, col2) VALUES (?, ?)",
)

# Example usage for table to file:
export_data = TdLoadOperator(
    task_id="export_to_file",
    source_table="my_database.my_table",
    target_file_name="/path/to/export.csv",
    teradata_conn_id="teradata_source_conn",
    ssh_conn_id="ssh_default",
    tdload_job_name="export_job",
)

# Example usage for table to table:
transfer_data = TdLoadOperator(
    task_id="transfer_between_tables",
    source_table="source_db.source_table",
    target_table="target_db.target_table",
    teradata_conn_id="teradata_source_conn",
    target_teradata_conn_id="teradata_target_conn",
    tdload_job_var_file="/path/to/vars.txt",
    insert_stmt="INSERT INTO target_db.target_table (col1, col2) VALUES (?, ?)",
)
template_fields = ('source_table', 'target_table', 'select_stmt', 'insert_stmt', 'source_file_name',...[source]
ui_color = '#a8e4b1'[source]
teradata_conn_id = 'teradata_default'[source]
target_teradata_conn_id = None[source]
ssh_conn_id = None[source]
source_table = None[source]
select_stmt = None[source]
insert_stmt = None[source]
target_table = None[source]
source_file_name = None[source]
target_file_name = None[source]
source_format = 'Delimited'[source]
source_text_delimiter = ','[source]
target_format = 'Delimited'[source]
target_text_delimiter = ','[source]
tdload_options = None[source]
tdload_job_name = None[source]
tdload_job_var_file = None[source]
remote_working_dir = None[source]
execute(context)[source]

Execute the TdLoad operation based on the configured parameters.

on_kill()[source]

Handle termination signals and ensure all hooks are properly cleaned up.
