DdlOperator¶
The DdlOperator is an Airflow operator designed to execute Data Definition Language (DDL) statements on Teradata databases. It provides a robust way to create, alter, or drop database objects as part of your data pipelines.
Note
The DdlOperator requires the Teradata Parallel Transporter (TPT) package from Teradata Tools and Utilities (TTU)
to be installed on the machine where the tbuild command will run (either local or remote).
Ensure that the tbuild executable is available in the system’s PATH.
Refer to the official Teradata documentation for installation, configuration, and security best practices.
Key Features:
- Executes DDL SQL statements (CREATE, ALTER, DROP, etc.)
- Works with single statements or batches of multiple DDL operations
- Integrates with Airflow's connection management for secure database access
- Provides comprehensive logging of execution results
- Supports both local and remote execution via SSH
When you need to manage database schema changes, create temporary tables, or clean up data structures as part of your workflow, the DdlOperator offers a streamlined approach that integrates seamlessly with your Airflow DAGs.
Prerequisite¶
Make sure your Teradata Airflow connection is defined with the required fields: host, login, and password.
You can define a remote host with a separate SSH connection using the ssh_conn_id.
Ensure that the Teradata Parallel Transporter (TPT) package is installed on the machine where the DdlOperator will execute commands. This can be:
- The local machine where Airflow runs the task, for local execution.
- A remote host accessed via SSH, for remote execution.
If executing remotely, ensure that an SSH server (e.g., sshd) is running and accessible on the remote machine, and that the tbuild executable is available in the system’s PATH.
Note
For improved security, it is highly recommended to use private key-based SSH authentication (SSH key pairs) instead of username/password for the SSH connection.
This avoids password exposure, enables seamless automated execution, and enhances security.
See the Airflow SSH Connection documentation for details on configuring SSH keys: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection/ssh.html
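As a reference, connections can be created in the Airflow UI, via the CLI, or through environment variables. The following sketch uses the environment-variable (URI) form; the host, credentials, and key path are placeholders, and the exact extras accepted depend on your provider versions:
import os

# Hypothetical values - replace with your own host, user, and secrets management.
# Connection id "teradata_default" maps to AIRFLOW_CONN_TERADATA_DEFAULT.
os.environ["AIRFLOW_CONN_TERADATA_DEFAULT"] = "teradata://my_user:my_password@my_teradata_host"

# Optional SSH connection for remote execution; key-based auth is preferred.
# key_file points at the private key (the path is URL-encoded in URI form).
os.environ["AIRFLOW_CONN_SSH_DEFAULT"] = (
    "ssh://airflow_user@remote_host?key_file=%2Fhome%2Fairflow%2F.ssh%2Fid_rsa"
)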
To execute DDL operations in a Teradata database, use the
DdlOperator.
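A minimal sketch of remote execution (the connection ids here are assumptions): the DDL is shipped to the host behind the SSH connection and executed there through tbuild.
# Runs tbuild on the remote host referenced by ssh_conn_id
remote_ddl = DdlOperator(
    task_id="create_table_remote",
    ddl=["CREATE TABLE my_database.my_table (id INTEGER);"],
    teradata_conn_id="teradata_default",
    ssh_conn_id="ssh_default",
)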
Handling Escape Sequences for Embedded Quotes¶
When working with DDL statements that contain embedded quotes, it’s important to understand how escape sequences are handled differently between the DAG definition and the SQL execution:
In DAG Definition (Python):
- Use backslash escape sequences: \" for double quotes, \' for single quotes
- Python string literals require backslash escaping for quote characters that match the enclosing quotes
In SQL Execution (Teradata):
- The SQL standard requires doubling a quote character when it appears inside a literal or identifier delimited by that same character
- Single quotes in single-quoted strings: 'Don''t'
- Double quotes in double-quoted identifiers: "My""Table"
Example:
# In your DAG - use Python escape sequences
ddl_with_quotes = DdlOperator(
    task_id="create_table_with_quotes",
    ddl=[
        "CREATE TABLE test_table (col1 VARCHAR(50) DEFAULT '\"quoted_value\"')",
        "INSERT INTO test_table VALUES ('It''s a test')",  # Note the doubled single quotes
    ],
    teradata_conn_id="teradata_default",
)
Key Points:
- When defining DDL statements in Python strings, use standard Python escape sequences
- The operator automatically handles the conversion for TPT script generation
- For SQL string literals containing quotes, follow SQL standards (double the quotes)
- Test your DDL statements carefully when they contain complex quoting
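If you are unsure what a statement will look like after Python resolves its escape sequences, printing the literal shows exactly what is handed to the operator:
# Python resolves the backslash escapes; the doubled single quotes are
# left intact for Teradata to interpret per the SQL standard.
ddl_stmt = "CREATE TABLE test_table (col1 VARCHAR(50) DEFAULT '\"quoted_value\"')"
print(ddl_stmt)
# CREATE TABLE test_table (col1 VARCHAR(50) DEFAULT '"quoted_value"')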
Key Operation Examples with DdlOperator¶
Dropping tables in Teradata¶
You can use the DdlOperator to drop tables in Teradata. The following example drops multiple tables, including the auxiliary tables (_UV, _ET, _WT, _Log) that load jobs create. The error_list parameter names Teradata error codes to treat as non-fatal (for example, 3807, "Object does not exist"), so the task still succeeds when a table is absent:
# Drop tables if they exist
drop_table = DdlOperator(
    task_id="drop_table",
    ddl=[
        "DROP TABLE {{ params.SOURCE_TABLE }};",
        "DROP TABLE {{ params.SOURCE_TABLE }}_UV;",
        "DROP TABLE {{ params.SOURCE_TABLE }}_ET;",
        "DROP TABLE {{ params.SOURCE_TABLE }}_WT;",
        "DROP TABLE {{ params.SOURCE_TABLE }}_Log;",
        "DROP TABLE {{ params.TARGET_TABLE }};",
        "DROP TABLE {{ params.TARGET_TABLE }}_UV;",
        "DROP TABLE {{ params.TARGET_TABLE }}_ET;",
        "DROP TABLE {{ params.TARGET_TABLE }}_WT;",
        "DROP TABLE {{ params.TARGET_TABLE }}_Log;",
    ],
    error_list=[3706, 3803, 3807],
)
Creating tables in Teradata¶
You can use the DdlOperator to create tables in Teradata. The following example demonstrates how to create multiple tables:
create_source_table = DdlOperator(
    task_id="create_source_table",
    ddl=[
        "CREATE TABLE {{ params.SOURCE_TABLE }} ( \
        first_name VARCHAR(100), \
        last_name VARCHAR(100), \
        employee_id VARCHAR(10), \
        department VARCHAR(50) \
        );"
    ],
)
create_target_table = DdlOperator(
    task_id="create_target_table",
    ddl=[
        "CREATE TABLE {{ params.TARGET_TABLE }} ( \
        first_name VARCHAR(100), \
        last_name VARCHAR(100), \
        employee_id VARCHAR(10), \
        department VARCHAR(50) \
        );"
    ],
)
Creating an index on a Teradata table¶
You can use the DdlOperator to create an index on a Teradata table. The following example demonstrates how to create an index:
create_index_on_source = DdlOperator(
    task_id="create_index_on_source",
    ddl=["CREATE INDEX idx_employee_id (employee_id) ON {{ params.SOURCE_TABLE }};"],
)
Renaming a table in Teradata¶
You can use the DdlOperator to rename a table in Teradata. The following example demonstrates how to rename a table:
rename_target_table = DdlOperator(
    task_id="rename_target_table",
    ddl=[
        "RENAME TABLE {{ params.TARGET_TABLE }} TO {{ params.TARGET_TABLE }}_renamed;",
        "DROP TABLE {{ params.TARGET_TABLE }}_renamed",
    ],
)
Dropping an index in Teradata¶
You can use the DdlOperator to drop an index in Teradata. The following example demonstrates how to drop an index:
drop_index_on_source = DdlOperator(
    task_id="drop_index_on_source",
    ddl=["DROP INDEX idx_employee_id ON {{ params.SOURCE_TABLE }};"],
    error_list=[3706, 3803, 3807],
)
Altering a table in Teradata¶
You can use the DdlOperator to alter a table in Teradata. The following example demonstrates how to add a column:
alter_source_table = DdlOperator(
    task_id="alter_source_table",
    ddl=["ALTER TABLE {{ params.SOURCE_TABLE }} ADD hire_date DATE;"],
)
The complete Teradata Operator DAG¶
When we put everything together, our DAG should look like this (module-level imports omitted for brevity):
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_tpt"
CONN_ID = "teradata_default"
SSH_CONN_ID = "ssh_default"

# Define file paths and table names for the test
SYSTEM_TESTS_DIR = os.path.abspath(os.path.dirname(__file__))
SOURCE_FILE = os.path.join(SYSTEM_TESTS_DIR, "tdload_src_file.txt")
TARGET_FILE = os.path.join(SYSTEM_TESTS_DIR, "tdload_target_file.txt")

params = {
    "SOURCE_TABLE": "source_table",
    "TARGET_TABLE": "target_table",
    "SOURCE_FILE": SOURCE_FILE,
    "TARGET_FILE": TARGET_FILE,
}

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID, "params": params},
) as dag:
    # Drop tables if they exist
    drop_table = DdlOperator(
        task_id="drop_table",
        ddl=[
            "DROP TABLE {{ params.SOURCE_TABLE }};",
            "DROP TABLE {{ params.SOURCE_TABLE }}_UV;",
            "DROP TABLE {{ params.SOURCE_TABLE }}_ET;",
            "DROP TABLE {{ params.SOURCE_TABLE }}_WT;",
            "DROP TABLE {{ params.SOURCE_TABLE }}_Log;",
            "DROP TABLE {{ params.TARGET_TABLE }};",
            "DROP TABLE {{ params.TARGET_TABLE }}_UV;",
            "DROP TABLE {{ params.TARGET_TABLE }}_ET;",
            "DROP TABLE {{ params.TARGET_TABLE }}_WT;",
            "DROP TABLE {{ params.TARGET_TABLE }}_Log;",
        ],
        error_list=[3706, 3803, 3807],
    )

    create_source_table = DdlOperator(
        task_id="create_source_table",
        ddl=[
            "CREATE TABLE {{ params.SOURCE_TABLE }} ( \
            first_name VARCHAR(100), \
            last_name VARCHAR(100), \
            employee_id VARCHAR(10), \
            department VARCHAR(50) \
            );"
        ],
    )

    create_target_table = DdlOperator(
        task_id="create_target_table",
        ddl=[
            "CREATE TABLE {{ params.TARGET_TABLE }} ( \
            first_name VARCHAR(100), \
            last_name VARCHAR(100), \
            employee_id VARCHAR(10), \
            department VARCHAR(50) \
            );"
        ],
    )

    create_index_on_source = DdlOperator(
        task_id="create_index_on_source",
        ddl=["CREATE INDEX idx_employee_id (employee_id) ON {{ params.SOURCE_TABLE }};"],
    )

    load_file = TdLoadOperator(
        task_id="load_file",
        source_file_name="{{ params.SOURCE_FILE }}",
        target_table="{{ params.SOURCE_TABLE }}",
        source_format="Delimited",
        source_text_delimiter="|",
    )

    export_data = TdLoadOperator(
        task_id="export_data",
        source_table="{{ params.SOURCE_TABLE }}",
        target_file_name="{{ params.TARGET_FILE }}",
        target_format="Delimited",
        target_text_delimiter=";",
    )

    transfer_data = TdLoadOperator(
        task_id="transfer_data",
        source_table="{{ params.SOURCE_TABLE }}",
        target_table="{{ params.TARGET_TABLE }}",
        target_teradata_conn_id=CONN_ID,
    )

    create_select_dest_table = DdlOperator(
        task_id="create_select_dest_table",
        ddl=[
            "DROP TABLE {{ params.SOURCE_TABLE }}_select_dest;",
            "DROP TABLE {{ params.SOURCE_TABLE }}_select_log;",
            "DROP TABLE {{ params.SOURCE_TABLE }}_select_err1;",
            "DROP TABLE {{ params.SOURCE_TABLE }}_select_err2;",
            "CREATE TABLE {{ params.SOURCE_TABLE }}_select_dest ( \
            first_name VARCHAR(100), \
            last_name VARCHAR(100), \
            employee_id VARCHAR(10), \
            department VARCHAR(50) \
            );",
        ],
        error_list=[3706, 3803, 3807],
    )

    # TdLoadOperator using select statement as source
    transfer_data_select_stmt = TdLoadOperator(
        task_id="transfer_data_select_stmt",
        select_stmt="SELECT * FROM {{ params.SOURCE_TABLE }}",
        target_table="{{ params.SOURCE_TABLE }}_select_dest",
        tdload_options="--LogTable {{ params.SOURCE_TABLE }}_select_log --ErrorTable1 {{ params.SOURCE_TABLE }}_select_err1 --ErrorTable2 {{ params.SOURCE_TABLE }}_select_err2",
        target_teradata_conn_id=CONN_ID,
    )

    # Create table for insert statement test
    create_insert_dest_table = DdlOperator(
        task_id="create_insert_dest_table",
        ddl=[
            "DROP TABLE {{ params.SOURCE_TABLE }}_insert_dest;",
            "DROP TABLE {{ params.SOURCE_TABLE }}_insert_log;",
            "DROP TABLE {{ params.SOURCE_TABLE }}_insert_err1;",
            "DROP TABLE {{ params.SOURCE_TABLE }}_insert_err2;",
            "CREATE TABLE {{ params.SOURCE_TABLE }}_insert_dest ( \
            first_name VARCHAR(100), \
            last_name VARCHAR(100), \
            employee_id VARCHAR(10), \
            department VARCHAR(50) \
            );",
        ],
        error_list=[3706, 3803, 3807],
    )

    transfer_data_insert_stmt = TdLoadOperator(
        task_id="transfer_data_insert_stmt",
        source_table="{{ params.SOURCE_TABLE }}",
        insert_stmt="INSERT INTO {{ params.SOURCE_TABLE }}_insert_dest VALUES (?, ?, ?, ?)",
        target_table="{{ params.SOURCE_TABLE }}_insert_dest",
        tdload_options="--LogTable {{ params.SOURCE_TABLE }}_insert_log --ErrorTable1 {{ params.SOURCE_TABLE }}_insert_err1 --ErrorTable2 {{ params.SOURCE_TABLE }}_insert_err2",
        tdload_job_name="tdload_job_insert_stmt",
        target_teradata_conn_id=CONN_ID,
    )

    rename_target_table = DdlOperator(
        task_id="rename_target_table",
        ddl=[
            "RENAME TABLE {{ params.TARGET_TABLE }} TO {{ params.TARGET_TABLE }}_renamed;",
            "DROP TABLE {{ params.TARGET_TABLE }}_renamed",
        ],
    )

    drop_index_on_source = DdlOperator(
        task_id="drop_index_on_source",
        ddl=["DROP INDEX idx_employee_id ON {{ params.SOURCE_TABLE }};"],
        error_list=[3706, 3803, 3807],
    )

    alter_source_table = DdlOperator(
        task_id="alter_source_table",
        ddl=["ALTER TABLE {{ params.SOURCE_TABLE }} ADD hire_date DATE;"],
    )

    # Define the task dependencies
    (
        drop_table
        >> create_source_table
        >> create_target_table
        >> create_index_on_source
        >> load_file
        >> export_data
        >> transfer_data
        >> create_select_dest_table
        >> transfer_data_select_stmt
        >> create_insert_dest_table
        >> transfer_data_insert_stmt
        >> rename_target_table
        >> drop_index_on_source
        >> alter_source_table
    )

    from tests_common.test_utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
TdLoadOperator¶
The TdLoadOperator is an Airflow operator that interfaces with Teradata PT Easy Loader (tdload) to perform data operations on Teradata databases. This operator leverages TPT (Teradata Parallel Transporter), eliminating the need to write manual TPT scripts.
What is Teradata PT Easy Loader? A command-line interface extension for TPT that automatically determines appropriate load/unload operators based on user-provided parameters and the requested operation type.
Note
The TdLoadOperator requires the Teradata Parallel Transporter (TPT) package from Teradata Tools and Utilities (TTU)
to be installed on the machine where the tdload command will run (either local or remote).
Ensure that the tdload executable is available in the system’s PATH.
Refer to the official Teradata documentation for installation and configuration details.
Key Capabilities:
- Data Loading: Import data from flat files into Teradata tables
- Data Exporting: Extract data from Teradata tables to flat files
- Table-to-Table Transfers: Move data between Teradata database tables
- Deployment Flexibility: Execute on local or remote machines with TPT installed
- Airflow Integration: Seamlessly works with Airflow's scheduling, monitoring, and logging
The operator simplifies complex Teradata data operations while providing the robustness and reliability of Airflow’s workflow management.
This operator enables the execution of tdload commands on either the local host machine or a remote machine where TPT is installed.
Ensure that the Teradata Parallel Transporter (TPT) package is installed on the machine where the TdLoadOperator will execute commands. This can be:
- The local machine where Airflow runs the task, for local execution.
- A remote host accessed via SSH, for remote execution.
If executing remotely, ensure that an SSH server (e.g., sshd) is running and accessible on the remote machine, and that the tdload executable is available in the system’s PATH.
Note
For improved security, it is highly recommended to use private key-based SSH authentication (SSH key pairs) instead of username/password for the SSH connection.
This avoids password exposure, enables seamless automated execution, and enhances security.
See the Airflow SSH Connection documentation for details on configuring SSH keys: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection/ssh.html
To execute data loading, exporting, or transferring operations in a Teradata database, use the
TdLoadOperator.
Prerequisite¶
Make sure your Teradata Airflow connection is defined with the required fields: host, login, and password.
You can define a remote host with a separate SSH connection using the ssh_conn_id.
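For remote execution, a hypothetical sketch, mirroring the DdlOperator example earlier (the connection ids and remote file path are placeholders; the source file must exist on the host where tdload runs):
# Runs tdload on the remote host referenced by ssh_conn_id
remote_load = TdLoadOperator(
    task_id="remote_load",
    source_file_name="/data/tdload_src_file.txt",  # path on the remote host
    target_table="my_database.my_table",
    source_format="Delimited",
    source_text_delimiter="|",
    teradata_conn_id="teradata_default",
    ssh_conn_id="ssh_default",
)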
Key Operation Examples with TdLoadOperator¶
Loading data into a Teradata database table from a file¶
You can use the TdLoadOperator to load data from a file into a Teradata database table. The following example demonstrates how to load data from a delimited text file into a Teradata table:
load_file = TdLoadOperator(
    task_id="load_file",
    source_file_name="{{ params.SOURCE_FILE }}",
    target_table="{{ params.SOURCE_TABLE }}",
    source_format="Delimited",
    source_text_delimiter="|",
)
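For reference, a pipe-delimited source file matching the four-column table used above could look like this (hypothetical sample rows):
John|Doe|E001|Engineering
Jane|Smith|E002|Marketing
Sam|Lee|E003|Finance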
Exporting data from a Teradata table to a file¶
You can export data from a Teradata table to a file using the TdLoadOperator. The following example shows how to export data from a Teradata table to a delimited file:
export_data = TdLoadOperator(
    task_id="export_data",
    source_table="{{ params.SOURCE_TABLE }}",
    target_file_name="{{ params.TARGET_FILE }}",
    target_format="Delimited",
    target_text_delimiter=";",
)
Transferring data between Teradata tables¶
The TdLoadOperator can also be used to transfer data between two Teradata tables, potentially across different databases:
transfer_data = TdLoadOperator(
    task_id="transfer_data",
    source_table="{{ params.SOURCE_TABLE }}",
    target_table="{{ params.TARGET_TABLE }}",
    target_teradata_conn_id=CONN_ID,
)
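Because the target can use its own connection, the same pattern extends to copying between two different Teradata systems. A hypothetical sketch, assuming a second connection teradata_target has been defined:
cross_system_transfer = TdLoadOperator(
    task_id="cross_system_transfer",
    source_table="source_db.employees",
    teradata_conn_id="teradata_default",  # source system
    target_table="target_db.employees",
    target_teradata_conn_id="teradata_target",  # target system
)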
Transferring data using a SELECT statement as source¶
You can use a SELECT statement as the data source for TdLoadOperator, allowing for flexible data movement and transformation:
# TdLoadOperator using select statement as source
transfer_data_select_stmt = TdLoadOperator(
    task_id="transfer_data_select_stmt",
    select_stmt="SELECT * FROM {{ params.SOURCE_TABLE }}",
    target_table="{{ params.SOURCE_TABLE }}_select_dest",
    tdload_options="--LogTable {{ params.SOURCE_TABLE }}_select_log --ErrorTable1 {{ params.SOURCE_TABLE }}_select_err1 --ErrorTable2 {{ params.SOURCE_TABLE }}_select_err2",
    target_teradata_conn_id=CONN_ID,
)
Transferring data using an INSERT statement as target¶
You can use an INSERT statement as the target for TdLoadOperator, enabling custom insert logic. The positional ? parameter markers bind, in order, to the columns read from the source:
transfer_data_insert_stmt = TdLoadOperator(
    task_id="transfer_data_insert_stmt",
    source_table="{{ params.SOURCE_TABLE }}",
    insert_stmt="INSERT INTO {{ params.SOURCE_TABLE }}_insert_dest VALUES (?, ?, ?, ?)",
    target_table="{{ params.SOURCE_TABLE }}_insert_dest",
    tdload_options="--LogTable {{ params.SOURCE_TABLE }}_insert_log --ErrorTable1 {{ params.SOURCE_TABLE }}_insert_err1 --ErrorTable2 {{ params.SOURCE_TABLE }}_insert_err2",
    tdload_job_name="tdload_job_insert_stmt",
    target_teradata_conn_id=CONN_ID,
)
The complete Teradata Operator DAG¶
The complete example DAG, combining all of the DdlOperator and TdLoadOperator tasks shown above, is listed in full at the end of the DdlOperator section; both operators run in the same example_tpt DAG.