MongoDB To Google Cloud Storage Operator

Google Cloud Storage (GCS) is used to store large amounts of data from various applications. This page shows how to copy data from MongoDB to GCS.

Prerequisite Tasks

To use this operator, you must do a few things:

MongoToGCSOperator

MongoToGCSOperator allows you to upload data from a MongoDB collection to GCS in JSON, CSV, or Parquet format.

The operator accepts either a find() filter (a dict) or an aggregation pipeline (a list) through the mongo_query parameter. When mongo_query is a dict, mongo_projection may be used to limit the fields returned. When mongo_query is a list, the value is passed as an aggregation pipeline and mongo_projection is ignored.
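The dispatch rule above can be sketched as follows. This is a minimal illustration under stated assumptions, not the operator's source: `run_mongo_query` is a hypothetical helper, and it assumes a PyMongo-style collection object that exposes `find(filter, projection)` and `aggregate(pipeline)`.

```python
from typing import Any, Dict, Iterable, List, Optional, Union

# A find() filter (dict) or an aggregation pipeline (list of stage dicts).
MongoQuery = Union[Dict[str, Any], List[Dict[str, Any]]]


def run_mongo_query(
    collection: Any,
    mongo_query: MongoQuery,
    mongo_projection: Optional[Dict[str, Any]] = None,
) -> Iterable[Dict[str, Any]]:
    """Hypothetical helper: route mongo_query to find() or aggregate().

    Assumes `collection` behaves like a PyMongo Collection.
    """
    if isinstance(mongo_query, list):
        # A list is treated as an aggregation pipeline; the projection is ignored.
        return collection.aggregate(mongo_query)
    if isinstance(mongo_query, dict):
        # A dict is treated as a find() filter; the projection limits returned fields.
        return collection.find(mongo_query, mongo_projection)
    raise TypeError("mongo_query must be a dict (filter) or a list (pipeline)")
```

Branching on the Python type keeps a single `mongo_query` parameter for both query styles, which is why `mongo_projection` only matters in the dict case.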

The schema is derived from the first document in the result set, so all documents are expected to share a consistent shape; missing fields are exported as null.
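A minimal sketch of the "first document defines the schema" rule, assuming documents arrive as plain Python dicts and are written as newline-delimited JSON. `infer_fields` and `to_ndjson` are hypothetical helpers. Two behaviors here are assumptions, since the text above does not specify them: fields that appear only in later documents are dropped, and non-JSON values (for example `ObjectId`) are converted with `str`.

```python
import json
from typing import Any, Dict, Iterable, List


def infer_fields(first_doc: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: take field names, in order, from the first document."""
    return list(first_doc.keys())


def to_ndjson(docs: Iterable[Dict[str, Any]]) -> str:
    """Serialize documents as newline-delimited JSON using the first document's fields.

    Fields missing from a later document are written as null. Fields absent from
    the first document are dropped (an assumption made for this sketch).
    """
    it = iter(docs)
    first = next(it, None)
    if first is None:
        return ""
    fields = infer_fields(first)
    lines = []
    for doc in [first, *it]:
        # doc.get() yields None for missing fields, which json.dumps writes as null.
        row = {name: doc.get(name) for name in fields}
        lines.append(json.dumps(row, default=str) + "\n")
    return "".join(lines)
```

For example, `to_ndjson([{"_id": 1, "name": "a", "status": "active"}, {"_id": 2, "status": "active"}])` produces two lines, the second being `{"_id": 2, "name": null, "status": "active"}`.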

Below is an example of using this operator to export the result of a find() query to GCS.

tests/system/google/cloud/transfers/example_mongo_to_gcs.py

mongo_to_gcs_find = MongoToGCSOperator(
    task_id="mongo_to_gcs_find",
    mongo_conn_id=MONGO_CONN_ID,
    mongo_db=MONGO_DATABASE,
    mongo_collection=MONGO_COLLECTION,
    mongo_query={"status": "active"},
    mongo_projection={"_id": 1, "name": 1, "status": 1},
    bucket=BUCKET_NAME,
    filename=FILE_NAME,
    schema_filename=SCHEMA_FILE_NAME,
    export_format="json",
)

The operator also supports running an aggregation pipeline.

tests/system/google/cloud/transfers/example_mongo_to_gcs.py

mongo_to_gcs_aggregate = MongoToGCSOperator(
    task_id="mongo_to_gcs_aggregate",
    mongo_conn_id=MONGO_CONN_ID,
    mongo_db=MONGO_DATABASE,
    mongo_collection=MONGO_COLLECTION,
    mongo_query=[
        {"$match": {"status": "active"}},
        {"$group": {"_id": "$category", "total": {"$sum": 1}}},
    ],
    bucket=BUCKET_NAME,
    filename="mongo_aggregate_{}.ndjson",
    export_format="json",
)
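To show what the pipeline above returns, the sketch below reproduces its `$match` and `$group` stages in plain Python on made-up documents. It is an illustration only: the pipeline itself runs inside MongoDB, and `count_active_by_category` and the sample data are hypothetical. Each output document contains only `_id` (the category) and `total`, so that is the shape of the exported rows and of the schema derived from the first result.

```python
from collections import Counter
from typing import Any, Dict, List


def count_active_by_category(docs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Plain-Python equivalent of the example pipeline (illustration only).

    {"$match": {"status": "active"}} keeps documents whose status equals "active"
    (simple equality only; this sketch ignores MongoDB's array-matching rules).
    {"$group": {"_id": "$category", "total": {"$sum": 1}}} counts them per category.
    A document without "category" is grouped under None, like MongoDB's null _id.
    """
    counts = Counter(
        doc.get("category") for doc in docs if doc.get("status") == "active"
    )
    # MongoDB does not guarantee $group output order; this sketch keeps first-seen order.
    return [{"_id": category, "total": total} for category, total in counts.items()]
```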

Reference

For further information, look at:
