Google Cloud VertexAI Operators¶
Google Cloud VertexAI brings AutoML and AI Platform together into a unified API, client library, and user interface. AutoML lets you train models on image, tabular, text, and video datasets without writing code, while custom training in AI Platform lets you run your own training code. With Vertex AI, both AutoML training and custom training are available. Whichever option you choose, you can save models, deploy models, and request predictions with Vertex AI.
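The examples in this section assume the operators have been imported from the Google provider package. A minimal sketch of the imports for the dataset examples below (module paths follow apache-airflow-providers-google; verify them against your installed provider version):
# Assumed imports for the dataset operators used below (check your provider version).
from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
    CreateDatasetOperator,
    ImportDataOperator,
)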
Creating Datasets¶
To create a Google VertexAI dataset you can use
CreateDatasetOperator
.
The operator returns the dataset id in XCom under the dataset_id
key.
create_image_dataset_job = CreateDatasetOperator(
task_id="image_dataset",
dataset=IMAGE_DATASET,
region=REGION,
project_id=PROJECT_ID,
)
create_tabular_dataset_job = CreateDatasetOperator(
task_id="tabular_dataset",
dataset=TABULAR_DATASET,
region=REGION,
project_id=PROJECT_ID,
)
create_text_dataset_job = CreateDatasetOperator(
task_id="text_dataset",
dataset=TEXT_DATASET,
region=REGION,
project_id=PROJECT_ID,
)
create_video_dataset_job = CreateDatasetOperator(
task_id="video_dataset",
dataset=VIDEO_DATASET,
region=REGION,
project_id=PROJECT_ID,
)
create_time_series_dataset_job = CreateDatasetOperator(
task_id="time_series_dataset",
dataset=TIME_SERIES_DATASET,
region=REGION,
project_id=PROJECT_ID,
)
After creating a dataset you can use it to import some data using
ImportDataOperator
.
import_data_job = ImportDataOperator(
task_id="import_data",
dataset_id=create_image_dataset_job.output["dataset_id"],
region=REGION,
project_id=PROJECT_ID,
import_configs=TEST_IMPORT_CONFIG,
)
To export a dataset you can use
ExportDataOperator
.
export_data_job = ExportDataOperator(
task_id="export_data",
dataset_id=create_image_dataset_job.output["dataset_id"],
region=REGION,
project_id=PROJECT_ID,
export_config=TEST_EXPORT_CONFIG,
)
To delete a dataset you can use
DeleteDatasetOperator
.
delete_dataset_job = DeleteDatasetOperator(
task_id="delete_dataset",
dataset_id=create_text_dataset_job.output["dataset_id"],
region=REGION,
project_id=PROJECT_ID,
)
To get a dataset you can use
GetDatasetOperator
.
get_dataset = GetDatasetOperator(
task_id="get_dataset",
project_id=PROJECT_ID,
region=REGION,
dataset_id=create_tabular_dataset_job.output["dataset_id"],
)
To get a dataset list you can use
ListDatasetsOperator
.
list_dataset_job = ListDatasetsOperator(
task_id="list_dataset",
region=REGION,
project_id=PROJECT_ID,
)
To update a dataset you can use
UpdateDatasetOperator
.
update_dataset_job = UpdateDatasetOperator(
task_id="update_dataset",
project_id=PROJECT_ID,
region=REGION,
dataset_id=create_video_dataset_job.output["dataset_id"],
dataset=DATASET_TO_UPDATE,
update_mask=TEST_UPDATE_MASK,
)
Creating Training Jobs¶
To create Google Vertex AI training jobs you have three operators:
CreateCustomContainerTrainingJobOperator
,
CreateCustomPythonPackageTrainingJobOperator
,
CreateCustomTrainingJobOperator
.
Each of them will wait for the operation to complete. The result of each operator is a model
trained with the user's training code.
Preparation step
For each operator you must first prepare and create a dataset. Then pass the dataset id to the dataset_id
parameter of the operator, as shown in the sketch below.
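In the training job examples that follow, tabular_dataset_id is assumed to be taken from the dataset created earlier, for example from the create_tabular_dataset_job task defined above:
# Assumed wiring (sketch): the dataset id pushed to XCom by CreateDatasetOperator
# is forwarded to the training operator's dataset_id parameter.
tabular_dataset_id = create_tabular_dataset_job.output["dataset_id"]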
How to run a Custom Container Training Job
CreateCustomContainerTrainingJobOperator
Before running this job you should create a Docker image with the training script inside. Documentation on how to
create such an image can be found at this link: https://cloud.google.com/vertex-ai/docs/training/create-custom-container
After that, put the link to the image in the container_uri
parameter. You can also specify the command to execute in the container created from this image via the command
parameter.
create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
task_id="custom_container_task",
staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
display_name=CONTAINER_DISPLAY_NAME,
container_uri=CUSTOM_CONTAINER_URI,
model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
# run params
dataset_id=tabular_dataset_id,
command=["python3", "task.py"],
model_display_name=MODEL_DISPLAY_NAME,
replica_count=REPLICA_COUNT,
machine_type=MACHINE_TYPE,
accelerator_type=ACCELERATOR_TYPE,
accelerator_count=ACCELERATOR_COUNT,
training_fraction_split=TRAINING_FRACTION_SPLIT,
validation_fraction_split=VALIDATION_FRACTION_SPLIT,
test_fraction_split=TEST_FRACTION_SPLIT,
region=REGION,
project_id=PROJECT_ID,
)
The CreateCustomContainerTrainingJobOperator
also provides the deferrable mode:
create_custom_container_training_job_deferrable = CreateCustomContainerTrainingJobOperator(
task_id="custom_container_task_deferrable",
staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
display_name=f"{CONTAINER_DISPLAY_NAME}-def",
container_uri=CUSTOM_CONTAINER_URI,
model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
# run params
dataset_id=tabular_dataset_id,
command=["python3", "task.py"],
model_display_name=f"{MODEL_DISPLAY_NAME}-def",
replica_count=REPLICA_COUNT,
machine_type=MACHINE_TYPE,
accelerator_type=ACCELERATOR_TYPE,
accelerator_count=ACCELERATOR_COUNT,
training_fraction_split=TRAINING_FRACTION_SPLIT,
validation_fraction_split=VALIDATION_FRACTION_SPLIT,
test_fraction_split=TEST_FRACTION_SPLIT,
region=REGION,
project_id=PROJECT_ID,
deferrable=True,
)
How to run a Python Package Training Job
CreateCustomPythonPackageTrainingJobOperator
Before running this job you should create a Python package with the training script inside. Documentation on how to
create one can be found at this link: https://cloud.google.com/vertex-ai/docs/training/create-python-pre-built-container
Next, put the link to the package in the python_package_gcs_uri
parameter, and set the python_module_name
parameter to the name of the module that runs your training task.
create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
task_id="python_package_task",
staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
display_name=PACKAGE_DISPLAY_NAME,
python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
python_module_name=PYTHON_MODULE_NAME,
container_uri=CONTAINER_URI,
model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
# run params
dataset_id=tabular_dataset_id,
model_display_name=MODEL_DISPLAY_NAME,
replica_count=REPLICA_COUNT,
machine_type=MACHINE_TYPE,
accelerator_type=ACCELERATOR_TYPE,
accelerator_count=ACCELERATOR_COUNT,
training_fraction_split=TRAINING_FRACTION_SPLIT,
validation_fraction_split=VALIDATION_FRACTION_SPLIT,
test_fraction_split=TEST_FRACTION_SPLIT,
region=REGION,
project_id=PROJECT_ID,
)
The CreateCustomPythonPackageTrainingJobOperator
also provides the deferrable mode:
create_custom_python_package_training_job_deferrable = CreateCustomPythonPackageTrainingJobOperator(
task_id="python_package_task_deferrable",
staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
display_name=f"{PACKAGE_DISPLAY_NAME}-def",
python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
python_module_name=PYTHON_MODULE_NAME,
container_uri=CONTAINER_URI,
model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
# run params
dataset_id=tabular_dataset_id,
model_display_name=f"{MODEL_DISPLAY_NAME}-def",
replica_count=REPLICA_COUNT,
machine_type=MACHINE_TYPE,
accelerator_type=ACCELERATOR_TYPE,
accelerator_count=ACCELERATOR_COUNT,
training_fraction_split=TRAINING_FRACTION_SPLIT,
validation_fraction_split=VALIDATION_FRACTION_SPLIT,
test_fraction_split=TEST_FRACTION_SPLIT,
region=REGION,
project_id=PROJECT_ID,
deferrable=True,
)
How to run a Custom Training Job
CreateCustomTrainingJobOperator
To create and run a Custom Training Job, put the path to your local training script in the script_path
parameter.
create_custom_training_job = CreateCustomTrainingJobOperator(
task_id="custom_task",
staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
display_name=CUSTOM_DISPLAY_NAME,
script_path=LOCAL_TRAINING_SCRIPT_PATH,
container_uri=CONTAINER_URI,
requirements=["gcsfs==0.7.1"],
model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
# run params
dataset_id=tabular_dataset_id,
replica_count=REPLICA_COUNT,
model_display_name=MODEL_DISPLAY_NAME,
region=REGION,
project_id=PROJECT_ID,
)
model_id_v1 = create_custom_training_job.output["model_id"]
The same operation can be performed in the deferrable mode:
create_custom_training_job_deferrable = CreateCustomTrainingJobOperator(
task_id="custom_task_deferrable",
staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
display_name=f"{CUSTOM_DISPLAY_NAME}-def",
script_path=LOCAL_TRAINING_SCRIPT_PATH,
container_uri=CONTAINER_URI,
requirements=["gcsfs==0.7.1"],
model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
# run params
dataset_id=tabular_dataset_id,
replica_count=REPLICA_COUNT,
model_display_name=f"{MODEL_DISPLAY_NAME}-def",
region=REGION,
project_id=PROJECT_ID,
deferrable=True,
)
model_id_deferrable_v1 = create_custom_training_job_deferrable.output["model_id"]
Additionally, you can create a new version of an existing Custom Training Job. In this case, the result will be a new
version of the existing Model instead of a new Model created in the Model Registry.
This can be done by specifying the parent_model
parameter when running a Custom Training Job.
create_custom_training_job_v2 = CreateCustomTrainingJobOperator(
task_id="custom_task_v2",
staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
display_name=CUSTOM_DISPLAY_NAME,
script_path=LOCAL_TRAINING_SCRIPT_PATH,
container_uri=CONTAINER_URI,
requirements=["gcsfs==0.7.1"],
model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
parent_model=model_id_v1,
# run params
dataset_id=tabular_dataset_id,
replica_count=REPLICA_COUNT,
model_display_name=MODEL_DISPLAY_NAME,
region=REGION,
project_id=PROJECT_ID,
)
The same operation can be performed in the deferrable mode:
create_custom_training_job_deferrable_v2 = CreateCustomTrainingJobOperator(
task_id="custom_task_deferrable_v2",
staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
display_name=f"{CUSTOM_DISPLAY_NAME}-def",
script_path=LOCAL_TRAINING_SCRIPT_PATH,
container_uri=CONTAINER_URI,
requirements=["gcsfs==0.7.1"],
model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
parent_model=model_id_deferrable_v1,
# run params
dataset_id=tabular_dataset_id,
replica_count=REPLICA_COUNT,
model_display_name=f"{MODEL_DISPLAY_NAME}-def",
region=REGION,
project_id=PROJECT_ID,
deferrable=True,
)
You can get a list of Training Jobs using
ListCustomTrainingJobOperator
.
list_custom_training_job = ListCustomTrainingJobOperator(
task_id="list_custom_training_job",
region=REGION,
project_id=PROJECT_ID,
)
If you wish to delete a Custom Training Job you can use
DeleteCustomTrainingJobOperator
.
delete_custom_training_job = DeleteCustomTrainingJobOperator(
task_id="delete_custom_training_job",
training_pipeline_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='training_id') }}",
custom_job_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='custom_job_id') }}",
region=REGION,
project_id=PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
Creating AutoML Training Jobs¶
To create Google Vertex AI AutoML training jobs you have the following operators:
CreateAutoMLForecastingTrainingJobOperator
CreateAutoMLImageTrainingJobOperator
CreateAutoMLTabularTrainingJobOperator
CreateAutoMLVideoTrainingJobOperator
Each of them will wait for the operation to complete. The result of each operator is a model
trained on the user's dataset.
How to run AutoML Forecasting Training Job
CreateAutoMLForecastingTrainingJobOperator
Before running this job you must prepare and create a TimeSeries
dataset. After that, pass
the dataset id to the dataset_id
parameter of the operator.
create_auto_ml_forecasting_training_job = CreateAutoMLForecastingTrainingJobOperator(
task_id="auto_ml_forecasting_task",
display_name=FORECASTING_DISPLAY_NAME,
optimization_objective="minimize-rmse",
column_specs=COLUMN_SPECS,
# run params
dataset_id=forecast_dataset_id,
target_column=TEST_TARGET_COLUMN,
time_column=TEST_TIME_COLUMN,
time_series_identifier_column=TEST_TIME_SERIES_IDENTIFIER_COLUMN,
available_at_forecast_columns=[TEST_TIME_COLUMN],
unavailable_at_forecast_columns=[TEST_TARGET_COLUMN],
time_series_attribute_columns=["city", "zip_code", "county"],
forecast_horizon=30,
context_window=30,
data_granularity_unit="day",
data_granularity_count=1,
weight_column=None,
budget_milli_node_hours=1000,
model_display_name=MODEL_DISPLAY_NAME,
predefined_split_column_name=None,
region=REGION,
project_id=PROJECT_ID,
)
How to run AutoML Image Training Job
CreateAutoMLImageTrainingJobOperator
Before running this job you must prepare and create an Image
dataset. After that, pass
the dataset id to the dataset_id
parameter of the operator.
create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
task_id="auto_ml_image_task",
display_name=IMAGE_DISPLAY_NAME,
dataset_id=image_dataset_id,
prediction_type="classification",
multi_label=False,
model_type="CLOUD",
training_fraction_split=0.6,
validation_fraction_split=0.2,
test_fraction_split=0.2,
budget_milli_node_hours=8000,
model_display_name=MODEL_DISPLAY_NAME,
disable_early_stopping=False,
region=REGION,
project_id=PROJECT_ID,
)
To run AutoML image detection training job:
create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
task_id="auto_ml_image_task",
display_name=IMAGE_DISPLAY_NAME,
dataset_id=image_dataset_id,
prediction_type="object_detection",
multi_label=False,
model_type="CLOUD",
training_fraction_split=0.6,
validation_fraction_split=0.2,
test_fraction_split=0.2,
budget_milli_node_hours=20000,
model_display_name=MODEL_DISPLAY_NAME,
disable_early_stopping=False,
region=REGION,
project_id=PROJECT_ID,
)
How to run AutoML Tabular Training Job
CreateAutoMLTabularTrainingJobOperator
Before running this job you must prepare and create a Tabular
dataset. After that, pass
the dataset id to the dataset_id
parameter of the operator.
create_auto_ml_tabular_training_job = CreateAutoMLTabularTrainingJobOperator(
task_id="auto_ml_tabular_task",
display_name=TABULAR_DISPLAY_NAME,
optimization_prediction_type="classification",
column_transformations=COLUMN_TRANSFORMATIONS,
dataset_id=tabular_dataset_id,
target_column="Adopted",
training_fraction_split=0.8,
validation_fraction_split=0.1,
test_fraction_split=0.1,
model_display_name=MODEL_DISPLAY_NAME,
disable_early_stopping=False,
region=REGION,
project_id=PROJECT_ID,
)
How to run AutoML Video Training Job
CreateAutoMLVideoTrainingJobOperator
Before running this job you must prepare and create a Video
dataset. After that, pass
the dataset id to the dataset_id
parameter of the operator.
create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
task_id="auto_ml_video_task",
display_name=VIDEO_DISPLAY_NAME,
prediction_type="classification",
model_type="CLOUD",
dataset_id=video_dataset_id,
model_display_name=MODEL_DISPLAY_NAME,
region=REGION,
project_id=PROJECT_ID,
)
model_id_v1 = create_auto_ml_video_training_job.output["model_id"]
Additionally, you can create a new version of an existing AutoML Video Training Job. In this case, the result will be a new
version of the existing Model instead of a new Model created in the Model Registry. This can be done by specifying the
parent_model
parameter when running an AutoML Video Training Job.
create_auto_ml_video_training_job_v2 = CreateAutoMLVideoTrainingJobOperator(
task_id="auto_ml_video_v2_task",
display_name=VIDEO_DISPLAY_NAME,
prediction_type="classification",
model_type="CLOUD",
dataset_id=video_dataset_id,
model_display_name=MODEL_DISPLAY_NAME,
parent_model=model_id_v1,
region=REGION,
project_id=PROJECT_ID,
)
You can also use a Vertex AI AutoML model for video object tracking.
create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
task_id="auto_ml_video_task",
display_name=VIDEO_DISPLAY_NAME,
prediction_type="object_tracking",
model_type="CLOUD",
dataset_id=video_dataset_id,
model_display_name=MODEL_DISPLAY_NAME,
region=REGION,
project_id=PROJECT_ID,
)
You can get a list of AutoML Training Jobs using
ListAutoMLTrainingJobOperator
.
list_auto_ml_training_job = ListAutoMLTrainingJobOperator(
task_id="list_auto_ml_training_job",
region=REGION,
project_id=PROJECT_ID,
)
If you wish to delete an AutoML Training Job you can use
DeleteAutoMLTrainingJobOperator
.
delete_auto_ml_forecasting_training_job = DeleteAutoMLTrainingJobOperator(
task_id="delete_auto_ml_forecasting_training_job",
training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_forecasting_task', "
"key='training_id') }}",
region=REGION,
project_id=PROJECT_ID,
)
Creating Batch Prediction Jobs¶
To create a Google VertexAI Batch Prediction Job you can use
CreateBatchPredictionJobOperator
.
The operator returns the batch prediction job id in XCom under the batch_prediction_job_id
key.
create_batch_prediction_job = CreateBatchPredictionJobOperator(
task_id="create_batch_prediction_job",
job_display_name=JOB_DISPLAY_NAME,
model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
predictions_format="csv",
bigquery_source=BIGQUERY_SOURCE,
gcs_destination_prefix=GCS_DESTINATION_PREFIX,
model_parameters=MODEL_PARAMETERS,
region=REGION,
project_id=PROJECT_ID,
)
The CreateBatchPredictionJobOperator
also provides a deferrable mode:
create_batch_prediction_job_def = CreateBatchPredictionJobOperator(
task_id="create_batch_prediction_job_def",
job_display_name=JOB_DISPLAY_NAME,
model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
predictions_format="csv",
bigquery_source=BIGQUERY_SOURCE,
gcs_destination_prefix=GCS_DESTINATION_PREFIX,
model_parameters=MODEL_PARAMETERS,
region=REGION,
project_id=PROJECT_ID,
deferrable=True,
)
To delete a batch prediction job you can use
DeleteBatchPredictionJobOperator
.
delete_batch_prediction_job = DeleteBatchPredictionJobOperator(
task_id="delete_batch_prediction_job",
batch_prediction_job_id=create_batch_prediction_job.output["batch_prediction_job_id"],
region=REGION,
project_id=PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
To get a batch prediction job list you can use
ListBatchPredictionJobsOperator
.
list_batch_prediction_job = ListBatchPredictionJobsOperator(
task_id="list_batch_prediction_jobs",
region=REGION,
project_id=PROJECT_ID,
)
Creating an Endpoint Service¶
To create a Google VertexAI endpoint you can use
CreateEndpointOperator
.
The operator returns the endpoint id in XCom under the endpoint_id
key.
create_endpoint = CreateEndpointOperator(
task_id="create_endpoint",
endpoint=ENDPOINT_CONF,
region=REGION,
project_id=PROJECT_ID,
)
After creating an endpoint you can use it to deploy a model using
DeployModelOperator
.
deploy_model = DeployModelOperator(
task_id="deploy_model",
endpoint_id=create_endpoint.output["endpoint_id"],
deployed_model=DEPLOYED_MODEL,
traffic_split={"0": 100},
region=REGION,
project_id=PROJECT_ID,
)
To undeploy a model you can use
UndeployModelOperator
.
undeploy_model = UndeployModelOperator(
task_id="undeploy_model",
endpoint_id=create_endpoint.output["endpoint_id"],
deployed_model_id=deploy_model.output["deployed_model_id"],
region=REGION,
project_id=PROJECT_ID,
)
To delete an endpoint you can use
DeleteEndpointOperator
.
delete_endpoint = DeleteEndpointOperator(
task_id="delete_endpoint",
endpoint_id=create_endpoint.output["endpoint_id"],
region=REGION,
project_id=PROJECT_ID,
)
To get an endpoint list you can use
ListEndpointsOperator
.
list_endpoints = ListEndpointsOperator(
task_id="list_endpoints",
region=REGION,
project_id=PROJECT_ID,
)
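These endpoint tasks are usually ordered so that deployment happens only after the endpoint exists and cleanup runs last. A minimal sketch of one possible dependency chain, using the task names from the examples above:
# Sketch of a possible ordering for the endpoint lifecycle tasks shown above.
create_endpoint >> deploy_model >> undeploy_model >> delete_endpoint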
Creating Hyperparameter Tuning Jobs¶
To create a Google VertexAI hyperparameter tuning job you can use
CreateHyperparameterTuningJobOperator
.
The operator returns the hyperparameter tuning job id in XCom under the hyperparameter_tuning_job_id
key.
create_hyperparameter_tuning_job = CreateHyperparameterTuningJobOperator(
task_id="create_hyperparameter_tuning_job",
staging_bucket=STAGING_BUCKET,
display_name=DISPLAY_NAME,
worker_pool_specs=WORKER_POOL_SPECS,
region=REGION,
project_id=PROJECT_ID,
parameter_spec=PARAM_SPECS,
metric_spec=METRIC_SPEC,
max_trial_count=15,
parallel_trial_count=3,
)
CreateHyperparameterTuningJobOperator
also supports a deferrable mode:
create_hyperparameter_tuning_job_def = CreateHyperparameterTuningJobOperator(
task_id="create_hyperparameter_tuning_job_def",
staging_bucket=STAGING_BUCKET,
display_name=DISPLAY_NAME,
worker_pool_specs=WORKER_POOL_SPECS,
region=REGION,
project_id=PROJECT_ID,
parameter_spec=PARAM_SPECS,
metric_spec=METRIC_SPEC,
max_trial_count=15,
parallel_trial_count=3,
deferrable=True,
)
To delete a hyperparameter tuning job you can use
DeleteHyperparameterTuningJobOperator
.
delete_hyperparameter_tuning_job = DeleteHyperparameterTuningJobOperator(
task_id="delete_hyperparameter_tuning_job",
project_id=PROJECT_ID,
region=REGION,
hyperparameter_tuning_job_id="{{ task_instance.xcom_pull("
"task_ids='create_hyperparameter_tuning_job', key='hyperparameter_tuning_job_id') }}",
trigger_rule=TriggerRule.ALL_DONE,
)
To get a hyperparameter tuning job you can use
GetHyperparameterTuningJobOperator
.
get_hyperparameter_tuning_job = GetHyperparameterTuningJobOperator(
task_id="get_hyperparameter_tuning_job",
project_id=PROJECT_ID,
region=REGION,
hyperparameter_tuning_job_id="{{ task_instance.xcom_pull("
"task_ids='create_hyperparameter_tuning_job', key='hyperparameter_tuning_job_id') }}",
)
To get a hyperparameter tuning job list you can use
ListHyperparameterTuningJobOperator
.
list_hyperparameter_tuning_job = ListHyperparameterTuningJobOperator(
task_id="list_hyperparameter_tuning_job",
region=REGION,
project_id=PROJECT_ID,
)
Creating a Model Service¶
To upload a Google VertexAI model you can use
UploadModelOperator
.
The operator returns the model id in XCom under the model_id
key.
upload_model = UploadModelOperator(
task_id="upload_model",
region=REGION,
project_id=PROJECT_ID,
model=MODEL_OBJ,
)
upload_model_v1 = upload_model.output["model_id"]
To export a model you can use
ExportModelOperator
.
export_model = ExportModelOperator(
task_id="export_model",
project_id=PROJECT_ID,
region=REGION,
model_id=upload_model.output["model_id"],
output_config=MODEL_OUTPUT_CONFIG,
)
To delete a model you can use
DeleteModelOperator
.
delete_model = DeleteModelOperator(
task_id="delete_model",
project_id=PROJECT_ID,
region=REGION,
model_id=upload_model.output["model_id"],
trigger_rule=TriggerRule.ALL_DONE,
)
To get a model list you can use
ListModelsOperator
.
list_models = ListModelsOperator(
task_id="list_models",
region=REGION,
project_id=PROJECT_ID,
)
To retrieve a model by its ID you can use
GetModelOperator
.
get_model = GetModelOperator(
task_id="get_model", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)
To list all model versions you can use
ListModelVersionsOperator
.
list_model_versions = ListModelVersionsOperator(
task_id="list_model_versions", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)
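The model_id_v2 referenced in the version management examples below is not defined in this section; it is assumed to identify a second version of the model, for example the model version produced by the versioned training job shown earlier:
# Assumption (sketch): model_id_v2 comes from a task that produced a second model version,
# e.g. the custom training job that was run with parent_model set.
model_id_v2 = create_custom_training_job_v2.output["model_id"]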
To set a specific version of a model as the default one you can use
SetDefaultVersionOnModelOperator
.
set_default_version = SetDefaultVersionOnModelOperator(
task_id="set_default_version",
project_id=PROJECT_ID,
region=REGION,
model_id=model_id_v2,
)
To add aliases to a specific version of a model you can use
AddVersionAliasesOnModelOperator
.
add_version_alias = AddVersionAliasesOnModelOperator(
task_id="add_version_alias",
project_id=PROJECT_ID,
region=REGION,
version_aliases=["new-version", "beta"],
model_id=model_id_v2,
)
To delete aliases from a specific version of a model you can use
DeleteVersionAliasesOnModelOperator
.
delete_version_alias = DeleteVersionAliasesOnModelOperator(
task_id="delete_version_alias",
project_id=PROJECT_ID,
region=REGION,
version_aliases=["new-version"],
model_id=model_id_v2,
)
To delete a specific version of a model you can use
DeleteModelVersionOperator
.
delete_model_version = DeleteModelVersionOperator(
task_id="delete_model_version",
project_id=PROJECT_ID,
region=REGION,
model_id=model_id_v1,
trigger_rule=TriggerRule.ALL_DONE,
)
Running Pipeline Jobs¶
To run a Google VertexAI Pipeline Job you can use
RunPipelineJobOperator
.
The operator returns the pipeline job id in XCom under the pipeline_job_id
key.
run_pipeline_job = RunPipelineJobOperator(
task_id="run_pipeline_job",
display_name=DISPLAY_NAME,
template_path=TEMPLATE_PATH,
parameter_values=PARAMETER_VALUES,
region=REGION,
project_id=PROJECT_ID,
)
To delete a pipeline job you can use
DeletePipelineJobOperator
.
delete_pipeline_job = DeletePipelineJobOperator(
task_id="delete_pipeline_job",
project_id=PROJECT_ID,
region=REGION,
pipeline_job_id="{{ task_instance.xcom_pull("
"task_ids='run_pipeline_job', key='pipeline_job_id') }}",
trigger_rule=TriggerRule.ALL_DONE,
)
To get a pipeline job you can use
GetPipelineJobOperator
.
get_pipeline_job = GetPipelineJobOperator(
task_id="get_pipeline_job",
project_id=PROJECT_ID,
region=REGION,
pipeline_job_id="{{ task_instance.xcom_pull("
"task_ids='run_pipeline_job', key='pipeline_job_id') }}",
)
To get a pipeline job list you can use
ListPipelineJobOperator
.
list_pipeline_job = ListPipelineJobOperator(
task_id="list_pipeline_job",
region=REGION,
project_id=PROJECT_ID,
)
Interacting with Generative AI¶
To generate text embeddings you can use
TextEmbeddingModelGetEmbeddingsOperator
.
The operator returns the model’s response in XCom under model_response
key.
generate_embeddings_task = TextEmbeddingModelGetEmbeddingsOperator(
task_id="generate_embeddings_task",
project_id=PROJECT_ID,
location=REGION,
prompt=PROMPT,
pretrained_model=TEXT_EMBEDDING_MODEL,
)
To generate content with a generative model you can use
GenerativeModelGenerateContentOperator
.
The operator returns the model’s response in XCom under model_response
key.
generate_content_task = GenerativeModelGenerateContentOperator(
task_id="generate_content_task",
project_id=PROJECT_ID,
contents=CONTENTS,
tools=TOOLS,
location=REGION,
generation_config=GENERATION_CONFIG,
safety_settings=SAFETY_SETTINGS,
pretrained_model=MULTIMODAL_MODEL,
)
To run a supervised fine tuning job you can use
SupervisedFineTuningTrainOperator
.
The operator returns the tuned model’s endpoint name in XCom under tuned_model_endpoint_name
key.
sft_train_task = SupervisedFineTuningTrainOperator(
task_id="sft_train_task",
project_id=PROJECT_ID,
location=REGION,
source_model=SOURCE_MODEL,
train_dataset=TRAIN_DATASET,
tuned_model_display_name=TUNED_MODEL_DISPLAY_NAME,
)
To calculate the number of input tokens before sending a request to the Gemini API you can use
CountTokensOperator
.
The operator returns the total tokens in XCom under total_tokens
key.
count_tokens_task = CountTokensOperator(
task_id="count_tokens_task",
project_id=PROJECT_ID,
contents=CONTENTS,
location=REGION,
pretrained_model=MULTIMODAL_MODEL,
)
To evaluate a model you can use
RunEvaluationOperator
.
The operator returns the evaluation summary metrics in XCom under summary_metrics
key.
run_evaluation_task = RunEvaluationOperator(
task_id="run_evaluation_task",
project_id=PROJECT_ID,
location=REGION,
pretrained_model=MULTIMODAL_MODEL,
eval_dataset=EVAL_DATASET,
metrics=METRICS,
experiment_name=EXPERIMENT_NAME,
experiment_run_name=EXPERIMENT_RUN_NAME,
prompt_template=PROMPT_TEMPLATE,
)
To create cached content you can use
CreateCachedContentOperator
.
The operator returns the cached content resource name in XCom under return_value
key.
create_cached_content_task = CreateCachedContentOperator(
task_id="create_cached_content_task",
project_id=PROJECT_ID,
location=REGION,
model_name=CACHED_MODEL,
system_instruction=CACHED_SYSTEM_INSTRUCTION,
contents=CACHED_CONTENTS,
ttl_hours=1,
display_name="example-cache",
)
To generate a response from cached content you can use
GenerateFromCachedContentOperator
.
The operator returns the cached content response in XCom under return_value
key.
generate_from_cached_content_task = GenerateFromCachedContentOperator(
task_id="generate_from_cached_content_task",
project_id=PROJECT_ID,
location=REGION,
cached_content_name="{{ task_instance.xcom_pull(task_ids='create_cached_content_task', key='return_value') }}",
contents=["What are the papers about?"],
generation_config=GENERATION_CONFIG,
safety_settings=SAFETY_SETTINGS,
)
Interacting with Vertex AI Feature Store¶
To get a feature view sync job you can use
GetFeatureViewSyncOperator
.
The operator returns sync job results in XCom under return_value
key.
get_task = GetFeatureViewSyncOperator(
task_id="get_task",
location=REGION,
feature_view_sync_name="{{ task_instance.xcom_pull(task_ids='sync_task', key='return_value')}}",
)
To sync a feature view you can use
SyncFeatureViewOperator
.
The operator returns the sync job name in XCom under return_value
key.
sync_task = SyncFeatureViewOperator(
task_id="sync_task",
project_id=PROJECT_ID,
location=REGION,
feature_online_store_id=FEATURE_ONLINE_STORE_ID,
feature_view_id=FEATURE_VIEW_ID,
)
To check whether a Feature View Sync has succeeded you can use
FeatureViewSyncSensor
.
wait_for_sync = FeatureViewSyncSensor(
task_id="wait_for_sync",
location=REGION,
feature_view_sync_name="{{ task_instance.xcom_pull(task_ids='sync_task', key='return_value')}}",
poke_interval=60, # Check every minute
timeout=600, # Timeout after 10 minutes
mode="reschedule",
)
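Since both the sensor and GetFeatureViewSyncOperator read the sync job name from sync_task's XCom, a natural ordering for these tasks (a sketch using the task names above) is:
# Sketch: start the sync, wait for it to complete, then fetch its results.
sync_task >> wait_for_sync >> get_task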