Batch messaging

This document describes how to use batch messaging when publishing messages to a topic.

The code samples on this page create a publisher client with custom batching settings and use it to publish a batch of messages.

Before you begin

Before configuring the publish workflow, ensure you have completed the following tasks:

Required roles

To get the permissions that you need to publish messages to a topic, ask your administrator to grant you the Pub/Sub Publisher (roles/pubsub.publisher) IAM role on the topic. For more information about granting roles, see Manage access to projects, folders, and organizations.

You might also be able to get the required permissions through custom roles or other predefined roles.

You need additional permissions to create or update topics and subscriptions.
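
As an illustration only, the following minimal Python sketch shows one way an administrator might grant the Pub/Sub Publisher role on a topic using the Python client library's IAM helpers. The project ID, topic ID, and the principal user:publisher@example.com are hypothetical placeholders; in practice, roles are often granted through the Google Cloud console or gcloud instead.

from google.cloud import pubsub_v1

# Hypothetical values for illustration.
project_id = "your-project-id"
topic_id = "your-topic-id"
member = "user:publisher@example.com"

client = pubsub_v1.PublisherClient()
topic_path = client.topic_path(project_id, topic_id)

# Read the current IAM policy for the topic, add a binding for the
# Pub/Sub Publisher role, and write the policy back.
policy = client.get_iam_policy(request={"resource": topic_path})
policy.bindings.add(role="roles/pubsub.publisher", members=[member])
client.set_iam_policy(request={"resource": topic_path, "policy": policy})
print(f"Granted roles/pubsub.publisher on {topic_path} to {member}.")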

Use batch messaging

See the following code samples to learn how to configure batch messaging settings for your publisher.

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // By default, the publisher will flush a batch after 10ms, after it
  // contains more than 100 messages, or after it contains more than 1MiB of
  // data, whichever comes first. This changes those defaults.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::MaxHoldTimeOption>(std::chrono::milliseconds(20))
          .set<pubsub::MaxBatchBytesOption>(4 * 1024 * 1024L)
          .set<pubsub::MaxBatchMessagesOption>(200)));
  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.


using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishBatchedMessagesAsyncSample
{
    public async Task<int> PublishBatchMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        // Default Settings:
        // byteCountThreshold: 1000000
        // elementCountThreshold: 100
        // delayThreshold: 10 milliseconds
        var customSettings = new PublisherClient.Settings
        {
            BatchingSettings = new BatchingSettings(
                elementCountThreshold: 50,
                byteCountThreshold: 10240,
                delayThreshold: TimeSpan.FromMilliseconds(500))
        };

        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            Settings = customSettings
        }.BuildAsync();

        int publishedMessageCount = 0;
        var publishTasks = messageTexts.Select(async text =>
        {
            try
            {
                string message = await publisher.PublishAsync(text);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);

        // The PublisherClient instance should be shut down after use.
        // The TimeSpan specifies for how long to attempt to publish locally queued messages.
        await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));
        return publishedMessageCount;
    }
}

Go

The following sample uses the v2 major version of the Go Pub/Sub client library. If you are still using the v1 library, see the migration guide to v2. For a list of v1 code samples, see the deprecated code samples.

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"time"

	"cloud.google.com/go/pubsub/v2"
)

func publishWithSettings(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	publisher := client.Publisher(topicID)
	publisher.PublishSettings.ByteThreshold = 5000
	publisher.PublishSettings.CountThreshold = 10
	publisher.PublishSettings.DelayThreshold = 100 * time.Millisecond

	var results []*pubsub.PublishResult
	var resultErrors []error
	for i := 0; i < 10; i++ {
		result := publisher.Publish(ctx, &pubsub.Message{
			Data: []byte("Message " + strconv.Itoa(i)),
		})
		results = append(results, result)
	}
	// The Get method blocks until a server-generated ID or
	// an error is returned for the published message.
	for i, res := range results {
		id, err := res.Get(ctx)
		if err != nil {
			resultErrors = append(resultErrors, err)
			fmt.Fprintf(w, "Failed to publish: %v", err)
			continue
		}
		fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
	}
	if len(resultErrors) != 0 {
		return fmt.Errorf("Get: %v", resultErrors[len(resultErrors)-1])
	}
	fmt.Fprintf(w, "Published messages with batch settings.")
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;

public class PublishWithBatchSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithBatchSettingsExample(projectId, topicId);
  }

  public static void publishWithBatchSettingsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Batch settings control how the publisher batches messages
      long requestBytesThreshold = 5000L; // default : 1000 bytes
      long messageCountBatchSize = 100L; // default : 100 messages
      Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms

      // A publish request gets triggered based on request size, message count, and time since
      // the last publish, whichever condition is met first.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder()
              .setElementCountThreshold(messageCountBatchSize)
              .setRequestByteThreshold(requestBytesThreshold)
              .setDelayThreshold(publishDelayThreshold)
              .build();

      // Create a publisher instance with the custom batch settings bound to the topic
      publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

      // Schedule publishing one message at a time: messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with batch settings.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// const maxMessages = 10;
// const maxWaitTime = 10;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishBatchedMessages(
  topicNameOrId,
  data,
  maxMessages,
  maxWaitTime,
) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const publishOptions = {
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime * 1000,
    },
  };
  const batchPublisher = pubSubClient.topic(topicNameOrId, publishOptions);

  const promises = [];
  for (let i = 0; i < 10; i++) {
    promises.push(
      (async () => {
        const messageId = await batchPublisher.publishMessage({
          data: dataBuffer,
        });
        console.log(`Message ${messageId} published.`);
      })(),
    );
  }
  await Promise.all(promises);
}

Node.js (TypeScript)

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// const maxMessages = 10;
// const maxWaitTime = 10;

// Imports the Google Cloud client library
import {PublishOptions, PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishBatchedMessages(
  topicNameOrId: string,
  data: string,
  maxMessages: number,
  maxWaitTime: number,
) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const publishOptions: PublishOptions = {
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime * 1000,
    },
  };
  const batchPublisher = pubSubClient.topic(topicNameOrId, publishOptions);

  const promises: Promise<void>[] = [];
  for (let i = 0; i < 10; i++) {
    promises.push(
      (async () => {
        const messageId = await batchPublisher.publishMessage({
          data: dataBuffer,
        });
        console.log(`Message ${messageId} published.`);
      })(),
    );
  }
  await Promise.all(promises);
}

PHP

Before trying this sample, follow the PHP setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub PHP API reference documentation.

use Google\Cloud\PubSub\PubSubClient;
/**
 * Publishes a message for a Pub/Sub topic.
 *
 * The publisher should be used in conjunction with the `google-cloud-batch`
 * daemon, which should be running in the background.
 *
 * To start the daemon, from your project root call `vendor/bin/google-cloud-batch daemon`.
 *
 * @param string $projectId The Google project ID.
 * @param string $topicName The Pub/Sub topic name.
 * @param string $message The message to publish.
 */
function publish_message_batch($projectId, $topicName, $message)
{
 // Check if the batch daemon is running.
 if (getenv('IS_BATCH_DAEMON_RUNNING') !== 'true') {
 trigger_error(
 'The batch daemon is not running. Call ' .
 '`vendor/bin/google-cloud-batch daemon` from ' .
 'your project root to start the daemon.',
 E_USER_NOTICE
 );
 }
 $batchOptions = [
 'batchSize' => 100, // Max messages for each batch.
 'callPeriod' => 0.01, // Max time in seconds between each batch publish.
 ];
 $pubsub = new PubSubClient([
 'projectId' => $projectId,
 ]);
 $topic = $pubsub->topic($topicName);
 $publisher = $topic->batchPublisher([
 'batchOptions' => $batchOptions
 ]);
 for ($i = 0; $i < 10; $i++) {
 $publisher->publish(['data' => $message]);
 }
 print('Messages enqueued for publication.' . PHP_EOL);
}

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.

from concurrent import futures
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure the batch to publish as soon as there are 10 messages
# or 1 KiB of data, or 1 second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=10,  # default 100
    max_bytes=1024,  # default 1 MB
    max_latency=1,  # default 10 ms
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

# Resolve the publish future in a separate thread.
def callback(future: pubsub_v1.publisher.futures.Future) -> None:
    message_id = future.result()
    print(message_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    publish_future = publisher.publish(topic_path, data)
    # Non-blocking. Allow the publisher client to batch multiple messages.
    publish_future.add_done_callback(callback)
    publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with batch settings to {topic_path}.")

Ruby

The following sample uses the Ruby Pub/Sub client library v3. If you are still using the v2 library, see the migration guide to v3. For a list of Ruby v2 code samples, see the deprecated code samples.

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
publisher = pubsub.publisher topic_id, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
10.times do |i|
  publisher.publish_async "This is message ##{i}."
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop.wait!

puts "Messages published asynchronously in batch."

Disable batch messaging

To turn off batching in your client library, set the value of max_messages to 1.
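
For example, in the Python client library shown earlier on this page, this amounts to constructing the publisher with a one-message batch so that every publish call is sent immediately. The project and topic IDs below are placeholders; this is a minimal sketch rather than an official sample:

from google.cloud import pubsub_v1

# Setting max_messages to 1 flushes every batch as soon as it contains a
# single message, which effectively disables batching.
batch_settings = pubsub_v1.types.BatchSettings(max_messages=1)
publisher = pubsub_v1.PublisherClient(batch_settings)

topic_path = publisher.topic_path("your-project-id", "your-topic-id")
future = publisher.publish(topic_path, b"An unbatched message")
print(f"Published message ID: {future.result()}")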

Batch messaging and ordered delivery

With ordered delivery, if any message in a batch is not acknowledged, the entire batch is redelivered, including the messages that were sent before the unacknowledged message.
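
The following minimal Python sketch is not part of the original samples; it illustrates, assuming message ordering is enabled on the subscription, how batching and ordering keys can be combined on the publisher side. The project ID, topic ID, regional endpoint, and ordering key are placeholders.

from google.cloud import pubsub_v1

# Publishing with ordering keys is typically done against a regional endpoint.
publisher = pubsub_v1.PublisherClient(
    batch_settings=pubsub_v1.types.BatchSettings(max_messages=10),
    publisher_options=pubsub_v1.types.PublisherOptions(
        enable_message_ordering=True
    ),
    client_options={"api_endpoint": "us-central1-pubsub.googleapis.com:443"},
)
topic_path = publisher.topic_path("your-project-id", "your-topic-id")

publish_futures = []
for n in range(5):
    # Messages that share an ordering key are delivered in publish order,
    # even when they are grouped into the same batch.
    publish_futures.append(
        publisher.publish(
            topic_path, f"message {n}".encode("utf-8"), ordering_key="order-key-1"
        )
    )
for future in publish_futures:
    print(future.result())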

Quotas and limits on batch messaging

Before configuring batch messaging, consider the effect of factors such as publish throughput quota and the maximum size of a batch. The high-level client libraries ensure that batch requests are kept within the specified limits.

  • The minimum request size considered for cost purposes is 1000 bytes, even if the actual message is smaller than 1000 bytes.
  • A single batch publish request can contain at most 10 MB of data and at most 1,000 messages.

For more information, see Pub/Sub quotas and limits.

What's next

To learn how to configure advanced publishing options, see the following:
