Retry requests

Publishing failures are typically caused by client-side bottlenecks, such as insufficient service CPUs, bad thread health, or network congestion. The publisher retry settings define the number of times the client library retries a publish request and the length of time between each attempt.

This document provides information about using retry requests with messages published to a topic.

Before you begin

Before configuring the publish workflow, ensure you have completed the following tasks:

Required roles

To get the permissions that you need to retry message requests to a topic, ask your administrator to grant you the Pub/Sub Publisher (roles/pubsub.publisher) IAM role on the topic. For more information about granting roles, see Manage access to projects, folders, and organizations.

You might also be able to get the required permissions through custom roles or other predefined roles.

You need additional permissions to create or update topics and subscriptions.

About retry requests

Retry settings control how the Pub/Sub client libraries retry publish requests. A client library can have any of the following retry settings:

  • Initial request timeout: the amount of time before a client library stops waiting for the initial publish request to complete.
  • Retry delay: the amount of time after a request times out that a client library waits to retry the request.
  • Total timeout: the amount of time after which a client library stops retrying publish requests.

To retry publish requests, the initial request timeout must be shorter than the total timeout. For example, if you're using exponential backoff, the client libraries compute the request timeout and retry delay as follows:

  • After each publish request, the request timeout increases by the request timeout multiplier, up to the maximum request timeout.
  • After each retry, the retry delay increases by the retry delay multiplier, up to the maximum retry delay.
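
To make the computation concrete, here is a minimal sketch in plain Python (not a client-library API) of how such an exponential-backoff schedule evolves. The starting values and multipliers are illustrative assumptions, loosely based on the defaults shown in the samples that follow; any given client library may compute its schedule differently.

# Minimal sketch: how an exponential-backoff schedule grows the request timeout
# and retry delay, capped at their maxima, until the total timeout is exhausted.
# All values are illustrative assumptions, not canonical library defaults.
def backoff_schedule(
    initial_timeout=5.0,     # initial request timeout, seconds
    timeout_multiplier=1.0,  # request timeout multiplier
    max_timeout=600.0,       # maximum request timeout, seconds
    initial_delay=0.1,       # initial retry delay, seconds
    delay_multiplier=1.3,    # retry delay multiplier
    max_delay=60.0,          # maximum retry delay, seconds
    total_timeout=600.0,     # stop retrying after this much elapsed time, seconds
):
    elapsed, timeout, delay = 0.0, initial_timeout, initial_delay
    attempt = 1
    while elapsed < total_timeout:
        print(f"attempt {attempt}: request timeout {timeout:.2f}s, next retry delay {delay:.2f}s")
        elapsed += timeout + delay
        # After each request, grow the request timeout; after each retry, grow the delay.
        timeout = min(timeout * timeout_multiplier, max_timeout)
        delay = min(delay * delay_multiplier, max_delay)
        attempt += 1

backoff_schedule()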

Retry a message request

During the publishing process, you might see transient or permanent publishing failures. For transient errors, you typically don't need to take any special action, because Pub/Sub automatically retries the messages.

An error can also occur when a publish operation succeeds but the publish response is not received in time by the publisher client. In this case too, the publish operation is retried. As a result, you can have two identical messages with different message IDs.

In case of persistent errors, consider implementing appropriate actions outside of the publishing process to avoid overwhelming Pub/Sub.
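
As one illustration of such an out-of-band action, the following sketch uses the Python client library to catch a publish that still fails after the configured retries and spill the payload to local storage for later replay, instead of retrying indefinitely against Pub/Sub. The spill file and helper name are hypothetical; adapt the fallback to your own durability requirements.

from concurrent import futures

from google.api_core import exceptions
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_or_spill(data: bytes, spill_file="failed_messages.bin"):
    """Publish a message; on persistent failure, save it locally instead of retrying forever."""
    future = publisher.publish(topic_path, data=data)
    try:
        return future.result(timeout=60)  # returns the server-assigned message ID
    except (exceptions.GoogleAPICallError, futures.TimeoutError) as err:
        # Hypothetical fallback: persist the payload outside the publish path
        # so it can be replayed later without overwhelming Pub/Sub.
        with open(spill_file, "ab") as f:
            f.write(data + b"\n")
        print(f"Publish failed ({err}); message spilled to {spill_file}")
        return None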

Publishing failures are automatically retried, except for errors that don't warrant retries. The following sample code demonstrates how to create a publisher with custom retry settings. Note that not all client libraries support custom retry settings; see the API reference documentation for your chosen language.

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // By default a publisher will retry for 60 seconds, with an initial backoff
  // of 100ms, a maximum backoff of 60 seconds, and the backoff will grow by
  // 30% after each attempt. This changes those defaults.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::RetryPolicyOption>(
              pubsub::LimitedTimeRetryPolicy(
                  /*maximum_duration=*/std::chrono::minutes(10))
                  .clone())
          .set<pubsub::BackoffPolicyOption>(
              pubsub::ExponentialBackoffPolicy(
                  /*initial_delay=*/std::chrono::milliseconds(200),
                  /*maximum_delay=*/std::chrono::seconds(45),
                  /*scaling=*/2.0)
                  .clone())));
  std::vector<future<bool>> done;
  for (char const* data : {"1", "2", "3", "go!"}) {
    done.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([](future<StatusOr<std::string>> f) {
              return f.get().ok();
            }));
  }
  publisher.Flush();
  int count = 0;
  for (auto& f : done) {
    if (f.get()) ++count;
  }
  std::cout << count << " messages sent successfully\n";
}

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.


using Google.Api.Gax.Grpc;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Threading.Tasks;

public class PublishMessageWithRetrySettingsAsyncSample
{
    public async Task PublishMessageWithRetrySettingsAsync(string projectId, string topicId, string messageText)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        // Retry settings control how the publisher handles retry-able failures
        var maxAttempts = 3;
        var initialBackoff = TimeSpan.FromMilliseconds(110); // default: 100 ms
        var maxBackoff = TimeSpan.FromSeconds(70); // default: 60 seconds
        var backoffMultiplier = 1.3; // default: 1.0
        var totalTimeout = TimeSpan.FromSeconds(100); // default: 600 seconds

        var publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            ApiSettings = new PublisherServiceApiSettings
            {
                PublishSettings = CallSettings.FromRetry(RetrySettings.FromExponentialBackoff(
                    maxAttempts: maxAttempts,
                    initialBackoff: initialBackoff,
                    maxBackoff: maxBackoff,
                    backoffMultiplier: backoffMultiplier,
                    retryFilter: RetrySettings.FilterForStatusCodes(StatusCode.Unavailable)))
                    .WithTimeout(totalTimeout)
            }
        }.BuildAsync();

        string message = await publisher.PublishAsync(messageText);
        Console.WriteLine($"Published message {message}");

        // PublisherClient instance should be shutdown after use.
        // The TimeSpan specifies for how long to attempt to publish locally queued messages.
        await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));
    }
}

Go

The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2. To see a list of v1 code samples, see the deprecated code samples.

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
	vkit "cloud.google.com/go/pubsub/v2/apiv1"
	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/grpc/codes"
)

func publishWithRetrySettings(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()

	config := &pubsub.ClientConfig{
		TopicAdminCallOptions: &vkit.TopicAdminCallOptions{
			Publish: []gax.CallOption{
				gax.WithRetry(func() gax.Retryer {
					return gax.OnCodes([]codes.Code{
						codes.Aborted,
						codes.Canceled,
						codes.Internal,
						codes.ResourceExhausted,
						codes.Unknown,
						codes.Unavailable,
						codes.DeadlineExceeded,
					}, gax.Backoff{
						Initial:    250 * time.Millisecond, // default 100 milliseconds
						Max:        60 * time.Second,       // default 60 seconds
						Multiplier: 1.45,                   // default 1.3
					})
				}),
			},
		},
	}
	client, err := pubsub.NewClientWithConfig(ctx, projectID, config)
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	// Reuse this publisher for all publish calls to send messages in batches.
	publisher := client.Publisher(topicID)
	result := publisher.Publish(ctx, &pubsub.Message{
		Data: []byte(msg),
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("pubsub: result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.


import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;

public class PublishWithRetrySettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithRetrySettingsExample(projectId, topicId);
  }

  public static void publishWithRetrySettingsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Retry settings control how the publisher handles retry-able failures
      Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
      double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
      Duration maxRetryDelay = Duration.ofSeconds(60); // default: 60 seconds
      Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
      double rpcTimeoutMultiplier = 1.0; // default: 1.0
      Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 600 seconds
      Duration totalTimeout = Duration.ofSeconds(600); // default: 600 seconds

      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(initialRetryDelay)
              .setRetryDelayMultiplier(retryDelayMultiplier)
              .setMaxRetryDelay(maxRetryDelay)
              .setInitialRpcTimeout(initialRpcTimeout)
              .setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
              .setMaxRpcTimeout(maxRpcTimeout)
              .setTotalTimeout(totalTimeout)
              .build();

      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

      String message = "first message";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published a message with retry settings: " + messageId);
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {PubSub} = require('@google-cloud/pubsub');

async function publishWithRetrySettings(topicNameOrId, data) {
  const pubsubClient = new PubSub();

  // Retry settings control how the publisher handles retryable failures. Default values are shown.
  // The `retryCodes` array determines which grpc errors will trigger an automatic retry.
  // The `backoffSettings` object lets you specify the behaviour of retries over time.
  //
  // Reference this document to see the current defaults for publishing:
  // https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59
  //
  // Please note that _all_ items must be included when passing these settings to topic().
  // Otherwise, unpredictable (incorrect) defaults may be assumed.
  const retrySettings = {
    retryCodes: [
      10, // 'ABORTED'
      1, // 'CANCELLED',
      4, // 'DEADLINE_EXCEEDED'
      13, // 'INTERNAL'
      8, // 'RESOURCE_EXHAUSTED'
      14, // 'UNAVAILABLE'
      2, // 'UNKNOWN'
    ],
    backoffSettings: {
      // The initial delay time, in milliseconds, between the completion
      // of the first failed request and the initiation of the first retrying request.
      initialRetryDelayMillis: 100,
      // The multiplier by which to increase the delay time between the completion
      // of failed requests, and the initiation of the subsequent retrying request.
      retryDelayMultiplier: 4,
      // The maximum delay time, in milliseconds, between requests.
      // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
      maxRetryDelayMillis: 60000,
      // The initial timeout parameter to the request.
      initialRpcTimeoutMillis: 60000,
      // The multiplier by which to increase the timeout parameter between failed requests.
      rpcTimeoutMultiplier: 1.0,
      // The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
      // rpcTimeoutMultiplier will no longer be used to increase the timeout.
      maxRpcTimeoutMillis: 60000,
      // The total time, in milliseconds, starting from when the initial request is sent,
      // after which an error will be returned, regardless of the retrying attempts made meanwhile.
      totalTimeoutMillis: 600000,
    },
  };

  // Cache topic objects (publishers) and reuse them.
  const topic = pubsubClient.topic(topicNameOrId, {
    gaxOpts: {
      retry: retrySettings,
    },
  });

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Message ${messageId} published.`);
}

Node.js (TypeScript)

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
import {PubSub} from '@google-cloud/pubsub';

async function publishWithRetrySettings(topicNameOrId: string, data: string) {
  const pubsubClient = new PubSub();

  // Retry settings control how the publisher handles retryable failures. Default values are shown.
  // The `retryCodes` array determines which grpc errors will trigger an automatic retry.
  // The `backoffSettings` object lets you specify the behaviour of retries over time.
  //
  // Reference this document to see the current defaults for publishing:
  // https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59
  //
  // Please note that _all_ items must be included when passing these settings to topic().
  // Otherwise, unpredictable (incorrect) defaults may be assumed.
  const retrySettings = {
    retryCodes: [
      10, // 'ABORTED'
      1, // 'CANCELLED',
      4, // 'DEADLINE_EXCEEDED'
      13, // 'INTERNAL'
      8, // 'RESOURCE_EXHAUSTED'
      14, // 'UNAVAILABLE'
      2, // 'UNKNOWN'
    ],
    backoffSettings: {
      // The initial delay time, in milliseconds, between the completion
      // of the first failed request and the initiation of the first retrying request.
      initialRetryDelayMillis: 100,
      // The multiplier by which to increase the delay time between the completion
      // of failed requests, and the initiation of the subsequent retrying request.
      retryDelayMultiplier: 4,
      // The maximum delay time, in milliseconds, between requests.
      // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
      maxRetryDelayMillis: 60000,
      // The initial timeout parameter to the request.
      initialRpcTimeoutMillis: 60000,
      // The multiplier by which to increase the timeout parameter between failed requests.
      rpcTimeoutMultiplier: 1.0,
      // The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
      // rpcTimeoutMultiplier will no longer be used to increase the timeout.
      maxRpcTimeoutMillis: 60000,
      // The total time, in milliseconds, starting from when the initial request is sent,
      // after which an error will be returned, regardless of the retrying attempts made meanwhile.
      totalTimeoutMillis: 600000,
    },
  };

  // Cache topic objects (publishers) and reuse them.
  const topic = pubsubClient.topic(topicNameOrId, {
    gaxOpts: {
      retry: retrySettings,
    },
  });

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Message ${messageId} published.`);
}

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.

from google import api_core
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure the retry settings. Defaults shown in comments are values applied
# by the library by default, instead of default values in the Retry object.
custom_retry = api_core.retry.Retry(
    initial=0.250,  # seconds (default: 0.1)
    maximum=90.0,  # seconds (default: 60.0)
    multiplier=1.45,  # default: 1.3
    deadline=300.0,  # seconds (default: 60.0)
    predicate=api_core.retry.if_exception_type(
        api_core.exceptions.Aborted,
        api_core.exceptions.DeadlineExceeded,
        api_core.exceptions.InternalServerError,
        api_core.exceptions.ResourceExhausted,
        api_core.exceptions.ServiceUnavailable,
        api_core.exceptions.Unknown,
        api_core.exceptions.Cancelled,
    ),
)

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    future = publisher.publish(topic=topic_path, data=data, retry=custom_retry)
    print(future.result())

print(f"Published messages with retry settings to {topic_path}.")

Retry requests with ordering keys

Assume you have a single publisher client. You are using the Pub/Sub client libraries to publish messages 1, 2, and 3 for the same ordering key A. Now, assume that the publish response for message 1 is not received by the publisher client before the RPC deadline expires. Message 1 must be republished. If message 2 is published only after message 1 completes successfully, the sequence of messages received by the subscriber client becomes 1, 1, 2, and 3. Each published message has its own message ID. From the subscriber client's perspective, four messages were published, and the first two have identical content.

Retrying publish requests with ordering keys can also be complicated by batch settings. The client library batches messages together for more efficient publishing. Continuing the previous example, assume that messages 1 and 2 are batched together. This batch is sent to the server as a single request. If the server fails to return a response in time, the publisher client retries this batch of two messages, so the subscriber client might receive messages 1, 2, 1, 2, and 3. If you use a Pub/Sub client library to publish messages in order and a publish operation fails, the client library fails the publish operations for all remaining messages with the same ordering key. A publisher client can then take any one of the following actions (a minimal sketch of the first option follows the list):

  • Republish all the failed messages in order

  • Republish a subset of the failed messages in order

  • Publish a new set of messages
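
As a minimal sketch of the first option, the following Python snippet tracks which messages failed for an ordering key, calls resume_publish to unblock the key, and then republishes the failed messages in their original order. The bookkeeping shown here is an illustrative assumption rather than a prescribed pattern; the resume_publish call matches the Python sample later in this section.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
)
topic_path = publisher.topic_path(project_id, topic_id)

def publish_in_order(messages, ordering_key):
    """Publish messages in order; republish any that failed, in order, after resuming the key."""
    failed = []
    for data in messages:
        future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
        try:
            future.result()
        except Exception:
            # After one publish fails, the client library fails the remaining
            # publishes for this ordering key, so collect them for a second pass.
            failed.append(data)
    if failed:
        publisher.resume_publish(topic_path, ordering_key)
        # A real publisher would bound the number of passes; one republish pass is shown here.
        for data in failed:
            publisher.publish(topic_path, data=data, ordering_key=ordering_key).result()

publish_in_order([b"message1", b"message2", b"message3"], "key1")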

If a non-retryable error occurs, the client library doesn't publish the message and stops publishing other messages with the same ordering key. For example, when a publisher sends a message to a topic that doesn't exist, a non-retryable error occurs. To continue publishing messages with the same ordering key, call a method to resume publishing and then start publishing again.

The following sample shows you how to resume publishing messages with the same ordering key.

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  struct SampleData {
    std::string ordering_key;
    std::string data;
  } data[] = {
      {"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
      {"key1", "message4"}, {"key1", "message5"},
  };
  std::vector<future<void>> done;
  for (auto& datum : data) {
    auto const& da = datum;  // workaround MSVC lambda capture confusion
    auto handler = [da, publisher](future<StatusOr<std::string>> f) mutable {
      auto const msg = da.ordering_key + "#" + da.data;
      auto id = f.get();
      if (!id) {
        std::cout << "An error has occurred publishing " << msg << "\n";
        publisher.ResumePublish(da.ordering_key);
        return;
      }
      std::cout << "Message " << msg << " published as id=" << *id << "\n";
    };
    done.push_back(
        publisher
            .Publish(pubsub::MessageBuilder{}
                         .SetData("Hello World! [" + datum.data + "]")
                         .SetOrderingKey(datum.ordering_key)
                         .Build())
            .then(handler));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class ResumePublishSample
{
    public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);

        var customSettings = new PublisherClient.Settings
        {
            EnableMessageOrdering = true
        };

        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            Settings = customSettings
        }.BuildAsync();

        int publishedMessageCount = 0;
        var publishTasks = keysAndMessages.Select(async keyAndMessage =>
        {
            try
            {
                string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
                publisher.ResumePublish(keyAndMessage.Item1);
            }
        });
        await Task.WhenAll(publishTasks);

        // PublisherClient instance should be shutdown after use.
        // The TimeSpan specifies for how long to attempt to publish locally queued messages.
        await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));
        return publishedMessageCount;
    }
}

Go

The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2. To see a list of v1 code samples, see the deprecated code samples.

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"google.golang.org/api/option"
)

func resumePublishWithOrderingKey(w io.Writer, projectID, topicID string) {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	// Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering key are in the same region.
	// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
	client, err := pubsub.NewClient(ctx, projectID,
		option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
	if err != nil {
		fmt.Fprintf(w, "pubsub.NewClient: %v", err)
		return
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	// Reuse this publisher for all publish calls to send messages in batches.
	publisher := client.Publisher(topicID)
	publisher.EnableMessageOrdering = true

	key := "some-ordering-key"
	result := publisher.Publish(ctx, &pubsub.Message{
		Data:        []byte("some-message"),
		OrderingKey: key,
	})
	_, err = result.Get(ctx)
	if err != nil {
		// Error handling code can be added here.
		fmt.Printf("Failed to publish: %s\n", err)

		// Resume publish on an ordering key that has had unrecoverable errors.
		// After such an error publishes with this ordering key will fail
		// until this method is called.
		publisher.ResumePublish(key)
	}

	fmt.Fprint(w, "Published a message with ordering key successfully\n")
}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class ResumePublishWithOrderingKeys {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Choose an existing topic.
    String topicId = "your-topic-id";

    resumePublishWithOrderingKeysExample(projectId, topicId);
  }

  public static void resumePublishWithOrderingKeysExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    // Create a publisher and set message ordering to true.
    Publisher publisher =
        Publisher.newBuilder(topicName)
            .setEnableMessageOrdering(true)
            .setEndpoint("us-east1-pubsub.googleapis.com:443")
            .build();

    try {
      Map<String, String> messages = new LinkedHashMap<String, String>();
      messages.put("message1", "key1");
      messages.put("message2", "key2");
      messages.put("message3", "key1");
      messages.put("message4", "key2");

      for (Map.Entry<String, String> entry : messages.entrySet()) {
        ByteString data = ByteString.copyFromUtf8(entry.getKey());
        PubsubMessage pubsubMessage =
            PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle publish success / failure.
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {
              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // Details on the API exception.
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + pubsubMessage.getData());
                // (Beta) Must call resumePublish to reset key and continue publishing with order.
                publisher.resumePublish(pubsubMessage.getOrderingKey());
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic).
                System.out.println(pubsubMessage.getData() + " : " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      publisher.shutdown();
      publisher.awaitTermination(1, TimeUnit.MINUTES);
    }
  }
}

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function resumePublish(topicNameOrId, data, orderingKey) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  const publishOptions = {
    messageOrdering: true,
  };

  // Cache topic objects (publishers) and reuse them.
  //
  // Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering
  // key are in the same region. For list of locational endpoints for Pub/Sub, see:
  // https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
  const publisher = pubSubClient.topic(topicNameOrId, publishOptions);

  // Publishes the message
  try {
    const message = {
      data: dataBuffer,
      orderingKey: orderingKey,
    };
    const messageId = await publisher.publishMessage(message);
    console.log(`Message ${messageId} published.`);

    return messageId;
  } catch (e) {
    console.log(`Could not publish: ${e}`);
    publisher.resumePublishing(orderingKey);
    return null;
  }
}

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
    publisher_options=publisher_options, client_options=client_options
)

# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for message in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring
    data = message[0].encode("utf-8")
    ordering_key = message[1]
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    try:
        print(future.result())
    except RuntimeError:
        # Resume publish on an ordering key that has had unrecoverable errors.
        publisher.resume_publish(topic_path, ordering_key)

print(f"Resumed publishing messages with ordering keys to {topic_path}.")

Ruby

The following sample uses the Ruby Pub/Sub client library v3. If you are still using the v2 library, see the migration guide to v3. To see a list of Ruby v2 code samples, see the deprecated code samples.

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
publisher = pubsub.publisher topic_id, async: {
  max_bytes: 1_000_000,
  max_messages: 20
}
publisher.enable_message_ordering!

10.times do |i|
  publisher.publish_async "This is message ##{i}.",
                          ordering_key: "ordering-key" do |result|
    if result.succeeded?
      puts "Message ##{i} successfully published."
    else
      puts "Message ##{i} failed to publish"
      # Allow publishing to continue on "ordering-key" after processing the
      # failure.
      publisher.resume_publish "ordering-key"
    end
  end
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop!

What's next

To learn how to configure advanced publishing options, see the following:
