I've been working on a Google Pub/Sub client library for golang. Fairly new to golang and my intention is to learn the language. Not really sure what are the best practices, aiming to learn them over a period of time.
Let me get straight into what the library is trying to do:
- Create a topic.
- Create a subscription.
- Publish messages to a topic.
- Use a pull subscriber to output individual topic messages.
Code
// Creates a client, and exposes deleteTopic, topicExists and createSubscription though the client
package pubsubclient
import (
"context"
"log"
"time"
"cloud.google.com/go/pubsub"
"google.golang.org/api/iterator"
)
type pubSubClient struct {
psclient *pubsub.Client
}
// getClient creates a pubsub client
func getClient(projectID string) (*pubSubClient, error) {
client, err := pubsub.NewClient(context.Background(), projectID)
if err != nil {
log.Printf("Error when creating pubsub client. Err: %v", err)
return nil, err
}
return &pubSubClient{psclient: client}, nil
}
// topicExists checks if a given topic exists
func (client *pubSubClient) topicExists(topicName string) (bool, error) {
topic := client.psclient.Topic(topicName)
return topic.Exists(context.Background())
}
// createTopic creates a topic if a topic name does not exist or returns one
// if it is already present
func (client *pubSubClient) createTopic(topicName string) (*pubsub.Topic, error) {
topicExists, err := client.topicExists(topicName)
if err != nil {
log.Printf("Could not check if topic exists. Error: %+v", err)
return nil, err
}
var topic *pubsub.Topic
if !topicExists {
topic, err = client.psclient.CreateTopic(context.Background(), topicName)
if err != nil {
log.Printf("Could not create topic. Err: %+v", err)
return nil, err
}
} else {
topic = client.psclient.Topic(topicName)
}
return topic, nil
}
// deleteTopic Deletes a topic
func (client *pubSubClient) deleteTopic(topicName string) error {
return client.psclient.Topic(topicName).Delete(context.Background())
}
// createSubscription creates the subscription to a topic
func (client *pubSubClient) createSubscription(subscriptionName string, topic *pubsub.Topic) (*pubsub.Subscription, error) {
subscription := client.psclient.Subscription(subscriptionName)
subscriptionExists, err := subscription.Exists(context.Background())
if err != nil {
log.Printf("Could not check if subscription %s exists. Err: %v", subscriptionName, err)
return nil, err
}
if !subscriptionExists {
cfg := pubsub.SubscriptionConfig{
Topic: topic,
// The subscriber has a configurable, limited amount of time -- known as the ackDeadline -- to acknowledge
// the outstanding message. Once the deadline passes, the message is no longer considered outstanding, and
// Cloud Pub/Sub will attempt to redeliver the message.
AckDeadline: 60 * time.Second,
}
subscription, err = client.psclient.CreateSubscription(context.Background(), subscriptionName, cfg)
if err != nil {
log.Printf("Could not create subscription %s. Err: %v", subscriptionName, err)
return nil, err
}
subscription.ReceiveSettings = pubsub.ReceiveSettings{
// This is the maximum amount of messages that are allowed to be processed by the callback function at a time.
// Once this limit is reached, the client waits for messages to be acked or nacked by the callback before
// requesting more messages from the server.
MaxOutstandingMessages: 100,
// This is the maximum amount of time that the client will extend a message's deadline. This value should be
// set as high as messages are expected to be processed, plus some buffer.
MaxExtension: 10 * time.Second,
}
}
return subscription, nil
}
Here is the publisher code
package pubsubclient
import (
"context"
"encoding/json"
"cloud.google.com/go/pubsub"
)
// Publisher contract to be returned to the consumer
type Publisher struct {
topic *pubsub.Topic
}
// PublisherConfig to be provided by the consumer.
type PublisherConfig struct {
ProjectID string
TopicName string
}
// GetPublisher gives a publisher
func GetPublisher(config PublisherConfig) (*Publisher, error) {
client, err := getClient(config.ProjectID)
if err != nil {
return nil, err
}
topic, err := client.createTopic(config.TopicName)
if err != nil {
return nil, err
}
return &Publisher{
topic: topic,
}, nil
}
// Publish message to pubsub
func (publisher *Publisher) Publish(payload interface{}) (string, error) {
data, err := json.Marshal(payload)
if err != nil {
return ``, err
}
message := &pubsub.Message{
Data: data,
}
response := publisher.topic.Publish(context.Background(), message)
return response.Get(context.Background())
}
Here is the Subscriber code
package pubsubclient
import (
"context"
"log"
"sync"
"cloud.google.com/go/pubsub"
)
// SubscribeMessageHandler that handles the message
type SubscribeMessageHandler func(chan *pubsub.Message)
// ErrorHandler that logs the error received while reading a message
type ErrorHandler func(error)
// SubscriberConfig subscriber config
type SubscriberConfig struct {
ProjectID string
TopicName string
SubscriptionName string
ErrorHandler ErrorHandler
Handle SubscribeMessageHandler
}
// Subscriber subscribe to a topic and pass each message to the
// handler function
type Subscriber struct {
topic *pubsub.Topic
subscription *pubsub.Subscription
errorHandler ErrorHandler
handle SubscribeMessageHandler
cancel func()
}
// CreateSubscription creates a subscription
func CreateSubscription(config SubscriberConfig) (*Subscriber, error) {
client, err := getClient(config.ProjectID)
if err != nil {
return nil, err
}
topic, err := client.createTopic(config.TopicName)
if err != nil {
return nil, err
}
subscription, err := client.createSubscription(config.SubscriptionName, topic)
if err != nil {
return nil, err
}
return &Subscriber{
topic: topic,
subscription: subscription,
errorHandler: config.ErrorHandler,
handle: config.Handle,
}, nil
}
// Process will start pulling from the pubsub. The process accepts a waitgroup as
// it will be easier to orchestrate a use case where one application needs
// to subscribe to more than one topic
func (subscriber *Subscriber) Process(wg *sync.WaitGroup) {
log.Printf("Starting a Subscriber on topic %s", subscriber.topic.String())
output := make(chan *pubsub.Message)
go func(subscriber *Subscriber, output chan *pubsub.Message) {
defer close(output)
ctx := context.Background()
ctx, subscriber.cancel = context.WithCancel(ctx)
err := subscriber.subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
output <- msg
})
if err != nil {
// The wait group is stopped or marked done when an error is encountered
subscriber.errorHandler(err)
subscriber.Stop()
wg.Done()
}
}(subscriber, output)
subscriber.handle(output)
}
// Stop the subscriber, closing the channel that was returned by Start.
func (subscriber *Subscriber) Stop() {
if subscriber.cancel != nil {
log.Print("Stopped the subscriber")
subscriber.cancel()
}
}
If this is a very big code review I will break it down. My main question is in the Process()
method of the subscriber. Right now this method accepts a waitGroup
and not really sure if that is a good design. To just show an example of how I envision to use the method:
// Process will start pulling from the pubsub. The process accepts a waitgroup as
// it will be easier for us to orchestrate a use case where one application needs
// more than one subscriber
var wg sync.WaitGroup
wg.Add(1)
go subscriber.Process(&wg)
publishMessages(publisher)
wg.Wait()
Is that the right way to design? Any other good design patterns that I might need to follow? Please let me know.
-
\$\begingroup\$ Cross posted to reddit: Go code review and feedback for pubsub client. \$\endgroup\$peterSO– peterSO2019年08月21日 23:40:57 +00:00Commented Aug 21, 2019 at 23:40
1 Answer 1
getClient
, I'm pretty sure you should pass through thecontext
object from the outside and not simply create a dummy withBackground
. Also, at some point doing both logging of an error and passing it back will bite you because you're going to end up with two or more copies of the same error (unless you're 100% diligent about only logging it at the origin).- Same
context
comment goes for all other methods actually. As a user of the library I want to pass in my context, otherwise there's little point to it. - The configurations in
createSubscription
look like they should be coming from the outside, that is, have defaults, but let the user override them. The comments are great though if they're not explained as part of thepubsub
library already. - In
Process
, the anonymous function doesn't have to have parameters, it'll simply capture the values ofsubscriber
andoutput
automatically.
I'd say the wait group is fine? Assuming that the Stop
call will
cause the appropriate error that shuts down the goroutine running
Process
. What's a bit vague to me is if it's intentional that the
errorHandler
is always called, even when the subscriber is terminated
via Stop
? It's not the worst of problems of course.