airflow.providers.google.cloud.transfers.mongo_to_gcs

MongoDB to GCS operator.

Classes

MongoToGCSOperator

Copy data from MongoDB to Google Cloud Storage in JSON, CSV or Parquet format.

Module Contents

class airflow.providers.google.cloud.transfers.mongo_to_gcs.MongoToGCSOperator(*, mongo_conn_id='mongo_default', mongo_db, mongo_collection, mongo_query=None, mongo_projection=None, allow_disk_use=True, **kwargs)

Bases: airflow.providers.google.cloud.transfers.sql_to_gcs.BaseSQLToGCSOperator

Copy data from MongoDB to Google Cloud Storage in JSON, CSV or Parquet format.

Note

MongoDB is a NoSQL store, so subclassing BaseSQLToGCSOperator is a deliberate reuse choice rather than a natural fit. The base class already implements the chunking, schema inference and GCS upload flow we want; this operator reuses it by adapting the pymongo cursor to a DB-API style cursor (see _MongoCursorAdapter) and overriding query / field_to_bigquery / convert_type. A dedicated BaseNoSQLToGCSOperator could be a cleaner home for this in the future.
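The adapter idea in the note above can be illustrated with a minimal sketch. It is not the provider's _MongoCursorAdapter; DictCursorAdapter is a hypothetical name, and the sketch assumes the base class only needs a description attribute (column names in position 0) plus row iteration. Column names are taken from the first document, which is also an assumption.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from typing import Any


class DictCursorAdapter:
    """Hypothetical stand-in for _MongoCursorAdapter; not the provider's code."""

    def __init__(self, documents: Iterable[dict[str, Any]]) -> None:
        self._docs = iter(documents)
        # Peek at the first document to learn the column names (assumption).
        self._first = next(self._docs, None)
        self._columns = list(self._first) if self._first is not None else []
        # DB-API describes each column as a 7-item sequence; only the name is set here.
        self.description = [
            (name, None, None, None, None, None, None) for name in self._columns
        ]

    def __iter__(self) -> Iterator[tuple[Any, ...]]:
        return self

    def __next__(self) -> tuple[Any, ...]:
        if self._first is not None:
            # Hand back the document that was consumed while peeking.
            doc, self._first = self._first, None
        else:
            doc = next(self._docs)  # raises StopIteration when exhausted
        # Missing keys become None so every row has the same width.
        return tuple(doc.get(name) for name in self._columns)


rows = DictCursorAdapter([{"_id": 1, "name": "a"}, {"_id": 2}])
print([column[0] for column in rows.description])
print(list(rows))
```

Any iterable of dicts works as input here, so a pymongo cursor could be passed in place of the list.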

See also

For more information on how to use this operator, take a look at the guide: MongoToGCSOperator

Parameters:
  • mongo_conn_id (str) – Reference to a specific Mongo connection.

  • mongo_db (str) – The MongoDB database name.

  • mongo_collection (str) – The MongoDB collection name.

  • mongo_query (dict | list | None) – A MongoDB find filter (dict) or an aggregation pipeline (list). When None (the default), an empty filter {} is used, which matches all documents.

  • mongo_projection (dict | list | None) – Optional projection passed to find(). Ignored when mongo_query is an aggregation pipeline. Accepts a dict ({"field": 1}) or list of field names.

  • allow_disk_use (bool) – Whether to pass allowDiskUse=True to aggregate(). Defaults to True.
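How the three query parameters interact can be sketched in plain Python. plan_mongo_call is a hypothetical helper written for illustration, not part of the operator; it only mirrors the rules listed above (a dict or None selects find(), a list selects aggregate(), the projection is ignored for pipelines, and allowDiskUse=True is passed only when allow_disk_use is set).

```python
from __future__ import annotations

from typing import Any


def plan_mongo_call(
    mongo_query: dict | list | None = None,
    mongo_projection: dict | list | None = None,
    allow_disk_use: bool = True,
) -> tuple[str, dict[str, Any]]:
    """Hypothetical helper: return the pymongo method name and its keyword arguments."""
    if isinstance(mongo_query, list):
        # A list is treated as an aggregation pipeline; the projection is ignored.
        kwargs: dict[str, Any] = {"pipeline": mongo_query}
        if allow_disk_use:
            kwargs["allowDiskUse"] = True
        return "aggregate", kwargs
    # None falls back to an empty filter, which matches every document.
    kwargs = {"filter": mongo_query or {}}
    if mongo_projection is not None:
        kwargs["projection"] = mongo_projection
    return "find", kwargs


print(plan_mongo_call({"status": "active"}, ["name", "email"]))
print(plan_mongo_call([{"$match": {"status": "active"}}], {"name": 1}))
```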

ui_color = '#a0e08c'
template_fields: collections.abc.Sequence[str] = ('bucket', 'filename', 'schema_filename', 'schema', 'parameters', 'impersonation_chain',...
template_ext: collections.abc.Sequence[str] = ()
template_fields_renderers
type_map: dict[type, str]
mongo_conn_id = 'mongo_default'
mongo_db
mongo_collection
mongo_query
mongo_projection = None
allow_disk_use = True
property db_hook: airflow.providers.mongo.hooks.mongo.MongoHook
query()

Execute the configured find/aggregate and return a DB-API style cursor.

field_to_bigquery(field)

Convert a DB-API style field description to a BigQuery schema field.
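A rough sketch of such a conversion follows. The contents of TYPE_MAP, the STRING fallback, the NULLABLE mode, and the assumption that the field carries a Python type in position 1 are all illustrative guesses; the operator's actual type_map and field layout may differ.

```python
from __future__ import annotations

import datetime

# Hypothetical type map; the operator's real type_map may contain other entries.
TYPE_MAP: dict[type, str] = {
    bool: "BOOLEAN",
    int: "INTEGER",
    float: "FLOAT",
    datetime.datetime: "TIMESTAMP",
    datetime.date: "DATE",
}


def field_to_bigquery_sketch(field: tuple) -> dict[str, str]:
    """Map a (name, python_type, ...) description tuple to a BigQuery schema field."""
    name, python_type = field[0], field[1]
    return {
        "name": name,
        # Unknown or missing types fall back to STRING (assumption).
        "type": TYPE_MAP.get(python_type, "STRING"),
        "mode": "NULLABLE",
    }


print(field_to_bigquery_sketch(("age", int)))
print(field_to_bigquery_sketch(("_id", None)))
```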

convert_type(value, schema_type, **kwargs)

Convert pymongo values to BigQuery-friendly types.

  • ObjectId -> str.

  • Decimal128 / Decimal -> float.

  • bytes -> base64-encoded str (or int when schema_type == 'INTEGER').

  • datetime -> str(value).

  • date -> ISO date string when schema_type == 'DATE', otherwise combined datetime string.

  • list / dict / tuple -> JSON string.
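The conversions listed above can be approximated with the standard library alone. The sketch below is an illustration, not the operator's implementation: convert_value is a hypothetical name, ObjectId and Decimal128 are left out because they require the bson package, and the big-endian byte order, the midnight time used for the combined datetime string, and default=str in json.dumps are assumptions.

```python
from __future__ import annotations

import base64
import json
from datetime import date, datetime, time
from decimal import Decimal
from typing import Any


def convert_value(value: Any, schema_type: str | None = None) -> Any:
    """Hypothetical stand-in for convert_type covering stdlib types only."""
    if isinstance(value, Decimal):
        return float(value)
    if isinstance(value, bytes):
        if schema_type == "INTEGER":
            # Byte order is an assumption; big-endian is used here.
            return int.from_bytes(value, "big")
        return base64.standard_b64encode(value).decode("ascii")
    # datetime must be checked before date, because datetime subclasses date.
    if isinstance(value, datetime):
        return str(value)
    if isinstance(value, date):
        if schema_type == "DATE":
            return value.isoformat()
        # Combine with midnight to produce a full datetime string (assumption).
        return str(datetime.combine(value, time.min))
    if isinstance(value, (list, dict, tuple)):
        # default=str keeps nested non-JSON values (e.g. datetimes) serializable.
        return json.dumps(value, default=str)
    return value


print(convert_value(b"hi"))
print(convert_value(date(2024, 1, 2), "DATE"))
print(convert_value({"tags": ["a", "b"]}))
```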
