Remote inference in Apache Beam


This example demonstrates how to implement a custom inference call in Apache Beam by using the Google Cloud Vision API.

The preferred way to run inference in Apache Beam is to use the RunInference API. The RunInference API enables you to run models as part of your pipeline in a way that is optimized for machine learning inference. To reduce the number of steps in your pipeline, RunInference supports features like batching. For more information, see the RunInference API documentation.

This notebook creates a custom model handler to make remote inference calls by using the Cloud Vision API. To make remote inference calls to Vertex AI, use the Vertex AI model handler JSON.

Run the Cloud Vision API

You can use the Cloud Vision API to retrieve labels that describe an image. For example, the following image shows a cat with possible labels.

[Image: cat-with-labels.png — a cat annotated with possible labels]

To run the Google Cloud Vision API on a large set of images, Apache Beam is well suited to handle the workflow. This example demonstrates how to retrieve image labels with this API on a small set of images.

The example follows these steps:

  • Read the images.
  • Send the images to an external API to run inference by using the RunInference PTransform.
  • Postprocess the results of your API.

To optimize the calls to the external API, limit the parallel calls to the external remote API by configuring pipeline options. In Apache Beam, each runner provides options to handle the parallelism. The following list includes two examples:

  • With the Direct Runner, use the direct_num_workers pipeline option.
  • With the Google Cloud Dataflow Runner, use the max_num_workers pipeline option.

For information about other runners, see the Beam capability matrix.

Before you begin

Download and install the dependencies.

!pip install --upgrade pip
!pip install protobuf==3.19.4
!pip install 'apache-beam[interactive,gcp]>=2.65.0'
!pip install google-cloud-vision==3.1.1
!pip install requests
# To use the newly installed version, restart the runtime.
exit()

To use the Cloud Vision API, authenticate with Google Cloud.

# Follow the steps to configure your Google Cloud setup.
gcloud init
gcloud auth application-default login

Run remote inference on Cloud Vision API

This section shows how to run remote inference on the Cloud Vision API.

Download and install Apache Beam and the required modules.

import io
import requests
from google.cloud import vision
from google.cloud.vision_v1.types import Feature
import apache_beam as beam
from apache_beam.ml.inference.base import RemoteModelHandler
from apache_beam.ml.inference.base import RunInference

This example uses images from the MSCoco dataset as a list of image URLs. This data is used as the pipeline input.

image_urls = [
    "http://farm3.staticflickr.com/2824/10213933686_6936eb402b_z.jpg",
    "http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg",
    "http://farm8.staticflickr.com/7003/6528937031_10e1ce0960_z.jpg",
    "http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg",
    "http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg",
    "http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg",
    "http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg",
]

def read_image(image_url):
  """Read an image from a URL and return (image_url, image_bytes)."""
  response = requests.get(image_url)
  image_bytes = io.BytesIO(response.content).read()
  return image_url, image_bytes

Create a custom model handler

To implement remote inference, create a custom model handler. Implement the create_client method to construct the API client, and the request method to make the model call and return its results.

When you run remote inference, prepare to encounter, identify, and handle failure as gracefully as possible. We recommend using the following techniques:

  • Exponential backoff: Retry failed remote calls with exponentially growing pauses between retries. Using exponential backoff ensures that failures don't lead to an overwhelming number of retries in quick succession. The RemoteModelHandler base class handles this logic, with the retry_filter argument determining which errors are retryable. In this example, every error is retried.

  • Dead-letter queues: Route failed inferences to a separate PCollection without failing the whole transform. Continue execution without failing the job (batch jobs' default behavior) or retrying indefinitely (streaming jobs' default behavior). This is provided through the with_exception_handling() option for RunInference. This produces tagged outputs for the failed inferences which can be handled separately from successful ones. You can then run custom pipeline logic on the dead-letter (unprocessed messages) queue to log the failure, send an alert, and push the failed message to temporary storage so that it can eventually be reprocessed.
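The exponential backoff technique described above can be sketched with the standard library alone. The helper name and delay values are illustrative, not part of the Beam API; RemoteModelHandler implements this logic for you:

```python
import random
import time

def call_with_backoff(fn, max_retries=4, base_delay=0.1):
  """Retry fn() with exponentially growing pauses plus jitter (a sketch)."""
  for attempt in range(max_retries):
    try:
      return fn()
    except Exception:
      if attempt == max_retries - 1:
        raise  # Out of retries: surface the failure.
      # Pause doubles each attempt (0.1s, 0.2s, 0.4s, ...) with jitter
      # so that many workers don't retry in lockstep.
      time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.05))
```

A transient failure that succeeds on the third attempt would be absorbed by this wrapper, while a persistent failure is re-raised after the final retry, which is where a dead-letter queue takes over.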

def _always_retry(exception: Exception) -> bool:
  return True

class CloudVisionModelHandler(RemoteModelHandler):
  def __init__(self):
    """Model handler that accepts a batch of images as bytes and sends
    that batch to the Cloud Vision API for remote inference.
    """
    super().__init__(
        namespace="CloudVisionModelHandler", retry_filter=_always_retry)

  def create_client(self):
    """Initiate the Cloud Vision API client."""
    client = vision.ImageAnnotatorClient()
    return client

  def request(self, batch, model, inference_args):
    feature = Feature()
    feature.type_ = Feature.Type.LABEL_DETECTION
    # The list of image URLs.
    image_urls = [image_url for (image_url, image_bytes) in batch]
    # Create a batch request for all images in the batch.
    images = [
        vision.Image(content=image_bytes)
        for (image_url, image_bytes) in batch
    ]
    image_requests = [
        vision.AnnotateImageRequest(image=image, features=[feature])
        for image in images
    ]
    batch_image_request = vision.BatchAnnotateImagesRequest(
        requests=image_requests)
    # Send the batch request to the remote endpoint.
    responses = model.batch_annotate_images(
        request=batch_image_request).responses
    return list(zip(image_urls, responses))

Manage batching

When you run inference with your model, either in Apache Beam or in an external API, batch your input to increase the efficiency of the model execution. The RunInference PTransform automatically manages batching by using the BatchElements transform to dynamically group elements together into batches based on the throughput of the pipeline.

If you are designing your own API endpoint, make sure that it can handle batches.
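Conceptually, batching groups a stream of elements into lists before each remote call. The following stdlib-only helper is a simplified stand-in for Beam's BatchElements, which sizes batches dynamically rather than using a fixed size; the function name is illustrative:

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def fixed_size_batches(elements: Iterable[T], batch_size: int) -> Iterator[List[T]]:
  """Group elements into fixed-size batches, flushing the final partial batch."""
  batch: List[T] = []
  for element in elements:
    batch.append(element)
    if len(batch) == batch_size:
      yield batch
      batch = []
  if batch:
    yield batch  # Flush the leftover elements as a smaller batch.
```

With seven inputs and a batch size of three, this yields batches of sizes 3, 3, and 1, so one remote call handles several images at once.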

Create the pipeline

This section demonstrates how to chain the pipeline steps together to complete the following tasks:

  • Read data.

  • Transform the data to fit the model input.

  • Run inference with a custom Cloud Vision model handler.

  • Process and display the results.

with beam.Pipeline() as pipeline:
  main, failed = (
      pipeline
      | "Create inputs" >> beam.Create(image_urls)
      | "Read images" >> beam.Map(read_image)
      | "Inference" >> RunInference(
          model_handler=CloudVisionModelHandler()).with_exception_handling()
  )
  _ = main | "Print image_url and annotation" >> beam.Map(print)
  _ = failed.failed_inferences | "Print failed inferences" >> beam.Map(print)
('http://farm3.staticflickr.com/2824/10213933686_6936eb402b_z.jpg', label_annotations {
 mid: "/m/04_sv"
 description: "Motorcycle"
 score: 0.9922548
 topicality: 0.14033242
}
label_annotations {
 mid: "/m/01prls"
 description: "Land vehicle"
 score: 0.99086833
 topicality: 0.0029524593
}
label_annotations {
 mid: "/m/0768fx"
 description: "Automotive lighting"
 score: 0.9853215
 topicality: 0.002913047
}
label_annotations {
 mid: "/m/07yv9"
 description: "Vehicle"
 score: 0.98517245
 topicality: 0.010408105
}
label_annotations {
 mid: "/m/043g5f"
 description: "Fuel tank"
 score: 0.9823826
 topicality: 0.01933147
}
label_annotations {
 mid: "/m/012f08"
 description: "Motor vehicle"
 score: 0.97732854
 topicality: 0.0009314301
}
label_annotations {
 mid: "/m/0h9mv"
 description: "Tire"
 score: 0.9735299
 topicality: 0.0020883244
}
label_annotations {
 mid: "/m/083wq"
 description: "Wheel"
 score: 0.9715105
 topicality: 0.0028435893
}
label_annotations {
 mid: "/m/0h8pb3l"
 description: "Automotive Tire"
 score: 0.96993804
 topicality: 5.827098e-05
}
label_annotations {
 mid: "/m/0h8ls87"
 description: "Automotive Exterior"
 score: 0.9641536
 topicality: 0.00045098987
}
)
('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations {
 mid: "/m/02w3_ws"
 description: "Personal care"
 score: 0.853392
 topicality: 0.00013828959
}
label_annotations {
 mid: "/m/02pkr5"
 description: "Plumbing fixture"
 score: 0.8383083
 topicality: 0.012253191
}
label_annotations {
 mid: "/m/0b_zf"
 description: "Plumbing"
 score: 0.726803
 topicality: 0.016276756
}
label_annotations {
 mid: "/m/01j2bj"
 description: "Bathroom"
 score: 0.72486097
 topicality: 0.35419264
}
label_annotations {
 mid: "/m/02jz0l"
 description: "Tap"
 score: 0.6317307
 topicality: 0.00705197
}
label_annotations {
 mid: "/m/0130jx"
 description: "Sink"
 score: 0.5732167
 topicality: 0.07520393
}
label_annotations {
 mid: "/m/054_l"
 description: "Mirror"
 score: 0.5680867
 topicality: 0.08497098
}
label_annotations {
 mid: "/m/0h8lr5r"
 description: "Bathroom Sink"
 score: 0.557554
 topicality: 0.007725588
}
label_annotations {
 mid: "/m/03jvk"
 description: "Household hardware"
 score: 0.5140049
 topicality: 0.00064662547
}
)
('http://farm8.staticflickr.com/7003/6528937031_10e1ce0960_z.jpg', error {
 code: 3
 message: "Bad image data."
}
)
('http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg', error {
 code: 3
 message: "Bad image data."
}
)
('http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg', error {
 code: 3
 message: "Bad image data."
}
)
('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations {
 mid: "/m/02w3_ws"
 description: "Personal care"
 score: 0.853392
 topicality: 0.00013828959
}
label_annotations {
 mid: "/m/02pkr5"
 description: "Plumbing fixture"
 score: 0.8383083
 topicality: 0.012253191
}
label_annotations {
 mid: "/m/0b_zf"
 description: "Plumbing"
 score: 0.726803
 topicality: 0.016276756
}
label_annotations {
 mid: "/m/01j2bj"
 description: "Bathroom"
 score: 0.72486097
 topicality: 0.35419264
}
label_annotations {
 mid: "/m/02jz0l"
 description: "Tap"
 score: 0.6317307
 topicality: 0.00705197
}
label_annotations {
 mid: "/m/0130jx"
 description: "Sink"
 score: 0.5732167
 topicality: 0.07520393
}
label_annotations {
 mid: "/m/054_l"
 description: "Mirror"
 score: 0.5680867
 topicality: 0.08497098
}
label_annotations {
 mid: "/m/0h8lr5r"
 description: "Bathroom Sink"
 score: 0.557554
 topicality: 0.007725588
}
label_annotations {
 mid: "/m/03jvk"
 description: "Household hardware"
 score: 0.5140049
 topicality: 0.00064662547
}
)
('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations {
 mid: "/m/02w3_ws"
 description: "Personal care"
 score: 0.853392
 topicality: 0.00013828959
}
label_annotations {
 mid: "/m/02pkr5"
 description: "Plumbing fixture"
 score: 0.8383083
 topicality: 0.012253191
}
label_annotations {
 mid: "/m/0b_zf"
 description: "Plumbing"
 score: 0.726803
 topicality: 0.016276756
}
label_annotations {
 mid: "/m/01j2bj"
 description: "Bathroom"
 score: 0.72486097
 topicality: 0.35419264
}
label_annotations {
 mid: "/m/02jz0l"
 description: "Tap"
 score: 0.6317307
 topicality: 0.00705197
}
label_annotations {
 mid: "/m/0130jx"
 description: "Sink"
 score: 0.5732167
 topicality: 0.07520393
}
label_annotations {
 mid: "/m/054_l"
 description: "Mirror"
 score: 0.5680867
 topicality: 0.08497098
}
label_annotations {
 mid: "/m/0h8lr5r"
 description: "Bathroom Sink"
 score: 0.557554
 topicality: 0.007725588
}
label_annotations {
 mid: "/m/03jvk"
 description: "Household hardware"
 score: 0.5140049
 topicality: 0.00064662547
}
)

Monitor the pipeline

Because monitoring can provide insight into the status and health of the application, consider monitoring and measuring pipeline performance. For information about the available tracking metrics, see RunInference Metrics.

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2025-10-22 UTC.