-
Notifications
You must be signed in to change notification settings - Fork 440
Add header support for kafka #955
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add header support for kafka #955
Conversation
looks good to me. @Steveb-p wdyt?
I get the following (I guess) non-related error running the functional tests:
1) Enqueue\Dbal\Tests\Spec\Mysql\DbalSubscriptionConsumerConsumeFromAllSubscribedQueuesTest::test
Failed asserting that two arrays are equal.
--- Expected
+++ Actual
@@ @@
Array (
0 => 'fooBody'
- 1 => 'barBody'
)
/mqdev/vendor/queue-interop/queue-spec/src/SubscriptionConsumerConsumeFromAllSubscribedQueuesSpec.php:63
I was sure I did open a PR earlier with it: #880. Closed it since a lot has changed.
However, I'd suggest getting the part with backward compatibility ported here?
// Merge headers passed from Kafka with possible earlier serialized payload headers. Prefer Kafka's. // Note: Requires phprdkafka >= 3.1.0 if (isset($kafkaMessage->headers)) { $message->setHeaders(array_merge($message->getHeaders(), $kafkaMessage->headers)); }
This way messages produced by earlier versions of php-enqueue
would still be handled.
I'll give it a look after work, but it should be fine :)
So should I just copy over the code from your closed PR into this PR? Do we still need the segfault check?
// Note: Topic::producev method exists in phprdkafka > 3.1.0 // Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version if (method_exists($topic, 'producev')) { // Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka 1.0.0 causing segfault if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=') && version_compare(phpversion('rdkafka'), '3.1.0', '<=')) { trigger_error('Phprdkafka < 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`', E_USER_WARNING); } $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders()); } else { $topic->produce($partition, 0 /* must be 0 */, $payload, $key); }
So should I just copy over the code from your closed PR into this PR?
// Note: Topic::producev method exists in phprdkafka > 3.1.0 // Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version if (method_exists($topic, 'producev')) { // Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka 1.0.0 causing segfault if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=') && version_compare(phpversion('rdkafka'), '3.1.0', '<=')) { trigger_error('Phprdkafka < 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`', E_USER_WARNING); } $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders()); } else { $topic->produce($partition, 0 /* must be 0 */, $payload, $key); }
(削除) I'll be ok with enforcing higher level of phprdkafka/librdkafka instead and keeping your code that changes this to call producev
by default. (削除ここまで)
Yes actually. That would result in still allowing to work with lower phprdkafka versions, while using the new producev
method when available. There is a lot of recent contributions to phprdkafka (Thanks to Nick), but there are also reports about some segfaults happening sometimes, so I'm not keen to forcing usage of those versions yet (but that's only my opinion).
My original code should probably be changed to throw an Exception
or trigger a full error instead, because phprdkafka 3.1.0 will simply not producev
message properly with librdkafka 1.0.0
(segfault was guaranteed to happen). Or it should drop to produce
call instead. WDYT @TiMESPLiNTER ?
@Steveb-p how about dropping to produce and trigger a PHP warning in the 3.1.0/1.0.0 case?
I think we could include this in the next bugfix release for
0.9.x
? What would I need to do to include it there? Also make a PR against the0.9
branch?
0.9
branch afair is actually a "tag" for initial 0.9
release ;)
master
branch is the base for new releases, so it's fine as it is.
@TiMESPLiNTER would you mind adding
as well?
It would provide cross-compatibility with earlier versions (which might actually be important since Kafka can store all historical messages, right? 😄 )
And the inclusion of that code for produce
fallback means that version from ^3.0.3
to ^3.1.0
is no longer required.
@TiMESPLiNTER would you mind adding
as well?
This code is currently already in master from what I see. Just the producing functionality is currently missing header support.
@makasim IMO it's ready. As usual I'm not merging it since you're the owner 😈
When is the next released planned (including this "bug fix")?
When is the next released planned (including this "bug fix")?
@TiMESPLiNTER until @makasim creates a new version (which should be soonTM) you can use this functionality of composer to force it to use your fork:
https://getcomposer.org/doc/05-repositories.md#loading-a-package-from-a-vcs-repository
There are a few use cases for this. The most common one is maintaining your own fork of a third party library. If you are using a certain library for your project and you decide to change something in the library, you will want your project to use the patched version. If the library is on GitHub (this is the case most of the time), you can simply fork it there and push your changes to your fork. After that you update the project's composer.json. All you have to do is add your fork as a repository and update the version constraint to point to your custom branch. In composer.json, you should prefix your custom branch name with "dev-". For version constraint naming conventions see Libraries for more information.
So, in your case it might be:
{ "repositories": [ { "type": "vcs", "url": "https://github.com/TiMESPLIiNTER/enqueue/rdkafka" } ], "require": { "enqueue/rdkafka": "^0.9.13 | dev->>BRANCH-NAME<<" } }
Adding ^0.9.13
will allow you to update to a newer version once it's available (by using composer update enqueue/rdkafka
), if you have prefer-stable:true
in composer.json
config (https://getcomposer.org/doc/04-schema.md#prefer-stable)
Ths fix already in enqueue/rdkafka, you require dev version for now.
I'll do release when I have time
Okay, just was wondering if something is planned. All good I'll use dev-master
in the meantime.
This bumps the dependency on rdkafka extension from version
3.0.3
to3.1.0
.