Google Dataform Operators¶
Dataform is a service for data analysts to develop, test, version control, and schedule complex SQL workflows for data transformation in BigQuery.
Dataform lets you manage data transformation in the Extraction, Loading, and Transformation (ELT) process for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform helps you to transform it into a well-defined, tested, and documented suite of data tables.
For more information about the task visit Dataform documentation
Configuration¶
Before you can use the Dataform operators you need to initialize repository and workspace, for more information about this visit Dataform documentation
Create Repository¶
To create a repository for tracking your code in Dataform service use
DataformCreateRepositoryOperator
.
Example of usage can be seen below:
make_repository = DataformCreateRepositoryOperator(
task_id="make-repository",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
)
Create Workspace¶
To create a workspace for storing your code in Dataform service use
DataformCreateWorkspaceOperator
.
Example of usage can be seen below:
make_workspace = DataformCreateWorkspaceOperator(
task_id="make-workspace",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workspace_id=WORKSPACE_ID,
)
Create Compilation Result¶
To create a Compilation Result use
DataformCreateCompilationResultOperator
.
A simple configuration can look as followed:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create-compilation-result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": "main",
"workspace": (
f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
f"workspaces/{WORKSPACE_ID}"
),
},
)
Get Compilation Result¶
To get a Compilation Result you can use
DataformGetCompilationResultOperator
.
get_compilation_result = DataformGetCompilationResultOperator(
task_id="get-compilation-result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result_id=(
"{{ task_instance.xcom_pull('create-compilation-result')['name'].split('/')[-1] }}"
),
)
Create Workflow Invocation¶
To create a Workflow Invocation you can use
DataformCreateWorkflowInvocationOperator
.
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id="create-workflow-invocation",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
},
)
We have possibility to run this operation in the sync mode and async, for async operation we also have
a sensor
DataformWorkflowInvocationStateSensor
.
create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
task_id="create-workflow-invocation-async",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
},
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is-workflow-invocation-done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=(
"{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
We also have a sensor
DataformWorkflowInvocationActionStateSensor
to check the status of a particular action for a workflow invocation triggered asynchronously.
is_workflow_invocation_action_done = DataformWorkflowInvocationActionStateSensor(
task_id="is-workflow-invocation-action-done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=(
"{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
),
target_name="first_view",
expected_statuses={WorkflowInvocationAction.State.SUCCEEDED},
failure_statuses={
WorkflowInvocationAction.State.SKIPPED,
WorkflowInvocationAction.State.DISABLED,
WorkflowInvocationAction.State.CANCELLED,
WorkflowInvocationAction.State.FAILED,
},
)
Get Workflow Invocation¶
To get a Workflow Invocation you can use
DataformGetWorkflowInvocationOperator
.
get_workflow_invocation = DataformGetWorkflowInvocationOperator(
task_id="get-workflow-invocation",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=(
"{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
),
)
Query Workflow Invocation Action¶
To query Workflow Invocation Actions you can use
DataformQueryWorkflowInvocationActionsOperator
.
query_workflow_invocation_actions = DataformQueryWorkflowInvocationActionsOperator(
task_id="query-workflow-invocation-actions",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=(
"{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
),
)
Cancel Workflow Invocation¶
To cancel a Workflow Invocation you can use
DataformCancelWorkflowInvocationOperator
.
cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
task_id="cancel-workflow-invocation",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=(
"{{ task_instance.xcom_pull('create-workflow-invocation-for-cancel')['name'].split('/')[-1] }}"
),
)
Delete Repository¶
To delete a repository use
DataformDeleteRepositoryOperator
.
Example of usage can be seen below:
delete_repository = DataformDeleteRepositoryOperator(
task_id="delete-repository",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
Delete Workspace¶
To delete a workspace use
DataformDeleteWorkspaceOperator
.
Example of usage can be seen below:
delete_workspace = DataformDeleteWorkspaceOperator(
task_id="delete-workspace",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workspace_id=WORKSPACE_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
Remove file¶
To remove a file use
DataformRemoveFileOperator
.
Example of usage can be seen below:
remove_test_file = DataformRemoveFileOperator(
task_id="remove-test-file",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workspace_id=WORKSPACE_ID,
filepath="test/test.txt",
)
Remove directory¶
To remove a directory use
DataformRemoveDirectoryOperator
.
Example of usage can be seen below:
remove_test_directory = DataformRemoveDirectoryOperator(
task_id="remove-test-directory",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workspace_id=WORKSPACE_ID,
directory_path="test",
)
Initialize workspace¶
Creates default projects structure for provided workspace. Before it can be done workspace and repository should be created. Example of usage can be seen below:
first_initialization_step, last_initialization_step = make_initialization_workspace_flow(
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workspace_id=WORKSPACE_ID,
package_name=f"dataform_package_{ENV_ID}",
without_installation=True,
dataform_schema_name=DATAFORM_SCHEMA_NAME,
)
Write file to workspace¶
To write a file with given content to specified workspace use
DataformWriteFileOperator
.
test_file_content = b"""
test test for test file
"""
write_test_file = DataformWriteFileOperator(
task_id="make-test-file",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workspace_id=WORKSPACE_ID,
filepath="test/test.txt",
contents=test_file_content,
)
Make directory in workspace¶
To make a directory with given path in specified workspace use
DataformMakeDirectoryOperator
.
make_test_directory = DataformMakeDirectoryOperator(
task_id="make-test-directory",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workspace_id=WORKSPACE_ID,
directory_path="test",
)
Install NPM packages¶
To install npm packages for specified workspace use
DataformInstallNpmPackagesOperator
.
install_npm_packages = DataformInstallNpmPackagesOperator(
task_id="install-npm-packages",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workspace_id=WORKSPACE_ID,
)