Google Cloud VertexAI Operators

The Google Cloud VertexAI brings AutoML and AI Platform together into a unified API, client library, and user interface. AutoML lets you train models on image, tabular, text, and video datasets without writing code, while training in AI Platform lets you run custom training code. With Vertex AI, both AutoML training and custom training are available options. Whichever option you choose for training, you can save models, deploy models, and request predictions with Vertex AI.

Creating Datasets

To create a Google VertexAI dataset you can use CreateDatasetOperator. The operator returns dataset id in XCom under dataset_id key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py[source]

create_image_dataset_job = CreateDatasetOperator(
    task_id="image_dataset",
    dataset=IMAGE_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_tabular_dataset_job = CreateDatasetOperator(
    task_id="tabular_dataset",
    dataset=TABULAR_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_text_dataset_job = CreateDatasetOperator(
    task_id="text_dataset",
    dataset=TEXT_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_video_dataset_job = CreateDatasetOperator(
    task_id="video_dataset",
    dataset=VIDEO_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_time_series_dataset_job = CreateDatasetOperator(
    task_id="time_series_dataset",
    dataset=TIME_SERIES_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)

After creating a dataset you can use it to import some data using ImportDataOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py[source]

import_data_job = ImportDataOperator(
    task_id="import_data",
    dataset_id=create_image_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
    import_configs=TEST_IMPORT_CONFIG,
)

To export dataset you can use ExportDataOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py[source]

export_data_job = ExportDataOperator(
    task_id="export_data",
    dataset_id=create_image_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
    export_config=TEST_EXPORT_CONFIG,
)

To delete dataset you can use DeleteDatasetOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py[source]

delete_dataset_job = DeleteDatasetOperator(
    task_id="delete_dataset",
    dataset_id=create_text_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
)

To get dataset you can use GetDatasetOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py[source]

get_dataset = GetDatasetOperator(
    task_id="get_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_tabular_dataset_job.output["dataset_id"],
)

To get a dataset list you can use ListDatasetsOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py[source]

list_dataset_job = ListDatasetsOperator(
    task_id="list_dataset",
    region=REGION,
    project_id=PROJECT_ID,
)

To update dataset you can use UpdateDatasetOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py[source]

update_dataset_job = UpdateDatasetOperator(
    task_id="update_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_video_dataset_job.output["dataset_id"],
    dataset=DATASET_TO_UPDATE,
    update_mask=TEST_UPDATE_MASK,
)

Creating a Training Jobs

To create a Google Vertex AI training jobs you have three operators CreateCustomContainerTrainingJobOperator, CreateCustomPythonPackageTrainingJobOperator, CreateCustomTrainingJobOperator. Each of them will wait for the operation to complete. The results of each operator will be a model which was trained by user using these operators.

Preparation step

For each operator you must prepare and create dataset. Then put dataset id to dataset_id parameter in operator.

How to run a Custom Container Training Job CreateCustomContainerTrainingJobOperator

Before start running this Job you should create a docker image with training script inside. Documentation how to create image you can find by this link: https://cloud.google.com/vertex-ai/docs/training/create-custom-container After that you should put link to the image in container_uri parameter. Also you can type executing command for container which will be created from this image in command parameter.

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_container.py[source]

create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
    task_id="custom_container_task",
    staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
    display_name=CONTAINER_DISPLAY_NAME,
    container_uri=CUSTOM_CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    command=["python3", "task.py"],
    model_display_name=MODEL_DISPLAY_NAME,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)

The CreateCustomContainerTrainingJobOperator also provides the deferrable mode:

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_container.py[source]

create_custom_container_training_job_deferrable = CreateCustomContainerTrainingJobOperator(
    task_id="custom_container_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
    display_name=f"{CONTAINER_DISPLAY_NAME}-def",
    container_uri=CUSTOM_CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    command=["python3", "task.py"],
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

How to run a Python Package Training Job CreateCustomPythonPackageTrainingJobOperator

Before start running this Job you should create a python package with training script inside. Documentation how to create you can find by this link: https://cloud.google.com/vertex-ai/docs/training/create-python-pre-built-container Next you should put link to the package in python_package_gcs_uri parameter, also python_module_name parameter should has the name of script which will run your training task.

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py[source]

create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
    task_id="python_package_task",
    staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
    display_name=PACKAGE_DISPLAY_NAME,
    python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
    python_module_name=PYTHON_MODULE_NAME,
    container_uri=CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)

The CreateCustomPythonPackageTrainingJobOperator also provides the deferrable mode:

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py[source]

create_custom_python_package_training_job_deferrable = CreateCustomPythonPackageTrainingJobOperator(
    task_id="python_package_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
    display_name=f"{PACKAGE_DISPLAY_NAME}-def",
    python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
    python_module_name=PYTHON_MODULE_NAME,
    container_uri=CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

How to run a Custom Training Job CreateCustomTrainingJobOperator.

To create and run a Custom Training Job you should put the path to your local training script inside the script_path parameter.

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]

create_custom_training_job = CreateCustomTrainingJobOperator(
    task_id="custom_task",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=CUSTOM_DISPLAY_NAME,
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)

model_id_v1 = create_custom_training_job.output["model_id"]

The same operation can be performed in the deferrable mode:

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]

create_custom_training_job_deferrable = CreateCustomTrainingJobOperator(
    task_id="custom_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=f"{CUSTOM_DISPLAY_NAME}-def",
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)
model_id_deferrable_v1 = create_custom_training_job_deferrable.output["model_id"]

Additionally, you can create a new version of an existing Custom Training Job. It will replace the existing Model with another version, instead of creating a new Model in the Model Registry. This can be done by specifying the parent_model parameter when running a Custom Training Job.

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]

create_custom_training_job_v2 = CreateCustomTrainingJobOperator(
    task_id="custom_task_v2",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=CUSTOM_DISPLAY_NAME,
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    parent_model=model_id_v1,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)

The same operation can be performed in the deferrable mode:

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]

create_custom_training_job_deferrable_v2 = CreateCustomTrainingJobOperator(
    task_id="custom_task_deferrable_v2",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=f"{CUSTOM_DISPLAY_NAME}-def",
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    parent_model=model_id_deferrable_v1,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

You can get a list of Training Jobs using ListCustomTrainingJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py[source]

list_custom_training_job = ListCustomTrainingJobOperator(
    task_id="list_custom_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)

If you wish to delete a Custom Training Job you can use DeleteCustomTrainingJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]

delete_custom_training_job = DeleteCustomTrainingJobOperator(
    task_id="delete_custom_training_job",
    training_pipeline_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='training_id') }}",
    custom_job_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='custom_job_id') }}",
    region=REGION,
    project_id=PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Creating an AutoML Training Jobs

To create a Google Vertex AI Auto ML training jobs you have five operators CreateAutoMLForecastingTrainingJobOperator CreateAutoMLImageTrainingJobOperator CreateAutoMLTabularTrainingJobOperator SupervisedFineTuningTrainOperator CreateAutoMLVideoTrainingJobOperator Each of them will wait for the operation to complete. The results of each operator will be a model which was trained by user using these operators.

How to run AutoML Forecasting Training Job CreateAutoMLForecastingTrainingJobOperator

Before start running this Job you must prepare and create TimeSeries dataset. After that you should put dataset id to dataset_id parameter in operator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py[source]

create_auto_ml_forecasting_training_job = CreateAutoMLForecastingTrainingJobOperator(
    task_id="auto_ml_forecasting_task",
    display_name=FORECASTING_DISPLAY_NAME,
    optimization_objective="minimize-rmse",
    column_specs=COLUMN_SPECS,
    # run params
    dataset_id=forecast_dataset_id,
    target_column=TEST_TARGET_COLUMN,
    time_column=TEST_TIME_COLUMN,
    time_series_identifier_column=TEST_TIME_SERIES_IDENTIFIER_COLUMN,
    available_at_forecast_columns=[TEST_TIME_COLUMN],
    unavailable_at_forecast_columns=[TEST_TARGET_COLUMN],
    time_series_attribute_columns=["city", "zip_code", "county"],
    forecast_horizon=30,
    context_window=30,
    data_granularity_unit="day",
    data_granularity_count=1,
    weight_column=None,
    budget_milli_node_hours=1000,
    model_display_name=MODEL_DISPLAY_NAME,
    predefined_split_column_name=None,
    region=REGION,
    project_id=PROJECT_ID,
)

How to run AutoML Image Training Job CreateAutoMLImageTrainingJobOperator

Before start running this Job you must prepare and create Image dataset. After that you should put dataset id to dataset_id parameter in operator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py[source]

create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
    task_id="auto_ml_image_task",
    display_name=IMAGE_DISPLAY_NAME,
    dataset_id=image_dataset_id,
    prediction_type="classification",
    multi_label=False,
    model_type="CLOUD",
    training_fraction_split=0.6,
    validation_fraction_split=0.2,
    test_fraction_split=0.2,
    budget_milli_node_hours=8000,
    model_display_name=MODEL_DISPLAY_NAME,
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)

To run AutoML image detection training job:

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_object_detection.py[source]

create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
    task_id="auto_ml_image_task",
    display_name=IMAGE_DISPLAY_NAME,
    dataset_id=image_dataset_id,
    prediction_type="object_detection",
    multi_label=False,
    model_type="CLOUD",
    training_fraction_split=0.6,
    validation_fraction_split=0.2,
    test_fraction_split=0.2,
    budget_milli_node_hours=20000,
    model_display_name=MODEL_DISPLAY_NAME,
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)

How to run AutoML Tabular Training Job CreateAutoMLTabularTrainingJobOperator

Before start running this Job you must prepare and create Tabular dataset. After that you should put dataset id to dataset_id parameter in operator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py[source]

create_auto_ml_tabular_training_job = CreateAutoMLTabularTrainingJobOperator(
    task_id="auto_ml_tabular_task",
    display_name=TABULAR_DISPLAY_NAME,
    optimization_prediction_type="classification",
    column_transformations=COLUMN_TRANSFORMATIONS,
    dataset_id=tabular_dataset_id,
    target_column="Adopted",
    training_fraction_split=0.8,
    validation_fraction_split=0.1,
    test_fraction_split=0.1,
    model_display_name=MODEL_DISPLAY_NAME,
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)

How to run AutoML Video Training Job CreateAutoMLVideoTrainingJobOperator

Before start running this Job you must prepare and create Video dataset. After that you should put dataset id to dataset_id parameter in operator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py[source]

create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_task",
    display_name=VIDEO_DISPLAY_NAME,
    prediction_type="classification",
    model_type="CLOUD",
    dataset_id=video_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)
model_id_v1 = create_auto_ml_video_training_job.output["model_id"]

Additionally, you can create new version of existing AutoML Video Training Job. In this case, the result will be new version of existing Model instead of new Model created in Model Registry. This can be done by specifying parent_model parameter when running AutoML Video Training Job.

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py[source]

create_auto_ml_video_training_job_v2 = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_v2_task",
    display_name=VIDEO_DISPLAY_NAME,
    prediction_type="classification",
    model_type="CLOUD",
    dataset_id=video_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    parent_model=model_id_v1,
    region=REGION,
    project_id=PROJECT_ID,
)

Also you can use vertex_ai AutoML model for video tracking.

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_tracking.py[source]

create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_task",
    display_name=VIDEO_DISPLAY_NAME,
    prediction_type="object_tracking",
    model_type="CLOUD",
    dataset_id=video_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)

You can get a list of AutoML Training Jobs using ListAutoMLTrainingJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py[source]

list_auto_ml_training_job = ListAutoMLTrainingJobOperator(
    task_id="list_auto_ml_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)

If you wish to delete a Auto ML Training Job you can use DeleteAutoMLTrainingJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py[source]

delete_auto_ml_forecasting_training_job = DeleteAutoMLTrainingJobOperator(
    task_id="delete_auto_ml_forecasting_training_job",
    training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_forecasting_task', "
    "key='training_id') }}",
    region=REGION,
    project_id=PROJECT_ID,
)

Creating a Batch Prediction Jobs

To create a Google VertexAI Batch Prediction Job you can use CreateBatchPredictionJobOperator. The operator returns batch prediction job id in XCom under batch_prediction_job_id key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py[source]

create_batch_prediction_job = CreateBatchPredictionJobOperator(
    task_id="create_batch_prediction_job",
    job_display_name=JOB_DISPLAY_NAME,
    model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
    predictions_format="csv",
    bigquery_source=BIGQUERY_SOURCE,
    gcs_destination_prefix=GCS_DESTINATION_PREFIX,
    model_parameters=MODEL_PARAMETERS,
    region=REGION,
    project_id=PROJECT_ID,
)

The CreateBatchPredictionJobOperator also provides deferrable mode:

tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py[source]

create_batch_prediction_job_def = CreateBatchPredictionJobOperator(
    task_id="create_batch_prediction_job_def",
    job_display_name=JOB_DISPLAY_NAME,
    model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
    predictions_format="csv",
    bigquery_source=BIGQUERY_SOURCE,
    gcs_destination_prefix=GCS_DESTINATION_PREFIX,
    model_parameters=MODEL_PARAMETERS,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

To delete batch prediction job you can use DeleteBatchPredictionJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py[source]

delete_batch_prediction_job = DeleteBatchPredictionJobOperator(
    task_id="delete_batch_prediction_job",
    batch_prediction_job_id=create_batch_prediction_job.output["batch_prediction_job_id"],
    region=REGION,
    project_id=PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

To get a batch prediction job list you can use ListBatchPredictionJobsOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py[source]

list_batch_prediction_job = ListBatchPredictionJobsOperator(
    task_id="list_batch_prediction_jobs",
    region=REGION,
    project_id=PROJECT_ID,
)

Creating an Endpoint Service

To create a Google VertexAI endpoint you can use CreateEndpointOperator. The operator returns endpoint id in XCom under endpoint_id key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[source]

create_endpoint = CreateEndpointOperator(
    task_id="create_endpoint",
    endpoint=ENDPOINT_CONF,
    region=REGION,
    project_id=PROJECT_ID,
)

After creating an endpoint you can use it to deploy some model using DeployModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[source]

deploy_model = DeployModelOperator(
    task_id="deploy_model",
    endpoint_id=create_endpoint.output["endpoint_id"],
    deployed_model=DEPLOYED_MODEL,
    traffic_split={"0": 100},
    region=REGION,
    project_id=PROJECT_ID,
)

To un deploy model you can use UndeployModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[source]

undeploy_model = UndeployModelOperator(
    task_id="undeploy_model",
    endpoint_id=create_endpoint.output["endpoint_id"],
    deployed_model_id=deploy_model.output["deployed_model_id"],
    region=REGION,
    project_id=PROJECT_ID,
)

To delete endpoint you can use DeleteEndpointOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[source]

delete_endpoint = DeleteEndpointOperator(
    task_id="delete_endpoint",
    endpoint_id=create_endpoint.output["endpoint_id"],
    region=REGION,
    project_id=PROJECT_ID,
)

To get an endpoint list you can use ListEndpointsOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[source]

list_endpoints = ListEndpointsOperator(
    task_id="list_endpoints",
    region=REGION,
    project_id=PROJECT_ID,
)

Creating a Hyperparameter Tuning Jobs

To create a Google VertexAI hyperparameter tuning job you can use CreateHyperparameterTuningJobOperator. The operator returns hyperparameter tuning job id in XCom under hyperparameter_tuning_job_id key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[source]

create_hyperparameter_tuning_job = CreateHyperparameterTuningJobOperator(
    task_id="create_hyperparameter_tuning_job",
    staging_bucket=STAGING_BUCKET,
    display_name=DISPLAY_NAME,
    worker_pool_specs=WORKER_POOL_SPECS,
    region=REGION,
    project_id=PROJECT_ID,
    parameter_spec=PARAM_SPECS,
    metric_spec=METRIC_SPEC,
    max_trial_count=15,
    parallel_trial_count=3,
)

CreateHyperparameterTuningJobOperator also supports deferrable mode:

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[source]

create_hyperparameter_tuning_job_def = CreateHyperparameterTuningJobOperator(
    task_id="create_hyperparameter_tuning_job_def",
    staging_bucket=STAGING_BUCKET,
    display_name=DISPLAY_NAME,
    worker_pool_specs=WORKER_POOL_SPECS,
    region=REGION,
    project_id=PROJECT_ID,
    parameter_spec=PARAM_SPECS,
    metric_spec=METRIC_SPEC,
    max_trial_count=15,
    parallel_trial_count=3,
    deferrable=True,
)

To delete hyperparameter tuning job you can use DeleteHyperparameterTuningJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[source]

delete_hyperparameter_tuning_job = DeleteHyperparameterTuningJobOperator(
    task_id="delete_hyperparameter_tuning_job",
    project_id=PROJECT_ID,
    region=REGION,
    hyperparameter_tuning_job_id="{{ task_instance.xcom_pull("
    "task_ids='create_hyperparameter_tuning_job', key='hyperparameter_tuning_job_id') }}",
    trigger_rule=TriggerRule.ALL_DONE,
)

To get hyperparameter tuning job you can use GetHyperparameterTuningJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[source]

get_hyperparameter_tuning_job = GetHyperparameterTuningJobOperator(
    task_id="get_hyperparameter_tuning_job",
    project_id=PROJECT_ID,
    region=REGION,
    hyperparameter_tuning_job_id="{{ task_instance.xcom_pull("
    "task_ids='create_hyperparameter_tuning_job', key='hyperparameter_tuning_job_id') }}",
)

To get a hyperparameter tuning job list you can use ListHyperparameterTuningJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[source]

list_hyperparameter_tuning_job = ListHyperparameterTuningJobOperator(
    task_id="list_hyperparameter_tuning_job",
    region=REGION,
    project_id=PROJECT_ID,
)

Creating a Model Service

To upload a Google VertexAI model you can use UploadModelOperator. The operator returns model id in XCom under model_id key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

upload_model = UploadModelOperator(
    task_id="upload_model",
    region=REGION,
    project_id=PROJECT_ID,
    model=MODEL_OBJ,
)
upload_model_v1 = upload_model.output["model_id"]

To export model you can use ExportModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

export_model = ExportModelOperator(
    task_id="export_model",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=upload_model.output["model_id"],
    output_config=MODEL_OUTPUT_CONFIG,
)

To delete model you can use DeleteModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

delete_model = DeleteModelOperator(
    task_id="delete_model",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=upload_model.output["model_id"],
    trigger_rule=TriggerRule.ALL_DONE,
)

To get a model list you can use ListModelsOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

list_models = ListModelsOperator(
    task_id="list_models",
    region=REGION,
    project_id=PROJECT_ID,
)

To retrieve model by its ID you can use GetModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

get_model = GetModelOperator(
    task_id="get_model", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)

To list all model versions you can use ListModelVersionsOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

list_model_versions = ListModelVersionsOperator(
    task_id="list_model_versions", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)

To set a specific version of model as a default one you can use SetDefaultVersionOnModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

set_default_version = SetDefaultVersionOnModelOperator(
    task_id="set_default_version",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=model_id_v2,
)

To add aliases to specific version of model you can use AddVersionAliasesOnModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

add_version_alias = AddVersionAliasesOnModelOperator(
    task_id="add_version_alias",
    project_id=PROJECT_ID,
    region=REGION,
    version_aliases=["new-version", "beta"],
    model_id=model_id_v2,
)

To delete aliases from specific version of model you can use DeleteVersionAliasesOnModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

delete_version_alias = DeleteVersionAliasesOnModelOperator(
    task_id="delete_version_alias",
    project_id=PROJECT_ID,
    region=REGION,
    version_aliases=["new-version"],
    model_id=model_id_v2,
)

To delete specific version of model you can use DeleteModelVersionOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

delete_model_version = DeleteModelVersionOperator(
    task_id="delete_model_version",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=model_id_v1,
    trigger_rule=TriggerRule.ALL_DONE,
)

Running a Pipeline Jobs

To run a Google VertexAI Pipeline Job you can use RunPipelineJobOperator. The operator returns pipeline job id in XCom under pipeline_job_id key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py[source]

run_pipeline_job = RunPipelineJobOperator(
    task_id="run_pipeline_job",
    display_name=DISPLAY_NAME,
    template_path=TEMPLATE_PATH,
    parameter_values=PARAMETER_VALUES,
    region=REGION,
    project_id=PROJECT_ID,
)

To delete pipeline job you can use DeletePipelineJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py[source]

delete_pipeline_job = DeletePipelineJobOperator(
    task_id="delete_pipeline_job",
    project_id=PROJECT_ID,
    region=REGION,
    pipeline_job_id="{{ task_instance.xcom_pull("
    "task_ids='run_pipeline_job', key='pipeline_job_id') }}",
    trigger_rule=TriggerRule.ALL_DONE,
)

To get pipeline job you can use GetPipelineJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py[source]

get_pipeline_job = GetPipelineJobOperator(
    task_id="get_pipeline_job",
    project_id=PROJECT_ID,
    region=REGION,
    pipeline_job_id="{{ task_instance.xcom_pull("
    "task_ids='run_pipeline_job', key='pipeline_job_id') }}",
)

To get a pipeline job list you can use ListPipelineJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py[source]

list_pipeline_job = ListPipelineJobOperator(
    task_id="list_pipeline_job",
    region=REGION,
    project_id=PROJECT_ID,
)

Interacting with Generative AI

To generate text embeddings you can use TextEmbeddingModelGetEmbeddingsOperator. The operator returns the model’s response in XCom under model_response key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[source]

generate_embeddings_task = TextEmbeddingModelGetEmbeddingsOperator(
    task_id="generate_embeddings_task",
    project_id=PROJECT_ID,
    location=REGION,
    prompt=PROMPT,
    pretrained_model=TEXT_EMBEDDING_MODEL,
)

To generate content with a generative model you can use GenerativeModelGenerateContentOperator. The operator returns the model’s response in XCom under model_response key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[source]

generate_content_task = GenerativeModelGenerateContentOperator(
    task_id="generate_content_task",
    project_id=PROJECT_ID,
    contents=CONTENTS,
    tools=TOOLS,
    location=REGION,
    generation_config=GENERATION_CONFIG,
    safety_settings=SAFETY_SETTINGS,
    pretrained_model=MULTIMODAL_MODEL,
)

To run a supervised fine tuning job you can use SupervisedFineTuningTrainOperator. The operator returns the tuned model’s endpoint name in XCom under tuned_model_endpoint_name key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py[source]

sft_train_task = SupervisedFineTuningTrainOperator(
    task_id="sft_train_task",
    project_id=PROJECT_ID,
    location=REGION,
    source_model=SOURCE_MODEL,
    train_dataset=TRAIN_DATASET,
    tuned_model_display_name=TUNED_MODEL_DISPLAY_NAME,
)

To calculates the number of input tokens before sending a request to the Gemini API you can use: CountTokensOperator. The operator returns the total tokens in XCom under total_tokens key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[source]

count_tokens_task = CountTokensOperator(
    task_id="count_tokens_task",
    project_id=PROJECT_ID,
    contents=CONTENTS,
    location=REGION,
    pretrained_model=MULTIMODAL_MODEL,
)

To evaluate a model you can use RunEvaluationOperator. The operator returns the evaluation summary metrics in XCom under summary_metrics key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[source]

run_evaluation_task = RunEvaluationOperator(
    task_id="run_evaluation_task",
    project_id=PROJECT_ID,
    location=REGION,
    pretrained_model=MULTIMODAL_MODEL,
    eval_dataset=EVAL_DATASET,
    metrics=METRICS,
    experiment_name=EXPERIMENT_NAME,
    experiment_run_name=EXPERIMENT_RUN_NAME,
    prompt_template=PROMPT_TEMPLATE,
)

To create cached content you can use CreateCachedContentOperator. The operator returns the cached content resource name in XCom under return_value key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[source]

create_cached_content_task = CreateCachedContentOperator(
    task_id="create_cached_content_task",
    project_id=PROJECT_ID,
    location=REGION,
    model_name=CACHED_MODEL,
    system_instruction=CACHED_SYSTEM_INSTRUCTION,
    contents=CACHED_CONTENTS,
    ttl_hours=1,
    display_name="example-cache",
)

To generate a response from cached content you can use GenerateFromCachedContentOperator. The operator returns the cached content response in XCom under return_value key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[source]

generate_from_cached_content_task = GenerateFromCachedContentOperator(
    task_id="generate_from_cached_content_task",
    project_id=PROJECT_ID,
    location=REGION,
    cached_content_name="{{ task_instance.xcom_pull(task_ids='create_cached_content_task', key='return_value') }}",
    contents=["What are the papers about?"],
    generation_config=GENERATION_CONFIG,
    safety_settings=SAFETY_SETTINGS,
)

Interacting with Vertex AI Feature Store

To get a feature view sync job you can use GetFeatureViewSyncOperator. The operator returns sync job results in XCom under return_value key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py[source]

get_task = GetFeatureViewSyncOperator(
    task_id="get_task",
    location=REGION,
    feature_view_sync_name="{{ task_instance.xcom_pull(task_ids='sync_task', key='return_value')}}",
)

To sync a feature view you can use SyncFeatureViewOperator. The operator returns the sync job name in XCom under return_value key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py[source]

sync_task = SyncFeatureViewOperator(
    task_id="sync_task",
    project_id=PROJECT_ID,
    location=REGION,
    feature_online_store_id=FEATURE_ONLINE_STORE_ID,
    feature_view_id=FEATURE_VIEW_ID,
)

To check if Feature View Sync succeeded you can use FeatureViewSyncSensor.

tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py[source]

wait_for_sync = FeatureViewSyncSensor(
    task_id="wait_for_sync",
    location=REGION,
    feature_view_sync_name="{{ task_instance.xcom_pull(task_ids='sync_task', key='return_value')}}",
    poke_interval=60,  # Check every minute
    timeout=600,  # Timeout after 10 minutes
    mode="reschedule",
)

Reference

For further information, look at:

Was this entry helpful?