Google Dataform Operators

Dataform is a service for data analysts to develop, test, version control, and schedule complex SQL workflows for data transformation in BigQuery.

Dataform lets you manage data transformation in the Extraction, Loading, and Transformation (ELT) process for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform helps you transform it into a well-defined, tested, and documented suite of data tables.

For more information, see the Dataform documentation.

Configuration

Before you can use the Dataform operators, you need to initialize a repository and a workspace. For more information, see the Dataform documentation.

Create Repository

To create a repository for tracking your code in the Dataform service, use DataformCreateRepositoryOperator. For example:

tests/system/google/cloud/dataform/example_dataform.py

    make_repository = DataformCreateRepositoryOperator(
        task_id="make-repository",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
    )

Create Workspace

To create a workspace for storing your code in the Dataform service, use DataformCreateWorkspaceOperator. For example:

tests/system/google/cloud/dataform/example_dataform.py

    make_workspace = DataformCreateWorkspaceOperator(
        task_id="make-workspace",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
    )

Create Compilation Result

To create a compilation result, use DataformCreateCompilationResultOperator. A simple configuration can look as follows:

tests/system/google/cloud/dataform/example_dataform.py

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create-compilation-result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": "main",
            "workspace": (
                f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
                f"workspaces/{WORKSPACE_ID}"
            ),
        },
    )
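If you build this configuration for several repositories or workspaces, the resource-name formatting can be factored into a small helper. The following is a minimal plain-Python sketch; the helper names workspace_name and compilation_result_config are hypothetical, and the code only reproduces the dict shape used in the example above without calling Dataform.

```python
def workspace_name(project_id: str, region: str, repository_id: str, workspace_id: str) -> str:
    """Full resource name of a Dataform workspace, formatted as in the example above."""
    return (
        f"projects/{project_id}/locations/{region}/repositories/{repository_id}/"
        f"workspaces/{workspace_id}"
    )


def compilation_result_config(
    project_id: str,
    region: str,
    repository_id: str,
    workspace_id: str,
    git_commitish: str = "main",
) -> dict:
    """Hypothetical helper: build the compilation_result argument in the example's shape."""
    return {
        "git_commitish": git_commitish,
        "workspace": workspace_name(project_id, region, repository_id, workspace_id),
    }


if __name__ == "__main__":
    # Placeholder identifiers, not real resources.
    print(compilation_result_config("my-project", "us-central1", "my-repo", "my-workspace"))
```

The returned dict can be passed as compilation_result=... to DataformCreateCompilationResultOperator in place of the inline literal.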

Get Compilation Result

To get a compilation result, use DataformGetCompilationResultOperator.

tests/system/google/cloud/dataform/example_dataform.py

    get_compilation_result = DataformGetCompilationResultOperator(
        task_id="get-compilation-result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result_id=(
            "{{ task_instance.xcom_pull('create-compilation-result')['name'].split('/')[-1] }}"
        ),
    )
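The Jinja template above takes the full resource name pushed to XCom by the create task and keeps only its last path segment, which is the ID. A plain-Python sketch of the same idea, assuming names of the form projects/{project}/locations/{region}/repositories/{repository}/compilationResults/{id}; the helper names are hypothetical.

```python
from __future__ import annotations


def resource_id(name: str) -> str:
    """Return the last path segment, like the template's name.split('/')[-1]."""
    return name.split("/")[-1]


def parse_resource_name(name: str) -> dict[str, str]:
    """Map each collection segment (e.g. 'repositories') to the ID that follows it."""
    parts = name.split("/")
    if len(parts) % 2:
        raise ValueError(f"expected collection/id pairs, got {name!r}")
    return dict(zip(parts[0::2], parts[1::2]))


if __name__ == "__main__":
    # Hypothetical resource name; every segment value is a placeholder.
    name = "projects/my-project/locations/us-central1/repositories/my-repo/compilationResults/abc123"
    print(resource_id(name))  # prints abc123
    print(parse_resource_name(name)["repositories"])  # prints my-repo
```

The same split works for workflow invocation names, which the later examples pass as workflow_invocation_id.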

Create Workflow Invocation

To create a workflow invocation, use DataformCreateWorkflowInvocationOperator.

tests/system/google/cloud/dataform/example_dataform.py

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="create-workflow-invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
        },
    )

This operator can run in synchronous or asynchronous mode. In synchronous mode (the default), it waits for the workflow invocation to finish. To run it asynchronously, set asynchronous=True and use DataformWorkflowInvocationStateSensor to wait until the invocation reaches one of the expected_statuses.

tests/system/google/cloud/dataform/example_dataform.py

    create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
        task_id="create-workflow-invocation-async",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        asynchronous=True,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
        },
    )

    is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
        task_id="is-workflow-invocation-done",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
        ),
        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
    )
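A sensor of this kind repeatedly checks the current state and stops once it matches one of the expected states. The sketch below illustrates that polling pattern in plain Python. It is not the provider's implementation: the State enum is a hypothetical stand-in for WorkflowInvocation.State, and get_state stands in for an API call.

```python
import time
from enum import Enum, auto
from typing import Callable, Set


class State(Enum):
    """Hypothetical stand-in for WorkflowInvocation.State (not the real enum)."""

    RUNNING = auto()
    SUCCEEDED = auto()
    CANCELING = auto()
    CANCELLED = auto()
    FAILED = auto()


class InvocationFailedError(Exception):
    """Raised when the state is one the caller treats as a failure."""


def poke(current: State, expected: Set[State], failure: Set[State]) -> bool:
    """One check: raise on a failure state, return True on an expected state."""
    if current in failure:
        raise InvocationFailedError(f"invocation reached failure state {current.name}")
    return current in expected


def wait_for_state(
    get_state: Callable[[], State],
    expected: Set[State],
    failure: Set[State],
    max_pokes: int = 10,
    interval_seconds: float = 0.0,
) -> bool:
    """Poll get_state() until an expected state appears or max_pokes is used up."""
    for _ in range(max_pokes):
        if poke(get_state(), expected, failure):
            return True
        time.sleep(interval_seconds)
    return False


if __name__ == "__main__":
    states = iter([State.RUNNING, State.RUNNING, State.SUCCEEDED])
    print(wait_for_state(lambda: next(states), {State.SUCCEEDED}, {State.FAILED}))  # prints True
```

Failing fast on a failure state avoids waiting until a timeout when the invocation can no longer succeed.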

To check the state of a single action in a workflow invocation triggered asynchronously, use DataformWorkflowInvocationActionStateSensor. The target_name parameter selects the action, and the sensor fails if the action reaches any of the failure_statuses.

tests/system/google/cloud/dataform/example_dataform.py

    is_workflow_invocation_action_done = DataformWorkflowInvocationActionStateSensor(
        task_id="is-workflow-invocation-action-done",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
        ),
        target_name="first_view",
        expected_statuses={WorkflowInvocationAction.State.SUCCEEDED},
        failure_statuses={
            WorkflowInvocationAction.State.SKIPPED,
            WorkflowInvocationAction.State.DISABLED,
            WorkflowInvocationAction.State.CANCELLED,
            WorkflowInvocationAction.State.FAILED,
        },
    )
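The per-action check boils down to finding the action whose target name matches and comparing its state against the two sets. A minimal sketch, assuming each action is available as a plain dict with a target.name field and a state string; this record shape and the helper names are assumptions for illustration, not the provider's code.

```python
from __future__ import annotations

from typing import Any, Iterable


def find_action_state(actions: Iterable[dict[str, Any]], target_name: str) -> str | None:
    """Return the state of the first action whose target name matches, or None."""
    for action in actions:
        if action.get("target", {}).get("name") == target_name:
            return action.get("state")
    return None


def check_action(
    actions: Iterable[dict[str, Any]],
    target_name: str,
    expected: set[str],
    failure: set[str],
) -> bool:
    """Sensor-style check for one action: True when done, raise on failure, else False."""
    state = find_action_state(actions, target_name)
    if state is None:
        return False  # action not reported yet; keep waiting
    if state in failure:
        raise RuntimeError(f"action {target_name!r} ended in state {state}")
    return state in expected


if __name__ == "__main__":
    # Hypothetical action records.
    actions = [
        {"target": {"name": "first_view"}, "state": "SUCCEEDED"},
        {"target": {"name": "other_table"}, "state": "RUNNING"},
    ]
    print(check_action(actions, "first_view", {"SUCCEEDED"}, {"FAILED", "SKIPPED"}))  # prints True
```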

Get Workflow Invocation

To get a workflow invocation, use DataformGetWorkflowInvocationOperator.

tests/system/google/cloud/dataform/example_dataform.py

    get_workflow_invocation = DataformGetWorkflowInvocationOperator(
        task_id="get-workflow-invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
        ),
    )

Query Workflow Invocation Action

To query the actions of a workflow invocation, use DataformQueryWorkflowInvocationActionsOperator.

tests/system/google/cloud/dataform/example_dataform.py

    query_workflow_invocation_actions = DataformQueryWorkflowInvocationActionsOperator(
        task_id="query-workflow-invocation-actions",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
        ),
    )
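If a downstream task receives the queried actions as plain dicts, it can summarize them, for example to report which targets failed. A minimal sketch, assuming each record has a target.name field and a state string; the record shape, state strings, and the summarize_actions helper are assumptions for illustration.

```python
from collections import Counter
from typing import Any, Dict, List, Tuple


def summarize_actions(actions: List[Dict[str, Any]]) -> Tuple[Counter, List[str]]:
    """Count actions per state and list the target names of failed actions."""
    counts = Counter(action.get("state", "STATE_UNSPECIFIED") for action in actions)
    failed = sorted(
        action.get("target", {}).get("name", "<unknown>")
        for action in actions
        if action.get("state") == "FAILED"
    )
    return counts, failed


if __name__ == "__main__":
    # Hypothetical records; field names and state strings are assumptions.
    sample = [
        {"target": {"name": "first_view"}, "state": "SUCCEEDED"},
        {"target": {"name": "second_table"}, "state": "FAILED"},
        {"target": {"name": "third_view"}, "state": "SKIPPED"},
    ]
    counts, failed = summarize_actions(sample)
    print(dict(counts))  # prints {'SUCCEEDED': 1, 'FAILED': 1, 'SKIPPED': 1}
    print(failed)  # prints ['second_table']
```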

Cancel Workflow Invocation

To cancel a workflow invocation, use DataformCancelWorkflowInvocationOperator.

tests/system/google/cloud/dataform/example_dataform.py

    cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
        task_id="cancel-workflow-invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation-for-cancel')['name'].split('/')[-1] }}"
        ),
    )

Delete Repository

To delete a repository, use DataformDeleteRepositoryOperator. In the example below, trigger_rule=TriggerRule.ALL_DONE makes the cleanup run even if upstream tasks fail:

tests/system/google/cloud/dataform/example_dataform.py

    delete_repository = DataformDeleteRepositoryOperator(
        task_id="delete-repository",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        trigger_rule=TriggerRule.ALL_DONE,
    )

Delete Workspace

To delete a workspace, use DataformDeleteWorkspaceOperator. As with the repository, trigger_rule=TriggerRule.ALL_DONE makes the cleanup run even if upstream tasks fail:

tests/system/google/cloud/dataform/example_dataform.py

    delete_workspace = DataformDeleteWorkspaceOperator(
        task_id="delete-workspace",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        trigger_rule=TriggerRule.ALL_DONE,
    )
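The difference between Airflow's default trigger rule (all_success) and ALL_DONE can be sketched as two predicates over the states of upstream tasks. This is a simplified illustration, not Airflow's scheduler code: the FINISHED_STATES set is a reduced, assumed subset of Airflow's task states.

```python
from typing import Iterable

# Simplified set of "finished" task states; Airflow defines more states than shown here.
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed"}


def all_success(upstream_states: Iterable[str]) -> bool:
    """Default rule: run only if every upstream task succeeded."""
    return all(state == "success" for state in upstream_states)


def all_done(upstream_states: Iterable[str]) -> bool:
    """ALL_DONE-style rule: run once every upstream task finished, whatever the outcome."""
    return all(state in FINISHED_STATES for state in upstream_states)


if __name__ == "__main__":
    upstream = ["success", "failed"]  # e.g. the workflow invocation task failed
    print(all_success(upstream))  # prints False
    print(all_done(upstream))  # prints True
```

This is why delete tasks configured with ALL_DONE still remove the workspace and repository after a failed run, so test resources are not left behind.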

Remove file

To remove a file, use DataformRemoveFileOperator. For example:

tests/system/google/cloud/dataform/example_dataform.py

    remove_test_file = DataformRemoveFileOperator(
        task_id="remove-test-file",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        filepath="test/test.txt",
    )

Remove directory

To remove a directory, use DataformRemoveDirectoryOperator. For example:

tests/system/google/cloud/dataform/example_dataform.py

    remove_test_directory = DataformRemoveDirectoryOperator(
        task_id="remove-test-directory",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        directory_path="test",
    )

Initialize workspace

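To create the default project structure in a workspace, use the make_initialization_workspace_flow helper. The repository and the workspace must already exist. The helper returns the first and last tasks of the initialization flow so that you can connect them to the rest of your DAG. For example:

tests/system/google/cloud/dataform/example_dataform.py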

    first_initialization_step, last_initialization_step = make_initialization_workspace_flow(
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        package_name=f"dataform_package_{ENV_ID}",
        without_installation=True,
        dataform_schema_name=DATAFORM_SCHEMA_NAME,
    )
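Because initialization needs an existing repository and workspace, and cleanup should come last, the tasks in this guide form a dependency chain. The sketch below computes one plausible ordering with the standard library's graphlib. The labels are hypothetical: "initialize-workspace" stands for the steps between first_initialization_step and last_initialization_step, and the labels are not necessarily the task IDs used in the example DAG.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each key runs after every task in its value set (hypothetical labels).
DEPENDENCIES = {
    "make-workspace": {"make-repository"},
    "initialize-workspace": {"make-workspace"},
    "create-compilation-result": {"initialize-workspace"},
    "create-workflow-invocation": {"create-compilation-result"},
    "delete-workspace": {"create-workflow-invocation"},
    "delete-repository": {"delete-workspace"},
}

ORDER = list(TopologicalSorter(DEPENDENCIES).static_order())

if __name__ == "__main__":
    # In a DAG, the same chain would be expressed with the >> operator.
    print(" >> ".join(ORDER))
```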

Write file to workspace

To write a file with the given contents to a workspace, use DataformWriteFileOperator.

tests/system/google/cloud/dataform/example_dataform.py

    test_file_content = b"""
    test test for test file
    """
    write_test_file = DataformWriteFileOperator(
        task_id="make-test-file",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        filepath="test/test.txt",
        contents=test_file_content,
    )
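The example passes contents as a bytes literal, so text such as a Dataform SQLX definition has to be encoded before it is written. A minimal sketch that builds a SQLX file declaring a view with a config { type: "view" } block; the sqlx_view helper is hypothetical, and the matching filepath (for example a file under definitions/) is an assumption you would choose yourself.

```python
def sqlx_view(select_sql: str) -> bytes:
    """Hypothetical helper: UTF-8 bytes for a minimal SQLX file that declares a view."""
    text = 'config { type: "view" }\n\n' + select_sql.strip() + "\n"
    return text.encode("utf-8")


if __name__ == "__main__":
    contents = sqlx_view("SELECT 1 AS test")
    print(contents.decode("utf-8"))
```

The resulting bytes can be passed as contents=... to DataformWriteFileOperator.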

Make directory in workspace

To create a directory at a given path in a workspace, use DataformMakeDirectoryOperator.

tests/system/google/cloud/dataform/example_dataform.py

    make_test_directory = DataformMakeDirectoryOperator(
        task_id="make-test-directory",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        directory_path="test",
    )

Install NPM packages

To install the npm packages for a workspace, use DataformInstallNpmPackagesOperator.

tests/system/google/cloud/dataform/example_dataform.py

    install_npm_packages = DataformInstallNpmPackagesOperator(
        task_id="install-npm-packages",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
    )
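npm installs the dependencies declared in a package.json manifest, so the workspace is assumed to contain one at its root before this operator runs. A minimal sketch that builds such a manifest as bytes, ready for DataformWriteFileOperator; the package_json helper, the manifest shape, and the @dataform/core entry are assumptions, and core_version is a placeholder you must supply.

```python
import json


def package_json(package_name: str, core_version: str) -> bytes:
    """Hypothetical helper: a minimal package.json declaring @dataform/core as a dependency."""
    manifest = {
        "name": package_name,
        "dependencies": {"@dataform/core": core_version},
    }
    return (json.dumps(manifest, indent=2) + "\n").encode("utf-8")


if __name__ == "__main__":
    # "X.Y.Z" is a placeholder, not a recommended version.
    print(package_json("dataform_package_example", "X.Y.Z").decode("utf-8"))
```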
