Google Knowledge Catalog Operators

Knowledge Catalog is an intelligent data fabric that provides unified analytics and data management across your data lakes, data warehouses, and data marts.

For more information about the service, visit the Knowledge Catalog product documentation.

Create a Task

Before you create a Knowledge Catalog task, you need to define its body. For more information about the available fields to pass when creating a task, visit the Knowledge Catalog create task API.

A simple task configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex.py[source]

EXAMPLE_TASK_BODY = {
    "trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
    "execution_spec": {"service_account": SERVICE_ACC},
    "spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
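As a minimal sketch, the same body can be assembled by a small helper that rejects empty values before the DAG uses them. The helper name `build_task_body` and all sample values are hypothetical; only the three keys come from the configuration above.

```python
def build_task_body(trigger_type: str, service_account: str, python_script_file: str) -> dict:
    """Return a task body with the same three keys as EXAMPLE_TASK_BODY."""
    for label, value in (
        ("trigger_type", trigger_type),
        ("service_account", service_account),
        ("python_script_file", python_script_file),
    ):
        if not value:
            raise ValueError(f"{label} must not be empty")
    return {
        "trigger_spec": {"type_": trigger_type},
        "execution_spec": {"service_account": service_account},
        "spark": {"python_script_file": python_script_file},
    }


# Placeholder values for illustration only.
task_body = build_task_body(
    trigger_type="ON_DEMAND",
    service_account="my-sa@my-project.iam.gserviceaccount.com",
    python_script_file="gs://my-bucket/spark_job.py",
)
```

The resulting dictionary can be passed wherever the example passes `EXAMPLE_TASK_BODY`.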

With this configuration, we can create the task either synchronously or asynchronously using DataplexCreateTaskOperator.

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogCreateTaskOperator.

tests/system/google/cloud/dataplex/example_dataplex.py[source]

create_dataplex_task = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="create_dataplex_task",
)

tests/system/google/cloud/dataplex/example_dataplex.py[source]

create_dataplex_task_async = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    asynchronous=True,
    task_id="create_dataplex_task_async",
)

Delete a task

To delete a task you can use:

DataplexDeleteTaskOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogDeleteTaskOperator.

tests/system/google/cloud/dataplex/example_dataplex.py[source]

delete_dataplex_task_async = DataplexDeleteTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    task_id="delete_dataplex_task_async",
)

List tasks

To list tasks you can use:

DataplexListTasksOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogListTasksOperator.

tests/system/google/cloud/dataplex/example_dataplex.py[source]

list_dataplex_task = DataplexListTasksOperator(
    project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)

Get a task

To get a task you can use:

DataplexGetTaskOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogGetTaskOperator.

tests/system/google/cloud/dataplex/example_dataplex.py[source]

get_dataplex_task = DataplexGetTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="get_dataplex_task",
)

Wait for a task

To wait for a task created asynchronously you can use:

DataplexTaskStateSensor

tests/system/google/cloud/dataplex/example_dataplex.py[source]

dataplex_task_state = DataplexTaskStateSensor(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="dataplex_task_state",
)
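Conceptually, a state sensor polls a resource until it reaches a target state. The plain-Python sketch below illustrates that pattern only; it is not the sensor's implementation. `wait_for_state`, `get_state`, and the state strings used here are hypothetical stand-ins.

```python
import time
from typing import Callable, Iterable


def wait_for_state(
    get_state: Callable[[], str],
    target: str = "ACTIVE",
    failed: Iterable[str] = ("DELETING",),
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> int:
    """Poll get_state() until it returns target; return the number of polls used."""
    failed_states = set(failed)
    for attempt in range(1, max_pokes + 1):
        state = get_state()
        if state == target:
            return attempt
        if state in failed_states:
            raise RuntimeError(f"resource entered state {state!r}")
        time.sleep(poke_interval)
    raise TimeoutError(f"state {target!r} not reached after {max_pokes} polls")


# Simulated task that reports the target state on the third poll.
states = iter(["CREATING", "CREATING", "ACTIVE"])
polls_used = wait_for_state(lambda: next(states))
```

In a DAG, the sensor plays this role between an asynchronous create and the tasks that depend on the created resource.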

Create a Lake

Before you create a Knowledge Catalog lake, you need to define its body.

For more information about the available fields to pass when creating a lake, visit Knowledge Catalog create lake API.

A simple lake configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex.py[source]

EXAMPLE_LAKE_BODY = {
    "display_name": "test_display_name",
    "labels": [],
    "description": "test_description",
    "metastore": {"service": ""},
}

With this configuration we can create the lake:

DataplexCreateLakeOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogCreateLakeOperator.

tests/system/google/cloud/dataplex/example_dataplex.py[source]

create_lake = DataplexCreateLakeOperator(
    project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
)

Delete a lake

To delete a lake you can use:

DataplexDeleteLakeOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogDeleteLakeOperator.

tests/system/google/cloud/dataplex/example_dataplex.py[source]

delete_lake = DataplexDeleteLakeOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    task_id="delete_lake",
    trigger_rule=TriggerRule.ALL_DONE,
)

Create or update a Data Quality scan

Before you create a Knowledge Catalog Data Quality scan you need to define its body. For more information about the available fields to pass when creating a Data Quality scan, visit Knowledge Catalog create data quality API.

A simple Data Quality scan configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_quality_spec = DataQualitySpec(
    {
        "rules": [
            {
                "range_expectation": {
                    "min_value": "0",
                    "max_value": "10000",
                },
                "column": "value",
                "dimension": "VALIDITY",
            }
        ],
    }
)
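To make the rule above concrete, the sketch below applies an inclusive range check to a few sample rows in plain Python. It illustrates what a range expectation on the `value` column is meant to express and is not how the service evaluates rules; the helper `in_range` and the sample rows are hypothetical, and treating both bounds as inclusive is an assumption.

```python
def in_range(value: float, min_value: str, max_value: str) -> bool:
    """Inclusive range check; the bounds are strings, as in the rule definition."""
    return float(min_value) <= value <= float(max_value)


rule = {
    "range_expectation": {"min_value": "0", "max_value": "10000"},
    "column": "value",
    "dimension": "VALIDITY",
}

# Hypothetical sample rows for illustration only.
rows = [{"value": 5}, {"value": 10000}, {"value": -1}, {"value": 10001}]

bounds = rule["range_expectation"]
passed = [
    in_range(row[rule["column"]], bounds["min_value"], bounds["max_value"])
    for row in rows
]
pass_ratio = sum(passed) / len(passed)
```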

With this configuration we can create or update the Data Quality scan:

DataplexCreateOrUpdateDataQualityScanOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogCreateOrUpdateDataQualityScanOperator.

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)

Get a Data Quality scan

To get a Data Quality scan you can use:

DataplexGetDataQualityScanOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogGetDataQualityScanOperator.

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan = DataplexGetDataQualityScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Delete a Data Quality scan

To delete a Data Quality scan you can use:

DataplexDeleteDataQualityScanOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogDeleteDataQualityScanOperator.

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

delete_data_scan = DataplexDeleteDataQualityScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Run a Data Quality scan

You can run a Knowledge Catalog Data Quality scan in asynchronous mode and check its status later using a sensor:

DataplexRunDataQualityScanOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogRunDataQualityScanOperator.

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

run_data_scan_async = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check that a Knowledge Catalog Data Quality scan run succeeded, you can use:

DataplexDataQualityJobStatusSensor

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id=run_data_scan_async.output,
)

You can also run this operator in deferrable mode:

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

run_data_scan_def = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Get a Data Quality scan job

To get the result of a Data Quality scan job, you can use:

DataplexGetDataQualityScanResultOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogGetDataQualityScanResultOperator.

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

You can also run this operator in deferrable mode:

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan_job_result_def = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Create a zone

Before you create a Knowledge Catalog zone you need to define its body.

For more information about the available fields to pass when creating a zone, visit Knowledge Catalog create zone API.

A simple zone configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

EXAMPLE_ZONE = {
    "type_": "RAW",
    "resource_spec": {"location_type": "SINGLE_REGION"},
}
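A small helper can catch a mistyped zone type or location type before the request is sent. This is a sketch under stated assumptions: `build_zone_body` is a hypothetical helper, and the allowed values listed here reflect my understanding of the API's enums, so check them against the create zone API reference.

```python
# Assumed enum values; verify against the create zone API reference.
ZONE_TYPES = {"RAW", "CURATED"}
LOCATION_TYPES = {"SINGLE_REGION", "MULTI_REGION"}


def build_zone_body(zone_type: str = "RAW", location_type: str = "SINGLE_REGION") -> dict:
    """Return a zone body shaped like EXAMPLE_ZONE after validating both values."""
    if zone_type not in ZONE_TYPES:
        raise ValueError(f"unknown zone type: {zone_type!r}")
    if location_type not in LOCATION_TYPES:
        raise ValueError(f"unknown location type: {location_type!r}")
    return {
        "type_": zone_type,
        "resource_spec": {"location_type": location_type},
    }


zone_body = build_zone_body()
```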

With this configuration we can create a zone:

DataplexCreateZoneOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogCreateZoneOperator.

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

create_zone = DataplexCreateZoneOperator(
    task_id="create_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_ZONE,
    zone_id=ZONE_ID,
)

Delete a zone

To delete a zone you can use:

DataplexDeleteZoneOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogDeleteZoneOperator.

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

delete_zone = DataplexDeleteZoneOperator(
    task_id="delete_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Create an asset

Before you create a Knowledge Catalog asset you need to define its body.

For more information about the available fields to pass when creating an asset, visit Knowledge Catalog create asset API.

A simple asset configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

EXAMPLE_ASSET = {
    "resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET}", "type_": "BIGQUERY_DATASET"},
    "discovery_spec": {"enabled": True},
}

With this configuration we can create the asset:

DataplexCreateAssetOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogCreateAssetOperator.

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

create_asset = DataplexCreateAssetOperator(
    task_id="create_asset",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_ASSET,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
)

Delete an asset

To delete an asset you can use:

DataplexDeleteAssetOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogDeleteAssetOperator.

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

delete_asset = DataplexDeleteAssetOperator(
    task_id="delete_asset",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Create or update a Data Profile scan

Before you create a Knowledge Catalog Data Profile scan you need to define its body. For more information about the available fields to pass when creating a Data Profile scan, visit Knowledge Catalog create data profile API.

A simple Data Profile scan configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_profile_spec = DataProfileSpec({})
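The two resource names in the configuration above follow fixed patterns, so they can be produced by small helpers instead of repeated f-strings. This is a minimal sketch; the function names and sample identifiers are hypothetical, and only the path patterns are taken from the example.

```python
def entity_path(project_id: str, region: str, lake_id: str, zone_id: str, table: str) -> str:
    """Build the entity path used for data.entity in the example."""
    return (
        f"projects/{project_id}/locations/{region}"
        f"/lakes/{lake_id}/zones/{zone_id}/entities/{table}"
    )


def bigquery_table_resource(project_id: str, dataset: str, table: str) -> str:
    """Build the BigQuery table resource name used for data.resource in the example."""
    return f"//bigquery.googleapis.com/projects/{project_id}/datasets/{dataset}/tables/{table}"


# Placeholder identifiers for illustration only.
entity = entity_path("my-project", "us-central1", "my-lake", "my-zone", "table0")
resource = bigquery_table_resource("my-project", "my_dataset", "table0")
```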

With this configuration we can create or update the Data Profile scan:

DataplexCreateOrUpdateDataProfileScanOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogCreateOrUpdateDataProfileScanOperator.

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

create_data_scan = DataplexCreateOrUpdateDataProfileScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)

Get a Data Profile scan

To get a Data Profile scan you can use:

DataplexGetDataProfileScanOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogGetDataProfileScanOperator.

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

get_data_scan = DataplexGetDataProfileScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Delete a Data Profile scan

To delete a Data Profile scan you can use:

DataplexDeleteDataProfileScanOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogDeleteDataProfileScanOperator.

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

delete_data_scan = DataplexDeleteDataProfileScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Run a Data Profile scan

You can run a Knowledge Catalog Data Profile scan in asynchronous mode and check its status later using a sensor:

DataplexRunDataProfileScanOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogRunDataProfileScanOperator.

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

run_data_scan_async = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check that a Knowledge Catalog Data Profile scan run succeeded, you can use:

DataplexDataProfileJobStatusSensor

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

get_data_scan_job_status = DataplexDataProfileJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id=run_data_scan_async.output,
)

You can also run this operator in deferrable mode:

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

run_data_scan_def = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Get a Data Profile scan job

To get the result of a Data Profile scan job, you can use:

DataplexGetDataProfileScanResultOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogGetDataProfileScanResultOperator.

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

get_data_scan_job_result_2 = DataplexGetDataProfileScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Google Knowledge Catalog Entry Operators

Knowledge Catalog provides a unified inventory of Google Cloud resources, such as BigQuery, and other resources, such as on-premises resources. Knowledge Catalog automatically retrieves metadata for Google Cloud resources, and you bring metadata for third-party resources into Knowledge Catalog.

For more information about Knowledge Catalog, visit the Knowledge Catalog product documentation.

Create an EntryGroup

To create an Entry Group in a specific Knowledge Catalog location, you can use DataplexCatalogCreateEntryGroupOperator. For more information about the available fields to pass when creating an Entry Group, visit Entry Group resource configuration.

A simple Entry Group configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

ENTRY_GROUP_BODY = {"display_name": "Display Name", "description": "Some description"}

With this configuration you can create an Entry Group resource:

DataplexCatalogCreateEntryGroupOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogCreateEntryGroupOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

create_entry_group = DataplexCatalogCreateEntryGroupOperator(
    task_id="create_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_group_configuration=ENTRY_GROUP_BODY,
    validate_request=False,
)

Delete an EntryGroup

To delete an Entry Group in a specific Knowledge Catalog location, you can use DataplexCatalogDeleteEntryGroupOperator. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogDeleteEntryGroupOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

delete_entry_group = DataplexCatalogDeleteEntryGroupOperator(
    task_id="delete_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List EntryGroups

To list all Entry Groups in a specific Knowledge Catalog location you can use DataplexCatalogListEntryGroupsOperator. This operator also supports filtering and ordering the result of the operation. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogListEntryGroupsOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

list_entry_group = DataplexCatalogListEntryGroupsOperator(
    task_id="list_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)

Get an EntryGroup

To retrieve an Entry Group in a specific Knowledge Catalog location, you can use DataplexCatalogGetEntryGroupOperator. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogGetEntryGroupOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

get_entry_group = DataplexCatalogGetEntryGroupOperator(
    task_id="get_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
)

Update an EntryGroup

To update an Entry Group in a specific Knowledge Catalog location, you can use DataplexCatalogUpdateEntryGroupOperator. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogUpdateEntryGroupOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

update_entry_group = DataplexCatalogUpdateEntryGroupOperator(
    task_id="update_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_group_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)
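The `update_mask` argument names the fields to overwrite, and fields outside the mask keep their current values. The plain-Python sketch below mimics that behavior for top-level dictionary keys. It illustrates the semantics only and is not the service's implementation; `apply_update_mask` is a hypothetical helper.

```python
def apply_update_mask(current: dict, patch: dict, update_mask: list) -> dict:
    """Return a copy of current where only the masked top-level fields are replaced."""
    updated = dict(current)
    for field in update_mask:
        if field not in patch:
            # This sketch requires every masked field to be present in the patch.
            raise KeyError(f"masked field {field!r} is missing from the patch")
        updated[field] = patch[field]
    return updated


entry_group = {"display_name": "Display Name", "description": "Some description"}
patch = {
    "display_name": "Updated Display Name",
    "description": "not in the mask, so ignored",
}
updated_entry_group = apply_update_mask(entry_group, patch, ["display_name"])
```

Here only `display_name` changes, which matches the intent of passing `update_mask=["display_name"]` in the example above.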

Create an EntryType

To create an Entry Type in a specific Knowledge Catalog location, you can use DataplexCatalogCreateEntryTypeOperator. For more information about the available fields to pass when creating an Entry Type, visit Entry Type resource configuration.

A simple Entry Type configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

ENTRY_TYPE_BODY = {"display_name": "Display Name", "description": "Some description"}

With this configuration you can create an Entry Type resource:

DataplexCatalogCreateEntryTypeOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogCreateEntryTypeOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

create_entry_type = DataplexCatalogCreateEntryTypeOperator(
    task_id="create_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    entry_type_configuration=ENTRY_TYPE_BODY,
    validate_request=False,
)

Delete an EntryType

To delete an Entry Type in a specific Knowledge Catalog location, you can use DataplexCatalogDeleteEntryTypeOperator. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogDeleteEntryTypeOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

delete_entry_type = DataplexCatalogDeleteEntryTypeOperator(
    task_id="delete_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List EntryTypes

To list all Entry Types in a specific Knowledge Catalog location you can use DataplexCatalogListEntryTypesOperator. This operator also supports filtering and ordering the result of the operation. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogListEntryTypesOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

list_entry_type = DataplexCatalogListEntryTypesOperator(
    task_id="list_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)

Get an EntryType

To retrieve an Entry Type in a specific Knowledge Catalog location, you can use DataplexCatalogGetEntryTypeOperator. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogGetEntryTypeOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

get_entry_type = DataplexCatalogGetEntryTypeOperator(
    task_id="get_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
)

Update an EntryType

To update an Entry Type in a specific Knowledge Catalog location, you can use DataplexCatalogUpdateEntryTypeOperator. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogUpdateEntryTypeOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

update_entry_type = DataplexCatalogUpdateEntryTypeOperator(
    task_id="update_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    entry_type_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)

Create an AspectType

To create an Aspect Type in a specific Knowledge Catalog location, you can use DataplexCatalogCreateAspectTypeOperator. For more information about the available fields to pass when creating an Aspect Type, visit Aspect Type resource configuration.

A simple Aspect Type configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

ASPECT_TYPE_BODY = {
    "display_name": "Sample AspectType",
    "description": "A simple AspectType for demonstration purposes.",
    "metadata_template": {
        "name": "sample_field",
        "type": "record",
        "annotations": {
            "display_name": "Sample Field",
            "description": "A sample field within the AspectType.",
        },
    },
}

With this configuration you can create an Aspect Type resource:

DataplexCatalogCreateAspectTypeOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogCreateAspectTypeOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

create_aspect_type = DataplexCatalogCreateAspectTypeOperator(
    task_id="create_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    aspect_type_configuration=ASPECT_TYPE_BODY,
    validate_request=False,
)

Delete an AspectType

To delete an Aspect Type in a specific Knowledge Catalog location, you can use DataplexCatalogDeleteAspectTypeOperator. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogDeleteAspectTypeOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

delete_aspect_type = DataplexCatalogDeleteAspectTypeOperator(
    task_id="delete_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List AspectTypes

To list all Aspect Types in a specific Knowledge Catalog location you can use DataplexCatalogListAspectTypesOperator. This operator also supports filtering and ordering the result of the operation. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogListAspectTypesOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

list_aspect_type = DataplexCatalogListAspectTypesOperator(
    task_id="list_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)

Get an AspectType

To retrieve an Aspect Type in a specific Knowledge Catalog location, you can use DataplexCatalogGetAspectTypeOperator. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogGetAspectTypeOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

get_aspect_type = DataplexCatalogGetAspectTypeOperator(
    task_id="get_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
)

Update an AspectType

To update an Aspect Type in a specific Knowledge Catalog location, you can use DataplexCatalogUpdateAspectTypeOperator. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogUpdateAspectTypeOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

update_aspect_type = DataplexCatalogUpdateAspectTypeOperator(
    task_id="update_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    aspect_type_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)

Create an Entry

To create an Entry in a specific Knowledge Catalog location, you can use DataplexCatalogCreateEntryOperator. For more information about the available fields to pass when creating an Entry, visit Entry resource configuration.

A simple Entry configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

ENTRY_BODY = {
    "name": f"projects/{PROJECT_ID}/locations/{GCP_LOCATION}/entryGroups/{ENTRY_GROUP_NAME}/entries/{ENTRY_NAME}",
    "entry_type": f"projects/{PROJECT_ID}/locations/{GCP_LOCATION}/entryTypes/{ENTRY_TYPE_NAME}",
}
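Both values in the Entry body are resource names built from the project, location, and resource IDs. As a sketch, they can be generated by helpers so the patterns live in one place. The function names and sample identifiers below are hypothetical; only the path patterns come from the configuration above.

```python
def entry_name(project_id: str, location: str, entry_group_id: str, entry_id: str) -> str:
    """Build the full Entry resource name."""
    return (
        f"projects/{project_id}/locations/{location}"
        f"/entryGroups/{entry_group_id}/entries/{entry_id}"
    )


def entry_type_name(project_id: str, location: str, entry_type_id: str) -> str:
    """Build the full Entry Type resource name."""
    return f"projects/{project_id}/locations/{location}/entryTypes/{entry_type_id}"


def build_entry_body(
    project_id: str, location: str, entry_group_id: str, entry_type_id: str, entry_id: str
) -> dict:
    """Return an Entry body with the same two keys as ENTRY_BODY."""
    return {
        "name": entry_name(project_id, location, entry_group_id, entry_id),
        "entry_type": entry_type_name(project_id, location, entry_type_id),
    }


# Placeholder identifiers for illustration only.
entry_body = build_entry_body(
    "my-project", "us-central1", "my-entry-group", "my-entry-type", "my-entry"
)
```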

With this configuration you can create an Entry resource:

DataplexCatalogCreateEntryOperator

The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogCreateEntryOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

create_entry = DataplexCatalogCreateEntryOperator(
    task_id="create_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_configuration=ENTRY_BODY,
)

Delete an Entry

To delete an Entry in a specific Knowledge Catalog location, you can use DataplexCatalogDeleteEntryOperator. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogDeleteEntryOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

delete_entry = DataplexCatalogDeleteEntryOperator(
    task_id="delete_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List Entries

To list all Entries in a specific Knowledge Catalog location you can use DataplexCatalogListEntriesOperator. This operator also supports filtering and ordering the result of the operation. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogListEntriesOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

list_entry = DataplexCatalogListEntriesOperator(
    task_id="list_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
)

Get an Entry

To retrieve an Entry in a specific Knowledge Catalog location, you can use DataplexCatalogGetEntryOperator. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogGetEntryOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

get_entry = DataplexCatalogGetEntryOperator(
    task_id="get_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
)

Update an Entry

To update an Entry in a specific Knowledge Catalog location, you can use DataplexCatalogUpdateEntryOperator. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogUpdateEntryOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

update_entry = DataplexCatalogUpdateEntryOperator(
    task_id="update_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_configuration={
        "fully_qualified_name": f"dataplex:{PROJECT_ID}.{GCP_LOCATION}.{ENTRY_GROUP_NAME}.some-entry"
    },
    update_mask=["fully_qualified_name"],
)

Look up a single Entry

To look up a single Entry by name in Knowledge Catalog, using the permissions on the source system, you can use DataplexCatalogLookupEntryOperator. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogLookupEntryOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

lookup_entry = DataplexCatalogLookupEntryOperator(
    task_id="lookup_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
)

Search Entries

To search for Entries matching the given query and scope in Knowledge Catalog, you can use DataplexCatalogSearchEntriesOperator. The executable example below still imports the compatibility name shown above. The preferred alias for new code is KnowledgeCatalogSearchEntriesOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

search_entry = DataplexCatalogSearchEntriesOperator(
    task_id="search_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    query=f"name={ENTRY_NAME}",
)
