DdlOperator¶
The DdlOperator is an Airflow operator designed to execute Data Definition Language (DDL) statements on Teradata databases. It provides a robust way to create, alter, or drop database objects as part of your data pipelines.
Note
The DdlOperator requires the Teradata Parallel Transporter (TPT) package from Teradata Tools and Utilities (TTU)
to be installed on the machine where the tbuild command will run (either local or remote).
Ensure that the tbuild executable is available in the system’s PATH.
Refer to the official Teradata documentation for installation, configuration, and security best practices.
Key Features:
Executes DDL SQL statements (CREATE, ALTER, DROP, etc.)
Works with single statements or batches of multiple DDL operations
Integrates with Airflow’s connection management for secure database access
Provides comprehensive logging of execution results
Supports both local and remote execution via SSH
When you need to manage database schema changes, create temporary tables, or clean up data structures as part of your workflow, the DdlOperator offers a streamlined approach that integrates seamlessly with your Airflow DAGs.
Prerequisite¶
Make sure your Teradata Airflow connection is defined with the required fields:
hostloginpassword
You can define a remote host with a separate SSH connection using the ssh_conn_id.
Ensure that the Teradata Parallel Transporter (TPT) package is installed on the machine where TdLoadOperator will execute commands. This can be:
The local machine where Airflow runs the task, for local execution.
A remote host accessed via SSH, for remote execution.
If executing remotely, ensure that an SSH server (e.g., sshd) is running and accessible on the remote machine, and that the tbuild executable is available in the system’s PATH.
Note
For improved security, it is highly recommended to use private key-based SSH authentication (SSH key pairs) instead of username/password for the SSH connection.
This avoids password exposure, enables seamless automated execution, and enhances security.
See the Airflow SSH Connection documentation for details on configuring SSH keys: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection/ssh.html
To execute DDL operations in a Teradata database, use the
DdlOperator.
Handling Escape Sequences for Embedded Quotes¶
When working with DDL statements that contain embedded quotes, it’s important to understand how escape sequences are handled differently between the DAG definition and the SQL execution:
In DAG Definition (Python):
- Use backslash escape sequences: \" for double quotes, \' for single quotes
- Python string literals require backslash escaping
In SQL Execution (Teradata):
- SQL standard requires doubling quotes when enclosed within the same quote type
- Single quotes in single-quoted strings: 'Don''t'
- Double quotes in double-quoted identifiers: "My""Table"
Example:
# In your DAG - use Python escape sequences
ddl_with_quotes = DdlOperator(
task_id="create_table_with_quotes",
ddl=[
"CREATE TABLE test_table (col1 VARCHAR(50) DEFAULT '\"quoted_value\"')",
"INSERT INTO test_table VALUES ('It''s a test')", # Note the doubled single quotes
],
teradata_conn_id="teradata_default",
)
Key Points: - When defining DDL statements in Python strings, use standard Python escape sequences - The operator automatically handles the conversion for TPT script generation - For SQL string literals containing quotes, follow SQL standards (double the quotes) - Test your DDL statements carefully when they contain complex quoting
Key Operation Examples with DdlOperator¶
Dropping tables in Teradata¶
You can use the DdlOperator to drop tables in Teradata. The following example demonstrates how to drop multiple tables:
# Drop tables if they exist
drop_table = DdlOperator(
task_id="drop_table",
ddl=[
"DROP TABLE {{ params.SOURCE_TABLE }};",
"DROP TABLE {{ params.SOURCE_TABLE }}_UV;",
"DROP TABLE {{ params.SOURCE_TABLE }}_ET;",
"DROP TABLE {{ params.SOURCE_TABLE }}_WT;",
"DROP TABLE {{ params.SOURCE_TABLE }}_Log;",
"DROP TABLE {{ params.TARGET_TABLE }};",
"DROP TABLE {{ params.TARGET_TABLE }}_UV;",
"DROP TABLE {{ params.TARGET_TABLE }}_ET;",
"DROP TABLE {{ params.TARGET_TABLE }}_WT;",
"DROP TABLE {{ params.TARGET_TABLE }}_Log;",
],
error_list=[3706, 3803, 3807],
)
Creating tables in Teradata¶
You can use the DdlOperator to create tables in Teradata. The following example demonstrates how to create multiple tables:
create_source_table = DdlOperator(
task_id="create_source_table",
ddl=[
"CREATE TABLE {{ params.SOURCE_TABLE }} ( \
first_name VARCHAR(100), \
last_name VARCHAR(100), \
employee_id VARCHAR(10), \
department VARCHAR(50) \
);"
],
)
create_target_table = DdlOperator(
task_id="create_target_table",
ddl=[
"CREATE TABLE {{ params.TARGET_TABLE }} ( \
first_name VARCHAR(100), \
last_name VARCHAR(100), \
employee_id VARCHAR(10), \
department VARCHAR(50) \
);"
],
)
Creating an index on a Teradata table¶
You can use the DdlOperator to create an index on a Teradata table. The following example demonstrates how to create an index:
create_index_on_source = DdlOperator(
task_id="create_index_on_source",
ddl=["CREATE INDEX idx_employee_id (employee_id) ON {{ params.SOURCE_TABLE }};"],
)
Renaming a table in Teradata¶
You can use the DdlOperator to rename a table in Teradata. The following example demonstrates how to rename a table:
rename_target_table = DdlOperator(
task_id="rename_target_table",
ddl=[
"RENAME TABLE {{ params.TARGET_TABLE }} TO {{ params.TARGET_TABLE }}_renamed;",
"DROP TABLE {{ params.TARGET_TABLE }}_renamed",
],
)
Dropping an index in Teradata¶
You can use the DdlOperator to drop an index in Teradata. The following example demonstrates how to drop an index:
drop_index_on_source = DdlOperator(
task_id="drop_index_on_source",
ddl=["DROP INDEX idx_employee_id ON {{ params.SOURCE_TABLE }};"],
error_list=[3706, 3803, 3807],
)
Altering a table in Teradata¶
You can use the DdlOperator to alter a table in Teradata. The following example demonstrates how to add a column:
alter_source_table = DdlOperator(
task_id="alter_source_table",
ddl=["ALTER TABLE {{ params.SOURCE_TABLE }} ADD hire_date DATE;"],
)
The complete Teradata Operator DAG¶
When we put everything together, our DAG should look like this:
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_tpt"
CONN_ID = "teradata_default"
SSH_CONN_ID = "ssh_default"
# Define file paths and table names for the test
SYSTEM_TESTS_DIR = os.path.abspath(os.path.dirname(__file__))
params = {
"SOURCE_TABLE": "source_table",
"TARGET_TABLE": "target_table",
}
with DAG(
dag_id=DAG_ID,
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
default_args={"teradata_conn_id": CONN_ID, "params": params},
) as dag:
# Drop tables if they exist
drop_table = DdlOperator(
task_id="drop_table",
ddl=[
"DROP TABLE {{ params.SOURCE_TABLE }};",
"DROP TABLE {{ params.SOURCE_TABLE }}_UV;",
"DROP TABLE {{ params.SOURCE_TABLE }}_ET;",
"DROP TABLE {{ params.SOURCE_TABLE }}_WT;",
"DROP TABLE {{ params.SOURCE_TABLE }}_Log;",
"DROP TABLE {{ params.TARGET_TABLE }};",
"DROP TABLE {{ params.TARGET_TABLE }}_UV;",
"DROP TABLE {{ params.TARGET_TABLE }}_ET;",
"DROP TABLE {{ params.TARGET_TABLE }}_WT;",
"DROP TABLE {{ params.TARGET_TABLE }}_Log;",
],
error_list=[3706, 3803, 3807],
)
create_source_table = DdlOperator(
task_id="create_source_table",
ddl=[
"CREATE TABLE {{ params.SOURCE_TABLE }} ( \
first_name VARCHAR(100), \
last_name VARCHAR(100), \
employee_id VARCHAR(10), \
department VARCHAR(50) \
);"
],
)
create_target_table = DdlOperator(
task_id="create_target_table",
ddl=[
"CREATE TABLE {{ params.TARGET_TABLE }} ( \
first_name VARCHAR(100), \
last_name VARCHAR(100), \
employee_id VARCHAR(10), \
department VARCHAR(50) \
);"
],
)
create_index_on_source = DdlOperator(
task_id="create_index_on_source",
ddl=["CREATE INDEX idx_employee_id (employee_id) ON {{ params.SOURCE_TABLE }};"],
)
rename_target_table = DdlOperator(
task_id="rename_target_table",
ddl=[
"RENAME TABLE {{ params.TARGET_TABLE }} TO {{ params.TARGET_TABLE }}_renamed;",
"DROP TABLE {{ params.TARGET_TABLE }}_renamed",
],
)
drop_index_on_source = DdlOperator(
task_id="drop_index_on_source",
ddl=["DROP INDEX idx_employee_id ON {{ params.SOURCE_TABLE }};"],
error_list=[3706, 3803, 3807],
)
alter_source_table = DdlOperator(
task_id="alter_source_table",
ddl=["ALTER TABLE {{ params.SOURCE_TABLE }} ADD hire_date DATE;"],
)
# Define the task dependencies
(
drop_table
>> create_source_table
>> create_target_table
>> create_index_on_source
>> rename_target_table
>> drop_index_on_source
>> alter_source_table
)
from tests_common.test_utils.watcher import watcher
# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()
from tests_common.test_utils.system_tests import get_test_run # noqa: E402
# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)