Google Cloud Generative AI Operators

The Google Cloud Generative AI Operators ecosystem is anchored by the Gemini family of multimodal models, which provide interfaces for generating and processing diverse inputs like text, images, and audio. By leveraging these foundation models, developers can securely prompt, tune, and ground AI using their own proprietary data. This capability enables the construction of versatile applications, ranging from custom chatbots and code assistants to automated content summarization tools.

Interacting with Generative AI on Vertex AI

The Google Cloud VertexAI extends Vertex AI with powerful foundation models capable of generating text, images, and other modalities. It provides access to Google Gemini family of multimodal models and other pre-trained generative models through a unified API, SDK, and console. Developers can prompt, tune, and ground these models using their own data to build applications such as chat bots, content creation tools, code assistants, and summarization systems. With Vertex AI, you can securely integrate generative capabilities into enterprise workflows, monitor usage, evaluate model quality, and deploy models at scale — all within the same managed ML platform.

To generate text embeddings you can use GenAIGenerateEmbeddingsOperator. The operator returns the model’s response in XCom under model_response key.

tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py[source]

generate_embeddings_task = GenAIGenerateEmbeddingsOperator(
    task_id="generate_embeddings_task",
    project_id=PROJECT_ID,
    location=REGION,
    contents=CONTENTS,
    model=TEXT_EMBEDDING_MODEL,
)

To generate content with a generative model you can use GenAIGenerateContentOperator. The operator returns the model’s response in XCom under model_response key.

tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py[source]

generate_content_task = GenAIGenerateContentOperator(
    task_id="generate_content_task",
    project_id=PROJECT_ID,
    contents=CONTENTS,
    location=REGION,
    generation_config=GENERATION_CONFIG_CREATE_CONTENT,
    model=MULTIMODAL_MODEL,
)

To run a supervised fine tuning job you can use GenAISupervisedFineTuningTrainOperator. The operator returns the tuned model’s endpoint name in XCom under tuned_model_endpoint_name key.

tests/system/google/cloud/gen_ai/example_gen_ai_generative_model_tuning.py[source]

sft_train_task = GenAISupervisedFineTuningTrainOperator(
    task_id="sft_train_task",
    project_id=PROJECT_ID,
    location=REGION,
    source_model=SOURCE_MODEL,
    training_dataset=TRAIN_DATASET,
    tuning_job_config=TUNING_JOB_CONFIG,
)

You can also use supervised fine tuning job for video tasks (training and tracking):

tests/system/google/cloud/gen_ai/example_gen_ai_generative_model_tuning.py[source]

sft_video_task = GenAISupervisedFineTuningTrainOperator(
    task_id="sft_train_video_task",
    project_id=PROJECT_ID,
    location=REGION,
    source_model=SOURCE_MODEL,
    training_dataset=TRAIN_VIDEO_DATASET,
    tuning_job_config=TUNING_JOB_VIDEO_MODEL_CONFIG,
)

To calculates the number of input tokens before sending a request to the Gemini API you can use: GenAICountTokensOperator. The operator returns the total tokens in XCom under total_tokens key.

tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py[source]

count_tokens_task = GenAICountTokensOperator(
    task_id="count_tokens_task",
    project_id=PROJECT_ID,
    contents=CONTENTS,
    location=REGION,
    model=MULTIMODAL_MODEL,
)

To create cached content you can use GenAICreateCachedContentOperator. The operator returns the cached content resource name in XCom under cached_content key.

tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py[source]

create_cached_content_task = GenAICreateCachedContentOperator(
    task_id="create_cached_content_task",
    project_id=PROJECT_ID,
    location=REGION,
    model=CACHED_MODEL,
    cached_content_config=CACHED_CONTENT_CONFIG,
)

To generate a response from cached content you can use GenAIGenerateContentOperator. The operator returns the cached content response in XCom under model_response key.

tests/system/google/cloud/gen_ai/example_gen_ai_generative_model.py[source]

generate_from_cached_content_task = GenAIGenerateContentOperator(
    task_id="generate_from_cached_content_task",
    project_id=PROJECT_ID,
    location=REGION,
    contents=["What are the papers about?"],
    generation_config={
        "cached_content": create_cached_content_task.output,
    },
    model=CACHED_MODEL,
)

Interacting with Gemini Batch API

The Gemini Batch API is designed to process large volumes of requests asynchronously at 50% of the standard cost. The target turnaround time is 24 hours, but in majority of cases, it is much quicker. Use Batch API for large-scale, non-urgent tasks such as data pre-processing or running evaluations where an immediate response is not required.

Create batch job

To create batch job via Batch API you can use GenAIGeminiCreateBatchJobOperator. The operator returns the job name in XCom under job_name key.

Two option of input source is allowed: inline requests, file.

If you use inline requests take a look at this example:

tests/system/google/cloud/gen_ai/example_gen_ai_gemini_batch_api.py[source]

create_batch_job_using_inlined_requests = GenAIGeminiCreateBatchJobOperator(
    task_id="create_batch_job_using_inlined_requests_task",
    project_id=PROJECT_ID,
    location=REGION,
    model="gemini-3-pro-preview",
    gemini_api_key=GEMINI_XCOM_API_KEY,
    create_batch_job_config={
        "display_name": "inlined-requests-batch-job",
    },
    input_source=INLINED_REQUESTS_FOR_BATCH_JOB,
    wait_until_complete=True,
    retrieve_result=True,
)

If you use file take a look at this example:

tests/system/google/cloud/gen_ai/example_gen_ai_gemini_batch_api.py[source]

create_batch_job_using_file = GenAIGeminiCreateBatchJobOperator(
    task_id="create_batch_job_using_file_task",
    project_id=PROJECT_ID,
    location=REGION,
    model="gemini-3-pro-preview",
    gemini_api_key=GEMINI_XCOM_API_KEY,
    create_batch_job_config={
        "display_name": "file-upload-batch-job",
    },
    input_source=UPLOADED_FILE_NAME,
    wait_until_complete=True,
    retrieve_result=True,
    results_folder=PATH_TO_SAVE_RESULTS,
)

Get batch job

To get batch job you can use GenAIGeminiGetBatchJobOperator. The operator returns the job name in XCom under job_name key.

tests/system/google/cloud/gen_ai/example_gen_ai_gemini_batch_api.py[source]

get_batch_job = GenAIGeminiGetBatchJobOperator(
    task_id="get_batch_job_task",
    project_id=PROJECT_ID,
    location=REGION,
    gemini_api_key=GEMINI_XCOM_API_KEY,
    job_name=BATCH_JOB_WITH_INLINED_REQUESTS_NAME,
)

List batch jobs

To list batch jobs via Batch API you can use GenAIGeminiListBatchJobsOperator. The operator returns the job names in XCom under job_names key.

tests/system/google/cloud/gen_ai/example_gen_ai_gemini_batch_api.py[source]

list_batch_jobs = GenAIGeminiListBatchJobsOperator(
    task_id="list_batch_jobs_task",
    project_id=PROJECT_ID,
    location=REGION,
    gemini_api_key=GEMINI_XCOM_API_KEY,
)

Cancel batch job

To cancel batch job via Batch API you can use GenAIGeminiCancelBatchJobOperator.

tests/system/google/cloud/gen_ai/example_gen_ai_gemini_batch_api.py[source]

cancel_batch_job = GenAIGeminiCancelBatchJobOperator(
    task_id="cancel_batch_job_task",
    project_id=PROJECT_ID,
    location=REGION,
    gemini_api_key=GEMINI_XCOM_API_KEY,
    job_name=BATCH_JOB_WITH_FILE_NAME,
)

Delete batch job

To queue batch job for deletion via Batch API you can use GenAIGeminiDeleteBatchJobOperator. The job will not be deleted immediately. After submitting it for deletion, it will still be available through GenAIGeminiListBatchJobsOperator or GenAIGeminiGetBatchJobOperator for some time. It is behavior of the API not the operator.

tests/system/google/cloud/gen_ai/example_gen_ai_gemini_batch_api.py[source]

delete_batch_job_1 = GenAIGeminiDeleteBatchJobOperator(
    task_id="delete_batch_job_1_task",
    project_id=PROJECT_ID,
    location=REGION,
    gemini_api_key=GEMINI_XCOM_API_KEY,
    job_name=BATCH_JOB_WITH_INLINED_REQUESTS_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

Create embeddings

To create embeddings batch job via Batch API you can use GenAIGeminiCreateEmbeddingsBatchJobOperator. The operator returns the job name in XCom under job_name key.

Two option of input source is allowed: inline requests, file.

If you use inline requests take a look at this example:

tests/system/google/cloud/gen_ai/example_gen_ai_gemini_batch_api.py[source]

create_embeddings_job_using_inlined_requests = GenAIGeminiCreateEmbeddingsBatchJobOperator(
    task_id="create_embeddings_job_using_inlined_requests_task",
    project_id=PROJECT_ID,
    location=REGION,
    model="gemini-embedding-001",
    wait_until_complete=False,
    gemini_api_key=GEMINI_XCOM_API_KEY,
    create_embeddings_config={
        "display_name": "inlined-requests-embeddings-job",
    },
    input_source=INLINED_REQUESTS_FOR_EMBEDDINGS_BATCH_JOB,
)

If you use file take a look at this example:

tests/system/google/cloud/gen_ai/example_gen_ai_gemini_batch_api.py[source]

create_embeddings_job_using_file = GenAIGeminiCreateEmbeddingsBatchJobOperator(
    task_id="create_embeddings_job_using_file_task",
    project_id=PROJECT_ID,
    location=REGION,
    model="gemini-embedding-001",
    wait_until_complete=False,
    gemini_api_key=GEMINI_XCOM_API_KEY,
    create_embeddings_config={
        "display_name": "file-upload-embeddings-job",
    },
    input_source=UPLOADED_EMBEDDINGS_FILE_NAME,
)

Interacting with Gemini Files API

The Gemini Files API helps Gemini to handle various types of input data, including text, images, and audio, at the same time. Please note that Gemini Batch API mostly works with files that were uploaded via Gemini Files API. The Files API lets you store up to 20GB of files per project, with each file not exceeding 2GB in size. Files are stored for 48 hours.

Upload file

To upload file via Files API you can use GenAIGeminiUploadFileOperator. The operator returns the file name in XCom under file_name key.

tests/system/google/cloud/gen_ai/example_gen_ai_gemini_batch_api.py[source]

upload_file = GenAIGeminiUploadFileOperator(
    task_id="upload_file_for_batch_job_task",
    project_id=PROJECT_ID,
    location=REGION,
    file_path=UPLOAD_FILE_PATH,
    gemini_api_key=GEMINI_XCOM_API_KEY,
)

Get file

To get file via Files API you can use GenAIGeminiGetFileOperator. The operator returns the file_name in XCom under file_name key.

tests/system/google/cloud/gen_ai/example_gen_ai_gemini_batch_api.py[source]

get_file = GenAIGeminiGetFileOperator(
    task_id="get_file_for_batch_job_task",
    project_id=PROJECT_ID,
    location=REGION,
    file_name=UPLOADED_FILE_NAME,
    gemini_api_key=GEMINI_XCOM_API_KEY,
)

List files

To list files via Files API you can use GenAIGeminiListFilesOperator. The operator returns file names in XCom under file_names key.

tests/system/google/cloud/gen_ai/example_gen_ai_gemini_batch_api.py[source]

list_files = GenAIGeminiListFilesOperator(
    task_id="list_files_files_api_task",
    project_id=PROJECT_ID,
    location=REGION,
    gemini_api_key=GEMINI_XCOM_API_KEY,
)

Delete file

To delete file via Files API you can use GenAIGeminiDeleteFileOperator.

tests/system/google/cloud/gen_ai/example_gen_ai_gemini_batch_api.py[source]

delete_file = GenAIGeminiDeleteFileOperator(
    task_id="delete_file_for_batch_job_task",
    project_id=PROJECT_ID,
    location=REGION,
    file_name=UPLOADED_FILE_NAME,
    gemini_api_key=GEMINI_XCOM_API_KEY,
    trigger_rule=TriggerRule.ALL_DONE,
)

Reference

For further information, look at:

Was this entry helpful?