MongoDB To Google Cloud Storage Operator
Google Cloud Storage (GCS) is used to store large volumes of data from various applications. This page shows how to copy data from MongoDB to GCS.
Prerequisite Tasks
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available for Installation.
MongoToGCSOperator
MongoToGCSOperator allows you to upload data from a MongoDB collection to GCS in JSON, CSV, or Parquet format.
The operator accepts either a find() filter (a dict) or an aggregation pipeline (a list)
through the mongo_query parameter. When mongo_query is a dict, mongo_projection may be
used to limit the fields returned. When mongo_query is a list, the value is passed as an
aggregation pipeline and mongo_projection is ignored.
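The dict-versus-list rule above can be sketched in plain Python. This is only an illustration of the documented behaviour, not the operator's actual implementation: the helper name plan_mongo_call and the shape of the returned dictionary are hypothetical, and raising TypeError for other input types is an assumption.

```python
from typing import Any, Dict, List, Optional, Union


def plan_mongo_call(
    mongo_query: Union[Dict[str, Any], List[Dict[str, Any]]],
    mongo_projection: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: decide which MongoDB call a mongo_query maps to."""
    if isinstance(mongo_query, list):
        # A list is treated as an aggregation pipeline; the projection is ignored.
        return {"method": "aggregate", "pipeline": mongo_query}
    if isinstance(mongo_query, dict):
        # A dict is treated as a find() filter; the projection limits the fields.
        return {
            "method": "find",
            "filter": mongo_query,
            "projection": mongo_projection,
        }
    # Assumption: any other type is rejected.
    raise TypeError("mongo_query must be a dict (filter) or a list (pipeline)")
```

Calling plan_mongo_call({"status": "active"}, {"name": 1}) selects the find() path and keeps the projection, while passing a list selects the aggregate path and drops it.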
The schema is derived from the first document in the result set, so all documents are expected to
share a consistent shape; missing fields are exported as null.
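To make the first-document rule concrete, here is a minimal sketch in plain Python. The function name normalize_rows is hypothetical and this is not the operator's code. It assumes that fields absent from the first document are dropped from later rows, which the text above does not state.

```python
from typing import Any, Dict, Iterable, List, Tuple


def normalize_rows(
    documents: Iterable[Dict[str, Any]],
) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Hypothetical helper: shape all documents after the first one."""
    docs = list(documents)
    if not docs:
        return [], []
    # The field list comes from the first document only.
    field_names = list(docs[0].keys())
    # Fields missing from a later document become None (null on export).
    # Assumption: fields not present in the first document are dropped.
    rows = [{name: doc.get(name) for name in field_names} for doc in docs]
    return field_names, rows
```

With this sketch, a second document lacking "status" is exported with "status" set to None, which is why a consistent document shape (or an explicit mongo_projection) keeps the output predictable.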
Below is an example of using this operator to export the result of a find() query to GCS.
mongo_to_gcs_find = MongoToGCSOperator(
    task_id="mongo_to_gcs_find",
    mongo_conn_id=MONGO_CONN_ID,
    mongo_db=MONGO_DATABASE,
    mongo_collection=MONGO_COLLECTION,
    mongo_query={"status": "active"},
    mongo_projection={"_id": 1, "name": 1, "status": 1},
    bucket=BUCKET_NAME,
    filename=FILE_NAME,
    schema_filename=SCHEMA_FILE_NAME,
    export_format="json",
)
The operator also supports running an aggregation pipeline.
mongo_to_gcs_aggregate = MongoToGCSOperator(
    task_id="mongo_to_gcs_aggregate",
    mongo_conn_id=MONGO_CONN_ID,
    mongo_db=MONGO_DATABASE,
    mongo_collection=MONGO_COLLECTION,
    mongo_query=[
        {"$match": {"status": "active"}},
        {"$group": {"_id": "$category", "total": {"$sum": 1}}},
    ],
    bucket=BUCKET_NAME,
    filename="mongo_aggregate_{}.ndjson",
    export_format="json",
)
Reference
For further information, look at: