Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Add header support for kafka #955

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
makasim merged 6 commits into php-enqueue:master from TiMESPLiNTER:feat/kafka-header-support
Sep 25, 2019

Conversation

Copy link
Contributor

@TiMESPLiNTER TiMESPLiNTER commented Sep 25, 2019

This bumps the dependency on rdkafka extension from version 3.0.3 to 3.1.0.

Copy link
Member

makasim commented Sep 25, 2019

looks good to me. @Steveb-p wdyt?

Copy link
Contributor Author

I get the following (I guess) non-related error running the functional tests:

1) Enqueue\Dbal\Tests\Spec\Mysql\DbalSubscriptionConsumerConsumeFromAllSubscribedQueuesTest::test
Failed asserting that two arrays are equal.
--- Expected
+++ Actual
@@ @@
 Array (
 0 => 'fooBody'
- 1 => 'barBody'
 )
/mqdev/vendor/queue-interop/queue-spec/src/SubscriptionConsumerConsumeFromAllSubscribedQueuesSpec.php:63

Copy link
Contributor

I was sure I did open a PR earlier with it: #880. Closed it since a lot has changed.

However, I'd suggest getting the part with backward compatibility ported here?

// Merge headers passed from Kafka with possible earlier serialized payload headers. Prefer Kafka's.
// Note: Requires phprdkafka >= 3.1.0
if (isset($kafkaMessage->headers)) {
 $message->setHeaders(array_merge($message->getHeaders(), $kafkaMessage->headers));
}

This way messages produced by earlier versions of php-enqueue would still be handled.

I'll give it a look after work, but it should be fine :)

Copy link
Contributor Author

TiMESPLiNTER commented Sep 25, 2019
edited
Loading

So should I just copy over the code from your closed PR into this PR? Do we still need the segfault check?

 // Note: Topic::producev method exists in phprdkafka > 3.1.0
 // Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version
 if (method_exists($topic, 'producev')) {
 // Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka 1.0.0 causing segfault
 if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=')
 && version_compare(phpversion('rdkafka'), '3.1.0', '<=')) {
 trigger_error('Phprdkafka < 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`', E_USER_WARNING);
 }
 $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders());
 } else {
 $topic->produce($partition, 0 /* must be 0 */, $payload, $key);
 }

Copy link
Contributor

So should I just copy over the code from your closed PR into this PR?

 // Note: Topic::producev method exists in phprdkafka > 3.1.0
 // Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version
 if (method_exists($topic, 'producev')) {
 // Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka 1.0.0 causing segfault
 if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=')
 && version_compare(phpversion('rdkafka'), '3.1.0', '<=')) {
 trigger_error('Phprdkafka < 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`', E_USER_WARNING);
 }
 $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders());
 } else {
 $topic->produce($partition, 0 /* must be 0 */, $payload, $key);
 }

(削除) I'll be ok with enforcing higher level of phprdkafka/librdkafka instead and keeping your code that changes this to call producev by default. (削除ここまで)

Yes actually. That would result in still allowing to work with lower phprdkafka versions, while using the new producev method when available. There is a lot of recent contributions to phprdkafka (Thanks to Nick), but there are also reports about some segfaults happening sometimes, so I'm not keen to forcing usage of those versions yet (but that's only my opinion).

My original code should probably be changed to throw an Exception or trigger a full error instead, because phprdkafka 3.1.0 will simply not producev message properly with librdkafka 1.0.0 (segfault was guaranteed to happen). Or it should drop to produce call instead. WDYT @TiMESPLiNTER ?

Copy link
Contributor Author

@Steveb-p how about dropping to produce and trigger a PHP warning in the 3.1.0/1.0.0 case?

Copy link
Contributor

Copy link
Contributor Author

TiMESPLiNTER commented Sep 25, 2019
edited
Loading

@Steveb-p @makasim I added the functionality as discussed. Please have a look over the code again if you find some time. 😃

I think we could include this in the next bugfix release for 0.9.x? What would I need to do to include it there? Also make a PR against the 0.9 branch?

Copy link
Contributor

I think we could include this in the next bugfix release for 0.9.x? What would I need to do to include it there? Also make a PR against the 0.9 branch?

0.9 branch afair is actually a "tag" for initial 0.9 release ;)

master branch is the base for new releases, so it's fine as it is.

Copy link
Contributor

@TiMESPLiNTER would you mind adding

https://github.com/php-enqueue/enqueue-dev/pull/880/files#diff-30b559bffa4a02ebf46fbef2767f7a01R175-R180

as well?

It would provide cross-compatibility with earlier versions (which might actually be important since Kafka can store all historical messages, right? 😄 )

And the inclusion of that code for produce fallback means that version from ^3.0.3 to ^3.1.0 is no longer required.

Copy link
Contributor Author

@TiMESPLiNTER would you mind adding

https://github.com/php-enqueue/enqueue-dev/pull/880/files#diff-30b559bffa4a02ebf46fbef2767f7a01R175-R180

as well?

This code is currently already in master from what I see. Just the producing functionality is currently missing header support.

Copy link
Contributor

@makasim IMO it's ready. As usual I'm not merging it since you're the owner 😈

@makasim makasim merged commit b11fd8a into php-enqueue:master Sep 25, 2019
@TiMESPLiNTER TiMESPLiNTER deleted the feat/kafka-header-support branch September 25, 2019 13:39
Copy link
Contributor Author

When is the next released planned (including this "bug fix")?

Copy link
Contributor

Steveb-p commented Sep 26, 2019
edited
Loading

When is the next released planned (including this "bug fix")?

@TiMESPLiNTER until @makasim creates a new version (which should be soonTM) you can use this functionality of composer to force it to use your fork:
https://getcomposer.org/doc/05-repositories.md#loading-a-package-from-a-vcs-repository

There are a few use cases for this. The most common one is maintaining your own fork of a third party library. If you are using a certain library for your project and you decide to change something in the library, you will want your project to use the patched version. If the library is on GitHub (this is the case most of the time), you can simply fork it there and push your changes to your fork. After that you update the project's composer.json. All you have to do is add your fork as a repository and update the version constraint to point to your custom branch. In composer.json, you should prefix your custom branch name with "dev-". For version constraint naming conventions see Libraries for more information.

So, in your case it might be:

{
 "repositories": [
 {
 "type": "vcs",
 "url": "https://github.com/TiMESPLIiNTER/enqueue/rdkafka"
 }
 ],
 "require": {
 "enqueue/rdkafka": "^0.9.13 | dev->>BRANCH-NAME<<"
 }
}

Adding ^0.9.13 will allow you to update to a newer version once it's available (by using composer update enqueue/rdkafka), if you have prefer-stable:true in composer.json config (https://getcomposer.org/doc/04-schema.md#prefer-stable)

Copy link
Member

makasim commented Sep 26, 2019

Ths fix already in enqueue/rdkafka, you require dev version for now.

I'll do release when I have time

Steveb-p reacted with thumbs up emoji

Copy link
Contributor Author

Okay, just was wondering if something is planned. All good I'll use dev-master in the meantime.

Steveb-p reacted with thumbs up emoji

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Reviewers

@Steveb-p Steveb-p Steveb-p approved these changes

Assignees
No one assigned
Labels
None yet
Projects
None yet
Milestone
No milestone
Development

Successfully merging this pull request may close these issues.

AltStyle によって変換されたページ (->オリジナル) /