Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit b11fd8a

Browse files
authored
Merge pull request #955 from TiMESPLiNTER/feat/kafka-header-support
Add header support for kafka
2 parents 599ed87 + 2ae4655 commit b11fd8a

File tree

3 files changed

+41
-7
lines changed

3 files changed

+41
-7
lines changed

‎pkg/rdkafka/RdKafkaContext.php‎

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,18 @@ public function purgeQueue(Queue $queue): void
151151
throw PurgeQueueNotSupportedException::providerDoestNotSupportIt();
152152
}
153153

154+
public static function getLibrdKafkaVersion(): string
155+
{
156+
if (!defined('RD_KAFKA_VERSION')) {
157+
throw new \RuntimeException('RD_KAFKA_VERSION constant is not defined. Phprdkafka is probably not installed');
158+
}
159+
$major = (RD_KAFKA_VERSION & 0xFF000000) >> 24;
160+
$minor = (RD_KAFKA_VERSION & 0x00FF0000) >> 16;
161+
$patch = (RD_KAFKA_VERSION & 0x0000FF00) >> 8;
162+
163+
return "$major.$minor.$patch";
164+
}
165+
154166
private function getProducer(): VendorProducer
155167
{
156168
if (null === $this->producer) {

‎pkg/rdkafka/RdKafkaProducer.php‎

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,25 @@ public function send(Destination $destination, Message $message): void
4242
$key = $message->getKey() ?: $destination->getKey() ?: null;
4343

4444
$topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf());
45+
46+
// Note: Topic::producev method exists in phprdkafka > 3.1.0
47+
// Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version
48+
if (method_exists($topic, 'producev')) {
49+
// Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka >= 1.0.0 causing segfault
50+
if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=')
51+
&& version_compare(phpversion('rdkafka'), '3.1.0', '<=')) {
52+
trigger_error(
53+
'Phprdkafka <= 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`. '.
54+
'Falling back to `produce` (without message headers) instead.',
55+
E_USER_WARNING
56+
);
57+
} else {
58+
$topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders());
59+
60+
return;
61+
}
62+
}
63+
4564
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);
4665
}
4766

‎pkg/rdkafka/Tests/RdKafkaProducerTest.php‎

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,20 @@ public function testThrowIfMessageInvalid()
4545

4646
public function testShouldUseSerializerToEncodeMessageAndPutToExpectedTube()
4747
{
48-
$message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']);
48+
$messageHeaders = ['bar' => 'barVal'];
49+
$message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], $messageHeaders);
4950
$message->setKey('key');
5051

5152
$kafkaTopic = $this->createKafkaTopicMock();
5253
$kafkaTopic
5354
->expects($this->once())
54-
->method('produce')
55+
->method('producev')
5556
->with(
5657
RD_KAFKA_PARTITION_UA,
5758
0,
5859
'theSerializedMessage',
59-
'key'
60+
'key',
61+
$messageHeaders
6062
)
6163
;
6264

@@ -87,7 +89,7 @@ public function testShouldPassNullAsTopicConfigIfNotSetOnTopic()
8789
$kafkaTopic = $this->createKafkaTopicMock();
8890
$kafkaTopic
8991
->expects($this->once())
90-
->method('produce')
92+
->method('producev')
9193
;
9294

9395
$kafkaProducer = $this->createKafkaProducerMock();
@@ -123,7 +125,7 @@ public function testShouldPassCustomConfAsTopicConfigIfSetOnTopic()
123125
$kafkaTopic = $this->createKafkaTopicMock();
124126
$kafkaTopic
125127
->expects($this->once())
126-
->method('produce')
128+
->method('producev')
127129
;
128130

129131
$kafkaProducer = $this->createKafkaProducerMock();
@@ -165,13 +167,14 @@ public function testShouldAllowGetPreviouslySetSerializer()
165167

166168
public function testShouldAllowSerializersToSerializeKeys()
167169
{
168-
$message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']);
170+
$messageHeaders = ['bar' => 'barVal'];
171+
$message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], $messageHeaders);
169172
$message->setKey('key');
170173

171174
$kafkaTopic = $this->createKafkaTopicMock();
172175
$kafkaTopic
173176
->expects($this->once())
174-
->method('produce')
177+
->method('producev')
175178
->with(
176179
RD_KAFKA_PARTITION_UA,
177180
0,

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /