Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings
This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit b656167

Browse files
author
Pavel Batanov
authored
Merge pull request #2 from lamoda/feature/rdkafka-4
Support RDKafka 4
2 parents 34215c6 + 005b531 commit b656167

File tree

2 files changed

+8
-1
lines changed

2 files changed

+8
-1
lines changed

‎composer.json‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
],
1313
"require": {
1414
"php": ">=7.1",
15-
"codeception/codeception": "~2.5"
15+
"ext-rdkafka": "^3 || ^4",
16+
"codeception/codeception": "^2.5 | ^3.0 | ^4.0"
1617
},
1718
"autoload": {
1819
"psr-4": {

‎src/Extension/KafkaModule.php‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use Codeception\Module;
88
use Exception;
9+
use Lamoda\Codeception\Extension\MessageSerializer\ArrayMessageSerializer;
910
use Lamoda\Codeception\Extension\MessageSerializer\MessageSerializerInterface;
1011
use RdKafka\Conf;
1112
use RdKafka\Consumer;
@@ -17,6 +18,7 @@
1718
class KafkaModule extends Module
1819
{
1920
protected const DEFAULT_PARTITION = 0;
21+
protected const FLUSH_TIMEOUT_MS = 3000;
2022

2123
/**
2224
* @var MessageSerializerInterface
@@ -83,6 +85,10 @@ public function putMessageInTopic(string $topicName, string $message, ?int $part
8385
$topic = $producer->newTopic($topicName, $this->topicConf);
8486

8587
$topic->produce($partition ?? static::DEFAULT_PARTITION, 0, $message);
88+
89+
if (method_exists($producer, 'flush')) {
90+
$producer->flush(self::FLUSH_TIMEOUT_MS);
91+
}
8692
}
8793

8894
public function putMessageListInTopic(string $topicName, array $messages, ?int $partition = null): void

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /