airflow.providers.google.cloud.transfers.s3_to_gcs

Module Contents

Classes

S3ToGCSOperator

Synchronizes an S3 key, possibly a prefix, with a Google Cloud Storage destination path.

class airflow.providers.google.cloud.transfers.s3_to_gcs.S3ToGCSOperator(*, bucket, prefix='', apply_gcs_prefix=False, delimiter='', aws_conn_id='aws_default', verify=None, gcp_conn_id='google_cloud_default', dest_gcs=None, replace=False, gzip=False, google_impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=10, **kwargs)[source]

Bases: airflow.providers.amazon.aws.operators.s3.S3ListOperator

Synchronizes an S3 key, possibly a prefix, with a Google Cloud Storage destination path.

See also

For more information on how to use this operator, take a look at the guide: Transfer Data from Amazon S3 to Google Cloud Storage

Parameters
  • bucket – The S3 bucket in which to find the objects. (templated)

  • prefix – Prefix string used to select objects whose names begin with it. (templated)

  • apply_gcs_prefix

    (Optional) Whether to replace the source objects’ path with the given GCS destination path. If apply_gcs_prefix is False (default), objects from S3 are copied into the given GCS path and the source path is placed inside it. For example: <s3_bucket><s3_prefix><content> => <gcs_prefix><s3_prefix><content>

    If apply_gcs_prefix is True, objects from S3 are copied into the given GCS path and the source path is omitted. For example: <s3_bucket><s3_prefix><content> => <gcs_prefix><content>

  • delimiter – The delimiter that marks key hierarchy. (templated)

  • aws_conn_id – The source S3 connection

  • verify

    Whether to verify SSL certificates for the S3 connection. By default, SSL certificates are verified. You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used (unless use_ssl is False), but SSL certificates will not be verified.

    • path/to/cert/bundle.pem: the filename of a CA cert bundle to use. Specify this if you want to use a different CA cert bundle than the one used by botocore.

  • gcp_conn_id – (Optional) The connection ID used to connect to Google Cloud.

  • dest_gcs – The destination Google Cloud Storage bucket and prefix where you want to store the files. (templated)

  • replace – Whether you want to replace existing destination files or not.

  • gzip – Whether to compress files for upload. This parameter is ignored in deferrable mode.

  • google_impersonation_chain (str | collections.abc.Sequence[str] | None) – Optional Google service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated).

  • deferrable – Run the operator in deferrable mode.

  • poll_interval (int) – Time in seconds between polls for job completion. The value is considered only when running in deferrable mode. Must be greater than 0.
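The dest_gcs parameter combines a bucket and a prefix in a single gs:// URL. As a rough illustration of that split, here is a minimal sketch using only the standard library. The helper name split_gcs_url is hypothetical and is not part of the provider; the operator's own parsing may differ in detail.

```python
from urllib.parse import urlsplit


def split_gcs_url(dest_gcs: str) -> tuple[str, str]:
    """Split a gs:// URL into (bucket, prefix).

    Hypothetical helper for illustration only, not the provider's API.
    """
    parts = urlsplit(dest_gcs)
    if parts.scheme != "gs" or not parts.netloc:
        raise ValueError(f"Expected a gs://bucket/prefix URL, got {dest_gcs!r}")
    # A non-empty path starts with "/", which is not part of the object prefix.
    return parts.netloc, parts.path.lstrip("/")


gcs_bucket, gcs_prefix = split_gcs_url("gs://my.gcs.bucket/some/customers/")
print(gcs_bucket, gcs_prefix)
```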

Example:

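from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator

# my_dag is assumed to be a DAG object defined elsewhere.
s3_to_gcs_op = S3ToGCSOperator(
    task_id="s3_to_gcs_example",
    bucket="my-s3-bucket",
    prefix="data/customers-201804",
    gcp_conn_id="google_cloud_default",
    dest_gcs="gs://my.gcs.bucket/some/customers/",
    replace=False,
    gzip=True,
    dag=my_dag,
)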

Note that bucket, prefix, delimiter, dest_gcs and google_impersonation_chain are templated, so you can use Jinja template variables in them if you wish.
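To make the templating note concrete, the sketch below builds a set of keyword arguments whose templated fields contain Jinja expressions. It assumes the standard Airflow template variables ds and ds_nodash; the dictionary name templated_kwargs and the paths are illustrative only. The strings are left unrendered here, since Airflow renders them at task run time.

```python
# Keyword arguments for the templated fields. The Jinja expressions are
# rendered by Airflow when the task runs, not when this dict is built.
templated_kwargs = {
    "bucket": "my-s3-bucket",
    # ds_nodash renders as the logical date in YYYYMMDD form.
    "prefix": "data/customers-{{ ds_nodash }}",
    # ds renders as the logical date in YYYY-MM-DD form.
    "dest_gcs": "gs://my.gcs.bucket/some/customers/{{ ds }}/",
}

for name, value in templated_kwargs.items():
    print(f"{name} = {value}")
```

These could then be passed to the operator as S3ToGCSOperator(task_id="s3_to_gcs_example", **templated_kwargs).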

template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'delimiter', 'dest_gcs', 'google_impersonation_chain')[source]
ui_color = '#e09411'[source]
transfer_job_max_files_number = 1000[source]
execute(context)[source]

Derive when creating an operator.

Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.

exclude_existing_objects(s3_objects, gcs_hook)[source]

Exclude from the list any objects that already exist in the GCS bucket.
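The filtering idea can be sketched without any cloud calls. This is a minimal illustration, not the operator's code: the function exclude_existing, its parameters, and the name-mapping callable are all hypothetical, and the set of existing GCS names is assumed to have been listed beforehand.

```python
from collections.abc import Callable, Iterable


def exclude_existing(
    s3_keys: list[str],
    existing_gcs_names: Iterable[str],
    to_gcs_name: Callable[[str], str],
) -> list[str]:
    """Return the S3 keys whose mapped GCS name is not already present.

    Hypothetical helper: the real method works with hooks rather than
    plain collections.
    """
    existing = set(existing_gcs_names)
    # Keep the original order of the S3 listing.
    return [key for key in s3_keys if to_gcs_name(key) not in existing]


pending = exclude_existing(
    ["data/a.csv", "data/b.csv"],
    ["backup/data/a.csv"],
    lambda key: "backup/" + key,
)
print(pending)
```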

s3_to_gcs_object(s3_object)[source]

Transform S3 path to GCS path according to the operator’s logic.

If apply_gcs_prefix == True then <s3_prefix><content> => <gcs_prefix><content>

If apply_gcs_prefix == False then <s3_prefix><content> => <gcs_prefix><s3_prefix><content>
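The two mapping rules can be expressed as a small pure function. This is a sketch of the rule as documented here, not the operator's implementation; the function name s3_to_gcs_path and its explicit prefix arguments are hypothetical.

```python
def s3_to_gcs_path(
    s3_object: str, s3_prefix: str, gcs_prefix: str, apply_gcs_prefix: bool
) -> str:
    """Map an S3 key to a GCS object name following the documented rule.

    Hypothetical helper for illustration only.
    """
    if not apply_gcs_prefix:
        # <s3_prefix><content> => <gcs_prefix><s3_prefix><content>
        return gcs_prefix + s3_object
    if not s3_object.startswith(s3_prefix):
        raise ValueError(f"{s3_object!r} does not start with {s3_prefix!r}")
    # <s3_prefix><content> => <gcs_prefix><content>
    return gcs_prefix + s3_object[len(s3_prefix):]


key = "data/2018/a.csv"
print(s3_to_gcs_path(key, "data/", "some/customers/", apply_gcs_prefix=False))
print(s3_to_gcs_path(key, "data/", "some/customers/", apply_gcs_prefix=True))
```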

gcs_to_s3_object(gcs_object)[source]

Transform GCS path to S3 path according to the operator’s logic.

If apply_gcs_prefix == True then <gcs_prefix><content> => <s3_prefix><content>

If apply_gcs_prefix == False then <gcs_prefix><s3_prefix><content> => <s3_prefix><content>
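The reverse mapping can be sketched the same way. Again this follows only the rule stated here and is not the operator's implementation; gcs_to_s3_path and its explicit prefix arguments are hypothetical names.

```python
def gcs_to_s3_path(
    gcs_object: str, s3_prefix: str, gcs_prefix: str, apply_gcs_prefix: bool
) -> str:
    """Map a GCS object name back to an S3 key following the documented rule.

    Hypothetical helper for illustration only.
    """
    if not gcs_object.startswith(gcs_prefix):
        raise ValueError(f"{gcs_object!r} does not start with {gcs_prefix!r}")
    remainder = gcs_object[len(gcs_prefix):]
    if apply_gcs_prefix:
        # <gcs_prefix><content> => <s3_prefix><content>
        return s3_prefix + remainder
    # <gcs_prefix><s3_prefix><content> => <s3_prefix><content>
    return remainder


print(gcs_to_s3_path("some/customers/2018/a.csv", "data/", "some/customers/", True))
print(gcs_to_s3_path("some/customers/data/2018/a.csv", "data/", "some/customers/", False))
```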

transfer_files(s3_objects, gcs_hook, s3_hook)[source]
transfer_files_async(files, gcs_hook, s3_hook)[source]

Submit a Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS.
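The class attribute transfer_job_max_files_number = 1000 suggests that large file lists are split across several transfer jobs. The sketch below shows one way such batching could look, assuming the attribute caps the number of files per job; that reading is an assumption, and chunk_files is a hypothetical helper rather than part of the provider.

```python
def chunk_files(files: list[str], max_files_per_job: int = 1000) -> list[list[str]]:
    """Split a file list into consecutive batches of at most max_files_per_job.

    Hypothetical helper: assumes one transfer job per batch.
    """
    if max_files_per_job <= 0:
        raise ValueError("max_files_per_job must be greater than 0")
    return [
        files[start:start + max_files_per_job]
        for start in range(0, len(files), max_files_per_job)
    ]


batches = chunk_files([f"data/file-{i}.csv" for i in range(2500)])
print([len(batch) for batch in batches])
```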

submit_transfer_jobs(files, gcs_hook, s3_hook)[source]
execute_complete(context, event)[source]

Callback for the trigger. Returns immediately and relies on the trigger to emit a success event.

Relies on the trigger to throw an exception; otherwise, execution is assumed to have been successful.

get_transfer_hook()[source]
get_openlineage_facets_on_start()[source]
