Build a BigQuery processing pipeline for Knative serving with Eventarc

This tutorial shows you how to use Eventarc to build a processing pipeline that schedules queries to a public BigQuery dataset, generates charts based on the data, and shares links to the charts through email.

Objectives

In this tutorial, you will build and deploy three Knative serving services running in a Google Kubernetes Engine (GKE) cluster and that receive events using Eventarc:

  1. Query runner—Triggered when Cloud Scheduler jobs publish a message to a Pub/Sub topic; this service uses the BigQuery API to retrieve data from a public COVID-19 dataset, and saves the results in a new BigQuery table.
  2. Chart creator—Triggered when the query runner service publishes a message to a Pub/Sub topic; this service generates charts using the Python plotting library, Matplotlib, and saves the charts to a Cloud Storage bucket.
  3. Notifier—Triggered by audit logs when the chart creator service stores a chart in a Cloud Storage bucket; this service uses the email service, SendGrid, to send links of the charts to an email address.

The following diagram shows the high-level architecture:

BigQuery processing pipeline

Costs

In this document, you use the following billable components of Google Cloud:

To generate a cost estimate based on your projected usage, use the pricing calculator.

New Google Cloud users might be eligible for a free trial.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get 300ドル in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloudinit
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Artifact Registry, Cloud Build, Cloud Logging, Cloud Scheduler, Eventarc, GKE, Pub/Sub, and Resource Manager APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloudservicesenableartifactregistry.googleapis.com cloudbuild.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com container.googleapis.com eventarc.googleapis.com pubsub.googleapis.com run.googleapis.com logging.googleapis.com
  8. Install the Google Cloud CLI.

  9. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  10. To initialize the gcloud CLI, run the following command:

    gcloudinit
  11. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Verify that billing is enabled for your Google Cloud project.

  13. Enable the Artifact Registry, Cloud Build, Cloud Logging, Cloud Scheduler, Eventarc, GKE, Pub/Sub, and Resource Manager APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloudservicesenableartifactregistry.googleapis.com cloudbuild.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com container.googleapis.com eventarc.googleapis.com pubsub.googleapis.com run.googleapis.com logging.googleapis.com
  14. For Cloud Storage, enable audit logging for the ADMIN_READ, DATA_WRITE, and DATA_READ data access types.

    1. Read the Identity and Access Management (IAM) policy associated with your Google Cloud project, folder, or organization and store it in a temporary file:
      gcloud projects get-iam-policy PROJECT_ID > /tmp/policy.yaml
    2. In a text editor, open /tmp/policy.yaml, and add or change only the audit log configuration in the auditConfigs section:

      
      auditConfigs:
      -auditLogConfigs:
      -logType:ADMIN_READ
      -logType:DATA_WRITE
      -logType:DATA_READ
      service:storage.googleapis.com
      bindings:
      -members:
      [...]
      etag:BwW_bHKTV5U=
      version:1
    3. Write your new IAM policy:

      gcloud projects set-iam-policy PROJECT_ID /tmp/policy.yaml

      If the preceding command reports a conflict with another change, then repeat these steps, starting with reading the IAM policy. For more information, see Configure Data Access audit logs with the API.

  15. Set the defaults used in this tutorial:
    CLUSTER_NAME=events-cluster
    CLUSTER_LOCATION=us-central1
    PROJECT_ID=PROJECT_ID
    gcloudconfigsetproject$PROJECT_ID
    gcloudconfigsetrun/region$CLUSTER_LOCATION
    gcloudconfigsetrun/cluster$CLUSTER_NAME
    gcloudconfigsetrun/cluster_location$CLUSTER_LOCATION
    gcloudconfigsetrun/platformgke
    gcloudconfigseteventarc/location$CLUSTER_LOCATION

    Replace PROJECT_ID with your project ID.

Create a SendGrid API key

SendGrid is a cloud-based email provider that lets you send email without having to maintain email servers.

  1. Sign in to SendGrid and go to Settings> API Keys.
  2. Click Create API Key.
  3. Select the permissions for the key. At a minimum, the key must have Mail Send permissions to send email.
  4. Click Save to create the key.
  5. SendGrid generates a new key. This is the only copy of the key, so make sure that you copy the key and save it for later.

Create a GKE cluster

Create a cluster with Workload Identity Federation for GKE enabled so that it can access Google Cloud services from applications running within GKE. You also need Workload Identity Federation for GKE to forward events using Eventarc.

  1. Create a GKE cluster for Knative serving with the CloudRun, HttpLoadBalancing and HorizontalPodAutoscaling addons enabled:

    gcloud beta container clusters create $CLUSTER_NAME \
     --addons=HttpLoadBalancing,HorizontalPodAutoscaling,CloudRun \
     --machine-type=n1-standard-4 \
     --enable-autoscaling --min-nodes=2 --max-nodes=10 \
     --no-issue-client-certificate --num-nodes=2 \
     --logging=SYSTEM,WORKLOAD \
     --monitoring=SYSTEM \
     --scopes=cloud-platform,logging-write,monitoring-write,pubsub \
     --zone us-central1 \
     --release-channel=rapid \
     --workload-pool=$PROJECT_ID.svc.id.goog
    
  2. Wait a few minutes for the cluster creation to complete. During the process, you might see warnings that you can safely ignore. When the cluster has been created, the output is similar to the following:

    Creating cluster ...done.
    Created [https://container.googleapis.com/v1beta1/projects/my-project/zones/us-central1/clusters/events-cluster].
    
  3. Create an Artifact Registry standard repository to store your Docker container image:

    gcloudartifactsrepositoriescreateREPOSITORY\
    --repository-format=docker\
    --location=$CLUSTER_LOCATION

    Replace REPOSITORY with a unique name for the repository.

Configure the GKE service account

Configure a GKE service account to act as the default compute service account.

  1. Create an Identity and Access Management (IAM) binding between the service accounts:

    PROJECT_NUMBER="$(gcloudprojectsdescribe$(gcloudconfigget-valueproject)--format='value(projectNumber)')"
    gcloudiamservice-accountsadd-iam-policy-binding\
    --roleroles/iam.workloadIdentityUser\
    --member"serviceAccount:$PROJECT_ID.svc.id.goog[default/default]"\
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com
  2. Add the iam.gke.io/gcp-service-account annotation to the GKE service account, using the email address of the compute service account:

    kubectlannotateserviceaccount\
    --namespacedefault\
    default\
    iam.gke.io/gcp-service-account=$PROJECT_NUMBER-compute@developer.gserviceaccount.com

Enable GKE destinations

To allow Eventarc to manage resources in the GKE cluster, enable GKE destinations and bind the Eventarc service account with the required roles.

  1. Enable GKE destinations for Eventarc:

    gcloudeventarcgke-destinationsinit
  2. At the prompt to bind the required roles, enter y.

    The following roles are bound:

    • roles/compute.viewer
    • roles/container.developer
    • roles/iam.serviceAccountAdmin

Create a service account and bind access roles

Before creating the Eventarc trigger, set up a user-managed service account and grant it specific roles so that Eventarc can forward Pub/Sub events.

  1. Create a service account called TRIGGER_GSA:

    TRIGGER_GSA=eventarc-bigquery-triggers
    gcloudiamservice-accountscreate$TRIGGER_GSA
  2. Grant the pubsub.subscriber, monitoring.metricWriter, and eventarc.eventReceiver roles to the service account:

    PROJECT_ID=$(gcloudconfigget-valueproject)
    gcloudprojectsadd-iam-policy-binding$PROJECT_ID\
    --member"serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com"\
    --role"roles/pubsub.subscriber"
    gcloudprojectsadd-iam-policy-binding$PROJECT_ID\
    --member"serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com"\
    --role"roles/monitoring.metricWriter"
    gcloudprojectsadd-iam-policy-binding$PROJECT_ID\
    --member"serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com"\
    --role"roles/eventarc.eventReceiver"

Create a Cloud Storage bucket

Create a Cloud Storage bucket to save the charts. Make sure that the bucket and the charts are publicly available, and in the same region as your GKE service:

exportBUCKET="$(gcloudconfigget-valuecore/project)-charts"
gcloudstoragebucketscreategs://${BUCKET}--location=$(gcloudconfigget-valuerun/region)
gcloudstoragebucketsupdategs://${BUCKET}--uniform-bucket-level-access
gcloudstoragebucketsadd-iam-policy-bindinggs://${BUCKET}--member=allUsers--role=roles/storage.objectViewer

Clone the repository

Clone the GitHub repository.

gitclonehttps://github.com/GoogleCloudPlatform/eventarc-samples
cdeventarc-samples/processing-pipelines

Deploy the notifier service

From the bigquery/notifier/python directory, deploy a Knative serving service that receives chart creator events and uses SendGrid to email links to the generated charts.

  1. Build and push the container image:

    pushdbigquery/notifier/python
    exportSERVICE_NAME=notifier
    dockerbuild-t$CLUSTER_LOCATION-docker.pkg.dev/$(gcloudconfigget-valueproject)/REPOSITORY/${SERVICE_NAME}:v1.
    dockerpush$CLUSTER_LOCATION-docker.pkg.dev/$(gcloudconfigget-valueproject)/REPOSITORY/${SERVICE_NAME}:v1
    popd
  2. Deploy the container image to Knative serving, passing in an address to send emails to, and the SendGrid API key:

    exportTO_EMAILS=EMAIL_ADDRESS
    exportSENDGRID_API_KEY=YOUR_SENDGRID_API_KEY
    gcloudrundeploy${SERVICE_NAME}\
    --image$CLUSTER_LOCATION-docker.pkg.dev/$(gcloudconfigget-valueproject)/REPOSITORY/${SERVICE_NAME}:v1\
    --update-env-varsTO_EMAILS=${TO_EMAILS},SENDGRID_API_KEY=${SENDGRID_API_KEY},BUCKET=${BUCKET}

    Replace the following:

    • EMAIL_ADDRESS: an email address to send the links to the generated charts
    • YOUR_SENDGRID_API_KEY: the SendGrid API key you noted previously

When you see the service URL, the deployment is complete.

Create a trigger for the notifier service

The Eventarc trigger for the notifier service deployed on Knative serving filters for Cloud Storage audit logs where the methodName is storage.objects.create.

  1. Create the trigger:

    gcloudeventarctriggerscreatetrigger-${SERVICE_NAME}-gke\
    --destination-gke-cluster=$CLUSTER_NAME\
    --destination-gke-location=$CLUSTER_LOCATION\
    --destination-gke-namespace=default\
    --destination-gke-service=$SERVICE_NAME\
    --destination-gke-path=/\
    --event-filters="type=google.cloud.audit.log.v1.written"\
    --event-filters="serviceName=storage.googleapis.com"\
    --event-filters="methodName=storage.objects.create"\
    --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    This creates a trigger called trigger-notifier-gke.

Deploy the chart creator service

From the bigquery/chart-creator/python directory, deploy a Knative serving service that receives query runner events, retrieves data from a BigQuery table for a specific country, and then generates a chart, using Matplotlib, from the data. The chart is uploaded to a Cloud Storage bucket.

  1. Build and push the container image:

    pushdbigquery/chart-creator/python
    exportSERVICE_NAME=chart-creator
    dockerbuild-t$CLUSTER_LOCATION-docker.pkg.dev/$(gcloudconfigget-valueproject)/REPOSITORY/${SERVICE_NAME}:v1.
    dockerpush$CLUSTER_LOCATION-docker.pkg.dev/$(gcloudconfigget-valueproject)/REPOSITORY/${SERVICE_NAME}:v1
    popd
  2. Deploy the container image to Knative serving, passing in BUCKET:

    gcloudrundeploy${SERVICE_NAME}\
    --image$CLUSTER_LOCATION-docker.pkg.dev/$(gcloudconfigget-valueproject)/REPOSITORY/${SERVICE_NAME}:v1\
    --update-env-varsBUCKET=${BUCKET}

When you see the service URL, the deployment is complete.

Create a trigger for the chart creator service

The Eventarc trigger for the chart creator service deployed on Knative serving filters for messages published to a Pub/Sub topic.

  1. Create the trigger:

    gcloudeventarctriggerscreatetrigger-${SERVICE_NAME}-gke\
    --destination-gke-cluster=$CLUSTER_NAME\
    --destination-gke-location=$CLUSTER_LOCATION\
    --destination-gke-namespace=default\
    --destination-gke-service=$SERVICE_NAME\
    --destination-gke-path=/\
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished"\
    --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    This creates a trigger called trigger-chart-creator-gke.

  2. Set the Pub/Sub topic environment variable.

    exportTOPIC_QUERY_COMPLETED=$(basename$(gcloudeventarctriggersdescribetrigger-${SERVICE_NAME}-gke--format='value(transport.pubsub.topic)'))

Deploy the query runner service

From the processing-pipelines directory, deploy a Knative serving service that receives Cloud Scheduler events, retrieves data from a public COVID-19 dataset, and saves the results in a new BigQuery table.

  1. Build and push the container image:

    exportSERVICE_NAME=query-runner
    dockerbuild-t$CLUSTER_LOCATION-docker.pkg.dev/$(gcloudconfigget-valueproject)/REPOSITORY/${SERVICE_NAME}:v1-fDockerfile.
    dockerpush$CLUSTER_LOCATION-docker.pkg.dev/$(gcloudconfigget-valueproject)/REPOSITORY/${SERVICE_NAME}:v1
  2. Deploy the container image to Knative serving, passing in PROJECT_ID and TOPIC_QUERY_COMPLETED:

    gcloudrundeploy${SERVICE_NAME}\
    --image$CLUSTER_LOCATION-docker.pkg.dev/$(gcloudconfigget-valueproject)/REPOSITORY/${SERVICE_NAME}:v1\
    --update-env-varsPROJECT_ID=$(gcloudconfigget-valueproject),TOPIC_ID=${TOPIC_QUERY_COMPLETED}

When you see the service URL, the deployment is complete.

Create a trigger for the query runner service

The Eventarc trigger for the query runner service deployed on Knative serving filters for messages published to a Pub/Sub topic.

  1. Create the trigger:

    gcloudeventarctriggerscreatetrigger-${SERVICE_NAME}-gke\
    --destination-gke-cluster=$CLUSTER_NAME\
    --destination-gke-location=$CLUSTER_LOCATION\
    --destination-gke-namespace=default\
    --destination-gke-service=$SERVICE_NAME\
    --destination-gke-path=/\
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished"\
    --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    This creates a trigger called trigger-query-runner-gke.

  2. Set an environment variable for the Pub/Sub topic.

    exportTOPIC_QUERY_SCHEDULED=$(gcloudeventarctriggersdescribetrigger-${SERVICE_NAME}-gke--format='value(transport.pubsub.topic)')

Schedule the jobs

The processing pipeline is triggered by two Cloud Scheduler jobs.

  1. Create an App Engine app which is required by Cloud Scheduler and specify an appropriate location (for example, europe-west):

    exportAPP_ENGINE_LOCATION=LOCATION
    gcloudappcreate--region=${APP_ENGINE_LOCATION}
  2. Create two Cloud Scheduler jobs that publish to a Pub/Sub topic once per day:

    gcloudschedulerjobscreatepubsubcre-scheduler-uk\
    --schedule="0 16 * * *"\
    --topic=${TOPIC_QUERY_SCHEDULED}\
    --message-body="United Kingdom"
    gcloudschedulerjobscreatepubsubcre-scheduler-cy\
    --schedule="0 17 * * *"\
    --topic=${TOPIC_QUERY_SCHEDULED}\
    --message-body="Cyprus"

    The schedule is specified in unix-cron format. For example, 0 16 * * * means that the jobs runs at 16:00 (4 PM) UTC every day.

Run the pipeline

  1. Confirm that all the triggers were successfully created:

    gcloudeventarctriggerslist

    The output should be similar to the following:

    NAME TYPE DESTINATION ACTIVE LOCATION
    trigger-chart-creator-gke google.cloud.pubsub.topic.v1.messagePublished GKE:chart-creator Yes us-central1
    trigger-notifier-gke google.cloud.audit.log.v1.written GKE:notifier Yes us-central1
    trigger-query-runner-gke google.cloud.pubsub.topic.v1.messagePublished GKE:query-runner Yes us-central1
    
  2. Retrieve the Cloud Scheduler job IDs:

    gcloudschedulerjobslist

    The output should be similar to the following:

    ID LOCATION SCHEDULE (TZ) TARGET_TYPE STATE
    cre-scheduler-cy us-central1 0 17 * * * (Etc/UTC) Pub/Sub ENABLED
    cre-scheduler-uk us-central1 0 16 * * * (Etc/UTC) Pub/Sub ENABLED
    
  3. Although the jobs are scheduled to run daily at 4 and 5 PM, you can also run the Cloud Scheduler jobs manually:

    gcloudschedulerjobsruncre-scheduler-cy
    gcloudschedulerjobsruncre-scheduler-uk
  4. After a few minutes, confirm that there are two charts in the Cloud Storage bucket:

    gcloudstoragelsgs://${BUCKET}

    The output should be similar to the following:

    gs://PROJECT_ID-charts/chart-cyprus.png
    gs://PROJECT_ID-charts/chart-unitedkingdom.png
    

Congratulations! You should also receive two emails with links to the charts.

Clean up

If you created a new project for this tutorial, delete the project. If you used an existing project and want to keep it without the changes added in this tutorial, delete the resources created for the tutorial.

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Delete tutorial resources

  1. Delete any Knative serving services you deployed in this tutorial:

    gcloudrunservicesdeleteSERVICE_NAME

    Where SERVICE_NAME is your chosen service name.

    You can also delete Knative serving services from the Google Cloud console.

  2. Delete any Eventarc triggers you created in this tutorial:

    gcloud eventarc triggers delete TRIGGER_NAME
    

    Replace TRIGGER_NAME with the name of your trigger.

  3. Remove any Google Cloud CLI default configurations you added during the tutorial setup.

    gcloudconfigunsetproject
    gcloudconfigunsetrun/cluster
    gcloudconfigunsetrun/cluster_location
    gcloudconfigunsetrun/platform
    gcloudconfigunseteventarc/location
    gcloudconfigunsetcompute/zone
  4. Delete the images from Artifact Registry.

    gcloudartifactsdockerimagesdelete$CLUSTER_LOCATION-docker.pkg.dev/$(gcloudconfigget-valueproject)/REPOSITORY/notifier:v1
    gcloudartifactsdockerimagesdelete$CLUSTER_LOCATION-docker.pkg.dev/$(gcloudconfigget-valueproject)/REPOSITORY/chart-creator:v1
    gcloudartifactsdockerimagesdelete$CLUSTER_LOCATION-docker.pkg.dev/$(gcloudconfigget-valueproject)/REPOSITORY/query-runner:v1
  5. Delete the bucket, along with all the objects within the bucket:

    gcloud storage rm --recursive gs://${BUCKET}/
  6. Delete the Cloud Scheduler jobs:

    gcloudschedulerjobsdeletecre-scheduler-cy
    gcloudschedulerjobsdeletecre-scheduler-uk

What's next

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2025年11月06日 UTC.