A sample Symfony project implementing messenger with apache kafka service. This repository contains a symfony project, kafka and zookeeper servers. Services run on docker.

symfony-examples/messenger-kafka

Symfony & Kafka

About

A sample Symfony project implementing the Messenger component with Apache Kafka.
This repository contains a Symfony project plus Kafka and ZooKeeper servers.
All services run on Docker.
In this example, the Messenger component is configured to:

  • Publish messages to a Kafka topic
  • Consume messages from Kafka topics

Messenger does not ship with a Kafka transport, so this project provides a custom transport in the kafka directory.

Requirements

  • git
  • docker
  • docker-compose
  • make
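
Before running the installation, it can help to confirm that the tools listed above are available on your PATH. The following is a minimal sketch of such a check; missing_tools is a hypothetical helper name and is not shipped with this project.

```shell
#!/bin/sh
# Hypothetical preflight helper, not part of the repository.
# Prints the names of the given tools that are not found on PATH,
# separated by spaces (prints an empty line when none are missing).
missing_tools() {
    missing=""
    for tool in "$@"; do
        if ! command -v "$tool" >/dev/null 2>&1; then
            missing="$missing $tool"
        fi
    done
    # Drop the leading space before printing.
    echo "${missing# }"
}

# Check the tools named in the Requirements list.
result=$(missing_tools git docker docker-compose make)
if [ -n "$result" ]; then
    echo "Missing required tools: $result" >&2
else
    echo "All required tools are available."
fi
```

The sketch only reports what is missing and does not abort, so you can paste it into a terminal without closing the session.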

How to

Clone the project

git clone https://github.com/symfony-examples/messenger-kafka.git

Installation

This command creates all the services and the Kafka topic:

make install-local

Enjoy! 🥳

Check that everything works

Producer

make console app:messenger:producer reference 2000

reference and 2000 are required arguments; you can replace them with other values.
This command sends a message to the Kafka topic.
The topic is set by the producer_topic option in config/packages/messenger.yaml:

framework:
    messenger:
        transports:
            order_transport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    ...
                    producer_topic: 'order_topic_test'

Consumer

make console messenger:consume order_transport

order_transport is the transport name defined in config/packages/messenger.yaml.
This command consumes messages from the Kafka topics listed under the consumer_topics option in config/packages/messenger.yaml:

framework:
    messenger:
        transports:
            order_transport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    ...
                    consumer_topics:
                        - 'order_topic_test'

Messages are handled by App\Messenger\Handler\OrderPaidHandler.
In this example, an InvoiceCreatedMessage is sent for each paid order; you can replace this implementation with your own logic.

// App\Messenger\Handler\OrderPaidHandler
public function __invoke(OrderPaidMessage $message): void
{
    // implement logic here
}

Setup in your symfony project

PHP extension

The rdkafka PHP extension must be installed.

; kafka.ini
extension=rdkafka.so

## SETUP RDKAFKA EXTENSION
RUN set -xe \
    && apk add --no-cache --update --virtual .phpize-deps $PHPIZE_DEPS \
        librdkafka-dev \
    && pecl install rdkafka
COPY ./.docker/php/kafka.ini $PHP_INI_DIR/conf.d/

Check that the extension is installed correctly

php --ri rdkafka
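
If you want to run the same check from a script (for example in a container build or a CI step), the sketch below wraps it in a function that returns a status code. check_rdkafka is a hypothetical helper name; it relies only on PHP's built-in extension_loaded() function.

```shell
#!/bin/sh
# Hypothetical helper, not part of the repository.
# Returns 0 when rdkafka is loaded, 1 when it is not,
# and 2 when the php binary itself cannot be found.
check_rdkafka() {
    if ! command -v php >/dev/null 2>&1; then
        return 2
    fi
    # extension_loaded() is a PHP built-in; the exit code carries the result.
    php -r 'exit(extension_loaded("rdkafka") ? 0 : 1);'
}

if check_rdkafka; then
    echo "rdkafka is loaded"
else
    echo "rdkafka is not available (status $?)" >&2
fi
```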

Config env

Add environment variables in .env file:

# The transport DSN must start with kafka://
MESSENGER_TRANSPORT_DSN=kafka://
# Comma-separated list of Kafka brokers (e.g. kafka-1:9092,kafka-2:9092)
KAFKA_BROKERS=kafka:9092
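
These two variables follow simple format rules: the DSN starts with kafka:// and the broker list is comma-separated. A minimal shell sketch of checking both before starting the services could look like the following. The helpers is_kafka_dsn and split_brokers are hypothetical names used for illustration, not part of this repository.

```shell
#!/bin/sh
# Hypothetical helpers, not part of the repository.

# Return 0 when the DSN uses the kafka:// scheme, 1 otherwise.
is_kafka_dsn() {
    case "$1" in
        kafka://*) return 0 ;;
        *) return 1 ;;
    esac
}

# Print each broker of a comma-separated list on its own line.
split_brokers() {
    printf '%s\n' "$1" | tr ',' '\n'
}

# Example values in the same form as the .env entries.
MESSENGER_TRANSPORT_DSN='kafka://'
KAFKA_BROKERS='kafka-1:9092,kafka-2:9092'

if is_kafka_dsn "$MESSENGER_TRANSPORT_DSN"; then
    echo "DSN scheme is kafka://"
else
    echo "DSN must start with kafka://" >&2
fi

split_brokers "$KAFKA_BROKERS" | while read -r broker; do
    echo "broker: $broker"
done
```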

Config messenger

Configure your transport

framework:
    messenger:
        transports:
            order_transport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    metadata.broker.list: '%env(KAFKA_BROKERS)%'
                    group.id: 'my-group-id'
                    auto.offset.reset: 'earliest'
                    # you can add here any rdkafka option you need
                    # https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
                    ...
                    consumer_topics:
                        - 'order_topic_test'

Setup transport

make console messenger:setup-transport

Enjoy! 🥳
