⚡ Improve your application's performance by consuming your Symfony Messenger messages with Go.
- Consume your messages directly with Go code
- PostgreSQL support
- AMQP support
- Redis support
Install gosumer with Go
go get github.com/romaixn/gosumer
Add this to your config/packages/messenger.yaml:
framework: messenger: transports: go: # Add this new transport dsn: '%env(MESSENGER_TRANSPORT_DSN)%' serializer: 'messenger.transport.symfony_serializer' # Required, https://symfony.com/doc/current/messenger.html#serializing-messages options: use_notify: true check_delayed_interval: 60000 queue_name: go # Required, used to only get right messages in go side retry_strategy: max_retries: 3 multiplier: 2
Don't forget to specify in the routing part the message to process in Go
Create an env variable to create a custom queue (in this example go is the name of the queue):
RABBITMQ_GO_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/go
And use it in config/packages/messenger.yaml:
framework: messenger: transports: go: dsn: '%env(RABBITMQ_GO_TRANSPORT_DSN)%' serializer: 'messenger.transport.symfony_serializer' retry_strategy: max_retries: 3 multiplier: 2
Create an env variable for Redis:
REDIS_TRANSPORT_DSN=redis://localhost:6379/messages
Add the following to your config/packages/messenger.yaml:
framework: messenger: transports: async: dsn: "%env(MESSENGER_TRANSPORT_DSN)%" options: []
Make sure to specify the message routing in the routing section to process in Go.
For PostgreSQL:
database := gosumer.PgDatabase{ Host: "localhost", Port: 5432, User: "app", Password: "!ChangeMe!", Database: "app", TableName: "messenger_messages", }
If you are using a custom schema, you can specify it with backticks:
database := gosumer.PgDatabase{ Host: "localhost", Port: 5432, User: "app", Password: "!ChangeMe!", Database: "app", TableName: `"myschema"."messenger_messages"`, }
For RabbitMQ:
database := gosumer.RabbitMQ{ Host: "localhost", Port: nil, User: "guest", Password: "guest", Queue: "go", }
For Redis:
database := gosumer.Redis{ Host: "localhost", Port: 6379, User: "username", Password: "password", DB: 0, Channel: "channel_name", }
Call the Listen
// Define your own structure according to your message type Message struct { ID int `json:"id"` Number int `json:"number"` } err := gosumer.Listen(database, process, Message{}) if err != nil { log.Fatal(err) }
With the function to process your messages:
func process(message any, err chan error) { log.Printf("Message received: %v", message) // No error err <- nil // if there is an error, used to not delete message if an error occured // err <- errors.New("Error occured !") }