airflow.providers.google.cloud.transfers.mongo_to_gcs
MongoDB to GCS operator.
Classes
MongoToGCSOperator: Copy data from MongoDB to Google Cloud Storage in JSON, CSV or Parquet format.
Module Contents
- class airflow.providers.google.cloud.transfers.mongo_to_gcs.MongoToGCSOperator(*, mongo_conn_id='mongo_default', mongo_db, mongo_collection, mongo_query=None, mongo_projection=None, allow_disk_use=True, **kwargs)
Bases: airflow.providers.google.cloud.transfers.sql_to_gcs.BaseSQLToGCSOperator
Copy data from MongoDB to Google Cloud Storage in JSON, CSV or Parquet format.
Note
MongoDB is a NoSQL store, so subclassing BaseSQLToGCSOperator is a deliberate reuse choice rather than a natural fit. The base class already implements the chunking, schema inference and GCS upload flow we want; this operator reuses it by adapting the pymongo cursor to a DB-API style cursor (see _MongoCursorAdapter) and overriding query, field_to_bigquery and convert_type. A dedicated BaseNoSQLToGCSOperator could be a cleaner home for this in the future.
See also
For more information on how to use this operator, take a look at the guide: MongoToGCSOperator
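The Note describes adapting the pymongo cursor to a DB-API style cursor through _MongoCursorAdapter, whose implementation is not shown on this page. As an illustration only, here is a minimal sketch of that idea under stated assumptions: the class name DocumentCursorAdapter is hypothetical, column names are taken from the keys of the first document, and only `description`, iteration and `fetchone()` are provided. The real adapter may behave differently.

```python
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple


class DocumentCursorAdapter:
    """Hypothetical sketch (not the operator's _MongoCursorAdapter): make an
    iterable of MongoDB-style documents look like a DB-API 2.0 cursor."""

    def __init__(self, documents: Iterable[Dict[str, Any]]) -> None:
        self._docs: Iterator[Dict[str, Any]] = iter(documents)
        # Peek at the first document; assumption: its keys define the columns.
        self._first: Optional[Dict[str, Any]] = next(self._docs, None)
        self._keys: List[str] = list(self._first) if self._first is not None else []
        # DB-API describes each column with a 7-item sequence; only the name is known.
        self.description = [(k, None, None, None, None, None, None) for k in self._keys]

    def __iter__(self) -> "DocumentCursorAdapter":
        return self

    def __next__(self) -> Tuple[Any, ...]:
        if self._first is not None:
            doc, self._first = self._first, None  # replay the peeked document
        else:
            doc = next(self._docs)  # raises StopIteration when exhausted
        # Keys missing from a later document become None in that row.
        return tuple(doc.get(k) for k in self._keys)

    def fetchone(self) -> Optional[Tuple[Any, ...]]:
        # DB-API convention: return None once no rows remain.
        return next(self, None)
```

Under this assumption, keys that appear only in later documents are silently dropped, which illustrates why inferring a tabular schema from a schemaless collection needs care.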
- Parameters:
mongo_conn_id (str) – Reference to a specific Mongo connection.
mongo_db (str) – The MongoDB database name.
mongo_collection (str) – The MongoDB collection name.
mongo_query (dict | list | None) – A MongoDB find filter (dict) or aggregation pipeline (list). Defaults to {} (match all).
mongo_projection (dict | list | None) – Optional projection passed to find(). Ignored when mongo_query is an aggregation pipeline. Accepts a dict ({"field": 1}) or a list of field names.
allow_disk_use (bool) – Whether to pass allowDiskUse=True to aggregate(). Defaults to True.
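The parameter descriptions imply a simple dispatch rule: a list is run as an aggregation pipeline through aggregate(), with mongo_projection ignored, while a dict (or None, meaning {}) is run through find() with the projection forwarded. The sketch below illustrates that rule against any pymongo-style collection object. The names run_mongo_query and FakeCollection are hypothetical and not part of the operator's API, and the operator's actual query() may differ in detail.

```python
from typing import Any, Union


def run_mongo_query(
    collection: Any,
    mongo_query: Union[dict, list, None] = None,
    mongo_projection: Union[dict, list, None] = None,
    allow_disk_use: bool = True,
) -> Any:
    """Hypothetical helper mirroring the documented find/aggregate split."""
    if isinstance(mongo_query, list):
        # A list is an aggregation pipeline; the projection is ignored.
        kwargs = {"allowDiskUse": True} if allow_disk_use else {}
        return collection.aggregate(mongo_query, **kwargs)
    # A dict (or None, treated as {} = match all) is a find() filter.
    return collection.find(mongo_query or {}, mongo_projection)


class FakeCollection:
    """Stand-in for a pymongo Collection that records calls (demo only)."""

    def __init__(self) -> None:
        self.calls = []

    def find(self, filter, projection=None):
        self.calls.append(("find", filter, projection))
        return "find-result"

    def aggregate(self, pipeline, **kwargs):
        self.calls.append(("aggregate", pipeline, kwargs))
        return "agg-result"
```

With a real pymongo Collection, both find(filter, projection) and aggregate(pipeline, allowDiskUse=True) are valid calls, and a projection may be given either as a dict or as a list of field names.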
- template_fields: collections.abc.Sequence[str] = ('bucket', 'filename', 'schema_filename', 'schema', 'parameters', 'impersonation_chain',...
- template_ext: collections.abc.Sequence[str] = ()
- property db_hook: airflow.providers.mongo.hooks.mongo.MongoHook
- convert_type(value, schema_type, **kwargs)
Convert pymongo values to BigQuery-friendly types.
- ObjectId -> str.
- Decimal128 / Decimal -> float.
- bytes -> base64-encoded str (or int when schema_type == 'INTEGER').
- datetime -> str(value).
- date -> ISO date string when schema_type == 'DATE', otherwise a combined datetime string.
- list / dict / tuple -> JSON string.
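To make the conversions listed for convert_type concrete, here is a standalone sketch that follows them. It is not the operator's code, and convert_value is a hypothetical name. Several details are assumptions, noted in comments: ObjectId is matched by class name and Decimal128 by its to_decimal() method so the sketch needs no bson import, bytes become an INTEGER via big-endian int.from_bytes, the "combined datetime string" for a date is taken to be that date at midnight, nested non-JSON values are stringified with default=str, and any other value is returned unchanged.

```python
import base64
import json
from datetime import date, datetime, time
from decimal import Decimal
from typing import Any, Optional


def convert_value(value: Any, schema_type: Optional[str] = None) -> Any:
    """Hypothetical sketch of the documented pymongo -> BigQuery conversions."""
    # Assumption: detect bson.ObjectId by class name to avoid importing bson.
    if type(value).__name__ == "ObjectId":
        return str(value)
    # Assumption: bson.Decimal128 is detected via its to_decimal() method.
    if hasattr(value, "to_decimal"):
        value = value.to_decimal()
    if isinstance(value, Decimal):
        return float(value)
    if isinstance(value, bytes):
        if schema_type == "INTEGER":
            # Assumption: big-endian byte order.
            return int.from_bytes(value, "big")
        return base64.standard_b64encode(value).decode("ascii")
    # datetime subclasses date, so it must be checked first.
    if isinstance(value, datetime):
        return str(value)
    if isinstance(value, date):
        if schema_type == "DATE":
            return value.isoformat()
        # Assumption: "combined" means the date at midnight.
        return str(datetime.combine(value, time.min))
    if isinstance(value, (list, dict, tuple)):
        # Assumption: default=str stringifies nested non-JSON values.
        return json.dumps(value, default=str)
    # Assumption: everything else passes through unchanged.
    return value
```

The ordering matters: because datetime is a subclass of date, checking date first would route every datetime through the date branch.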