Google Cloud Datastore Operators

Firestore in Datastore mode is a NoSQL document database built for automatic scaling, high performance, and ease of application development.

For more information about the service, visit the Datastore product documentation.

Prerequisite Tasks

To use these operators, you must do a few things:

Export Entities

To export entities from Google Cloud Datastore to Cloud Storage, use CloudDatastoreExportEntitiesOperator.

google/tests/system/google/cloud/datastore/example_datastore_commit.py

export_task = CloudDatastoreExportEntitiesOperator(
    task_id="export_task",
    bucket=BUCKET_NAME,
    project_id=PROJECT_ID,
    overwrite_existing=True,
)
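Later tasks in this guide read two fields from the export task's XCom value: the operation `name` and `response.outputUrl`. The minimal sketch below, assuming that value is a plain dict indexed exactly the way those templates index it, pulls both fields out in ordinary Python. The helper name `export_result_fields` and the sample values are hypothetical.

```python
from typing import Any


def export_result_fields(operation: dict[str, Any]) -> tuple[str, str]:
    """Return (operation name, output URL) from an export operation dict.

    Hypothetical helper: it only mirrors the ['name'] and
    ['response']['outputUrl'] lookups used by the templates in this guide.
    """
    name = operation["name"]
    output_url = operation["response"]["outputUrl"]
    # The import task later splits this URL, so fail early if it is not gs://.
    if not output_url.startswith("gs://"):
        raise ValueError(f"unexpected output URL: {output_url!r}")
    return name, output_url


# Hypothetical sample shaped like the value the templates index into.
sample_operation = {
    "name": "projects/my-project/operations/example-op-id",
    "response": {"outputUrl": "gs://my-bucket/export-prefix/export.overall_export_metadata"},
}

name, output_url = export_result_fields(sample_operation)
print(name)
print(output_url)
```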

Import Entities

To import entities from Cloud Storage to Google Cloud Datastore, use CloudDatastoreImportEntitiesOperator.

google/tests/system/google/cloud/datastore/example_datastore_commit.py

import_task = CloudDatastoreImportEntitiesOperator(
    task_id="import_task",
    bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
    file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
    project_id=PROJECT_ID,
)
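The `bucket` and `file` templates above split the export's `outputUrl` on `/`: element 2 is the bucket, and everything from element 3 onward, rejoined, is the object path. The same logic in plain Python, as a sketch with a hypothetical helper name and sample URL:

```python
def split_gcs_url(url: str) -> tuple[str, str]:
    """Split 'gs://bucket/path/to/object' into ('bucket', 'path/to/object').

    Mirrors the Jinja expressions above: split('/')[2] is the bucket and
    '/'.join(split('/')[3:]) is the object path.
    """
    parts = url.split("/")
    # For a gs:// URL, parts == ["gs:", "", "<bucket>", "<path>", ...]
    if len(parts) < 4 or parts[0] != "gs:" or parts[1] != "":
        raise ValueError(f"not a gs:// object URL: {url!r}")
    return parts[2], "/".join(parts[3:])


bucket, file_path = split_gcs_url("gs://my-bucket/export-prefix/export.overall_export_metadata")
print(bucket)     # my-bucket
print(file_path)  # export-prefix/export.overall_export_metadata
```

Doing the split in a helper like this, rather than inline in a template, makes it easy to reject malformed URLs before the import starts.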

Allocate IDs

To allocate IDs for incomplete keys, use CloudDatastoreAllocateIdsOperator.

google/tests/system/google/cloud/datastore/example_datastore_commit.py

allocate_ids = CloudDatastoreAllocateIdsOperator(
    task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
)

An example of the partial keys required by the operator:

google/tests/system/google/cloud/datastore/example_datastore_commit.py

KEYS = [
    {
        "partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
        "path": {"kind": "airflow"},
    }
]
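The Datastore `allocateIds` method completes each key it receives with one newly allocated ID, so allocating several IDs means passing several partial keys. A minimal sketch, using a hypothetical helper `make_partial_keys`, that produces a list in the same dict layout as `KEYS` above (kind only, no `id` or `name`):

```python
from typing import Any


def make_partial_keys(
    project_id: str, kind: str, count: int, namespace_id: str = ""
) -> list[dict[str, Any]]:
    """Build `count` incomplete keys for `kind`.

    Hypothetical helper: the layout copies the KEYS example above.
    Each key is a separate dict, so later edits to one do not affect the others.
    """
    if count < 1:
        raise ValueError("count must be at least 1")
    return [
        {
            "partitionId": {"projectId": project_id, "namespaceId": namespace_id},
            "path": {"kind": kind},
        }
        for _ in range(count)
    ]


keys = make_partial_keys("my-project", "airflow", 3)
print(len(keys))  # 3
```

The result could be passed as `partial_keys` in place of `KEYS`.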

Begin transaction

To begin a new transaction, use CloudDatastoreBeginTransactionOperator.

google/tests/system/google/cloud/datastore/example_datastore_commit.py

begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
    task_id="begin_transaction_commit",
    transaction_options=TRANSACTION_OPTIONS,
    project_id=PROJECT_ID,
)

An example of the transaction options required by the operator:

google/tests/system/google/cloud/datastore/example_datastore_commit.py

from typing import Any

TRANSACTION_OPTIONS: dict[str, Any] = {"readWrite": {}}
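`TRANSACTION_OPTIONS` follows the Datastore `TransactionOptions` message, which sets exactly one mode: `readWrite` or `readOnly`. A small sketch that builds either form; the helper name `transaction_options` is hypothetical, and only the two mode keys come from the API.

```python
from typing import Any


def transaction_options(read_only: bool = False) -> dict[str, Any]:
    """Return a TransactionOptions body with exactly one mode set.

    Hypothetical helper; the default matches TRANSACTION_OPTIONS above.
    """
    return {"readOnly": {}} if read_only else {"readWrite": {}}


print(transaction_options())                # {'readWrite': {}}
print(transaction_options(read_only=True))  # {'readOnly': {}}
```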

Commit transaction

To commit a transaction, optionally creating, deleting, or modifying some entities, use CloudDatastoreCommitOperator.

google/tests/system/google/cloud/datastore/example_datastore_commit.py

commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)

An example of the commit body required by the operator:

google/tests/system/google/cloud/datastore/example_datastore_commit.py

COMMIT_BODY = {
    "mode": "TRANSACTIONAL",
    "mutations": [
        {
            "insert": {
                "key": KEYS[0],
                "properties": {"string": {"stringValue": "airflow is awesome!"}},
            }
        }
    ],
    "singleUseTransaction": {"readWrite": {}},
}
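Each entry in `mutations` names one operation: `insert`, `update` and `upsert` carry a full entity (key plus properties), while `delete` carries only a key. The sketch below, with hypothetical helper names and a hypothetical key, assembles a body in the same shape as `COMMIT_BODY`. It uses an existing transaction ID when one is supplied and otherwise falls back to a single-use read-write transaction, as the example above does.

```python
from typing import Any, Optional

ENTITY_OPS = {"insert", "update", "upsert"}


def entity_mutation(op: str, key: dict[str, Any], properties: dict[str, Any]) -> dict[str, Any]:
    """Mutation that writes an entity (insert, update or upsert)."""
    if op not in ENTITY_OPS:
        raise ValueError(f"unsupported entity mutation: {op!r}")
    return {op: {"key": key, "properties": properties}}


def delete_mutation(key: dict[str, Any]) -> dict[str, Any]:
    """Mutation that deletes the entity identified by `key`."""
    return {"delete": key}


def commit_body(mutations: list[dict[str, Any]], transaction: Optional[str] = None) -> dict[str, Any]:
    """TRANSACTIONAL commit body: reuse `transaction` if given, else a single-use one."""
    body: dict[str, Any] = {"mode": "TRANSACTIONAL", "mutations": mutations}
    if transaction is not None:
        body["transaction"] = transaction
    else:
        body["singleUseTransaction"] = {"readWrite": {}}
    return body


# Hypothetical key in the same layout as KEYS[0].
key = {
    "partitionId": {"projectId": "my-project", "namespaceId": ""},
    "path": {"kind": "airflow"},
}
body = commit_body([entity_mutation("insert", key, {"string": {"stringValue": "airflow is awesome!"}})])
print(body["singleUseTransaction"])  # {'readWrite': {}}
```

Passing `transaction=begin_transaction_commit.output` instead would commit inside the transaction opened by the begin task rather than a single-use one.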

Run query

To run a query for entities, use CloudDatastoreRunQueryOperator.

google/tests/system/google/cloud/datastore/example_datastore_query.py

run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=PROJECT_ID)

An example of the query body required by the operator:

google/tests/system/google/cloud/datastore/example_datastore_query.py

QUERY = {
    "partitionId": {"projectId": PROJECT_ID, "namespaceId": "query"},
    "readOptions": {"transaction": begin_transaction_query.output},
    "query": {},
}
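In `QUERY` the `query` object is empty, so no kind or limit is set. The Datastore `Query` message also accepts fields such as `kind` (a list of `{"name": ...}` objects) and `limit`. A sketch with a hypothetical helper `query_body` that builds such a request and, like `QUERY`, reads inside a transaction when a transaction ID is supplied:

```python
from typing import Any, Optional


def query_body(
    project_id: str,
    namespace_id: str,
    kind: Optional[str] = None,
    limit: Optional[int] = None,
    transaction: Optional[str] = None,
) -> dict[str, Any]:
    """Build a runQuery body in the same layout as QUERY above (hypothetical helper)."""
    query: dict[str, Any] = {}
    if kind is not None:
        query["kind"] = [{"name": kind}]  # Query.kind is a list of kind expressions
    if limit is not None:
        query["limit"] = limit
    body: dict[str, Any] = {
        "partitionId": {"projectId": project_id, "namespaceId": namespace_id},
        "query": query,
    }
    if transaction is not None:
        # Same shape as QUERY's readOptions, which uses begin_transaction_query.output.
        body["readOptions"] = {"transaction": transaction}
    return body


body = query_body("my-project", "query", kind="airflow", limit=10, transaction="tx-123")
print(body["query"])  # {'kind': [{'name': 'airflow'}], 'limit': 10}
```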

Roll back transaction

To roll back a transaction, use CloudDatastoreRollbackOperator.

google/tests/system/google/cloud/datastore/example_datastore_rollback.py

rollback_transaction = CloudDatastoreRollbackOperator(
    task_id="rollback_transaction",
    transaction=begin_transaction_to_rollback.output,
)

Get operation state

To get the current state of a long-running operation, use CloudDatastoreGetOperationOperator.

google/tests/system/google/cloud/datastore/example_datastore_commit.py

get_operation = CloudDatastoreGetOperationOperator(
    task_id="get_operation", name="{{ task_instance.xcom_pull('export_task')['name'] }}"
)
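A long-running operation resource reports `done: true` once it finishes, together with either a `response` or an `error`. The operator above fetches the state once; the sketch below shows a generic polling loop built on that convention. `fetch` is a hypothetical stand-in for whatever call returns the operation dict by name; it is not an Airflow or Google client API, and the interval and timeout defaults are arbitrary.

```python
import time
from typing import Any, Callable


def wait_for_operation(
    fetch: Callable[[str], dict[str, Any]],
    name: str,
    interval_seconds: float = 10.0,
    timeout_seconds: float = 600.0,
) -> dict[str, Any]:
    """Poll fetch(name) until the operation reports done, then return it.

    Raises RuntimeError if the finished operation carries an error and
    TimeoutError if it is still running after timeout_seconds.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        operation = fetch(name)
        if operation.get("done"):
            if "error" in operation:
                raise RuntimeError(f"operation {name} failed: {operation['error']}")
            return operation
        if time.monotonic() >= deadline:
            raise TimeoutError(f"operation {name} still running after {timeout_seconds}s")
        time.sleep(interval_seconds)


# Hypothetical fake fetch: reports "running" twice, then "done".
responses = iter([
    {"name": "op-1"},
    {"name": "op-1", "done": False},
    {"name": "op-1", "done": True, "response": {"outputUrl": "gs://my-bucket/x"}},
])
result = wait_for_operation(lambda _name: next(responses), "op-1", interval_seconds=0)
print(result["response"]["outputUrl"])  # gs://my-bucket/x
```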

Delete operation

To delete an operation, use CloudDatastoreDeleteOperationOperator.

google/tests/system/google/cloud/datastore/example_datastore_commit.py

delete_export_operation = CloudDatastoreDeleteOperationOperator(
    task_id="delete_export_operation",
    name="{{ task_instance.xcom_pull('export_task')['name'] }}",
    trigger_rule=TriggerRule.ALL_DONE,
)

References

For further information, take a look at:
