Move activation of dispatchers into event loop thread #190

Merged
milindl merged 9 commits into master from dev_fix_wrong_thread_issue on Dec 12, 2024

Conversation

@milindl (Contributor) commented Nov 27, 2024 (edited)

  1. Reduce test flakiness by adding some time for metadata to propagate, adding some sleeps, and disabling two tests that are flaky for librdkafka-related reasons. Change the group description test to account for the possibility of an EMPTY -> DEAD state transition.
  2. Move the event-loop-thread-specific work onto that thread (see the sketch below for the general pattern).
  3. See if this makes the tests run consistently on Semaphore.
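
For illustration, here is a minimal JavaScript-level sketch of the general pattern behind point 2, not this repository's actual dispatcher code; the Dispatcher class and the activate()/onActivated() names are hypothetical. The idea is that activation is queued onto the event loop instead of being performed directly in whatever thread or callback context requested it.

 /* Hypothetical sketch: defer the state change onto the event loop
  * rather than performing it in the caller's context. */
 class Dispatcher {
   constructor() {
     this.active = false;
     this.callbacks = [];
   }

   /* Queue activation; the setImmediate callback runs on the main
    * event loop, so all state mutation happens on that one thread. */
   activate() {
     setImmediate(() => {
       this.active = true;
       this.callbacks.forEach(cb => cb());
     });
   }

   onActivated(cb) {
     this.callbacks.push(cb);
   }
 }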

@milindl requested review from a team as code owners on November 27, 2024 at 09:40

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@milindl changed the title from "[wip] Move activation of dispatchers into event loop thread" to "Move activation of dispatchers into event loop thread" on Nov 28, 2024
@emasab (Contributor) left a comment

Thanks for the fix, Milind, and thanks @trevorr for finding it!
I'm requesting just a few changes to solve the flakiness in a different way.

- docker compose up -d && sleep 30
- export NODE_OPTIONS='--max-old-space-size=1536'
- npx jest --forceExit --no-colors --ci test/promisified/admin/delete_groups.spec.js test/promisified/consumer/pause.spec.js
- npx jest --forceExit --no-colors --ci test/promisified/
Contributor

If --forceExit is still needed, are there unhandled promises?

Contributor Author

No, sorry, I left it in by accident.


// Depending on the environment of the test run, the group might transition into
// the DEAD state, so allow for both possibilities.
expect(describeGroupsResult.groups[0].state === ConsumerGroupStates.EMPTY ||
       describeGroupsResult.groups[0].state === ConsumerGroupStates.DEAD).toBeTruthy();
Contributor

I think this won't be needed if some offsets are consumed and committed by the consumer. In that case, even if there's a coordinator change, the group would be loaded and be EMPTY instead of DEAD.

milindl reacted with thumbs up emoji
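
A rough sketch of that suggestion, assuming the KafkaJS-style promisified API these tests use (consumer.commitOffsets and admin.describeGroups are assumptions about that API; the topic, group, and offset values are placeholders):

 /* Sketch: consume and commit at least one offset before describing the
  * group, so a coordinator change reloads it as EMPTY rather than DEAD. */
 await consumer.connect();
 await consumer.subscribe({ topic: topicName });
 consumer.run({ eachMessage: async () => { /* consume something */ } });
 /* ...wait until at least one message has been processed... */
 await consumer.commitOffsets([{ topic: topicName, partition: 0, offset: '1' }]);
 await consumer.disconnect();

 /* With a committed offset the group is reloaded by the (possibly new)
  * coordinator and described as EMPTY instead of DEAD. */
 const result = await admin.describeGroups([groupId]);
 expect(result.groups[0].state).toEqual(ConsumerGroupStates.EMPTY);
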
* to be small and we get multiple partitions in the cache at once.
* This is to reduce flakiness. */
producer = createProducer({}, {
'linger.ms': 1,
Contributor

It's better ensured with:

Suggested change:
- 'linger.ms': 1,
+ 'batch.num.messages': 1,

milindl reacted with thumbs up emoji
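
For reference, a sketch of the producer config with that suggestion applied (batch.num.messages is a standard librdkafka producer property; createProducer is the test helper shown in the context above):

 producer = createProducer({}, {
   /* Never put more than one message in a batch, so fetches return many
    * small batches and several partitions end up in the message cache
    * at the same time. */
   'batch.num.messages': 1,
 });
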
Comment on lines 431 to 433
'fetch.message.max.bytes': 1,
'fetch.max.bytes': 1000,
'message.max.bytes': 1000,
Contributor

We can keep only this one to get a single batch:

Suggested change:
- 'fetch.message.max.bytes': 1,
- 'fetch.max.bytes': 1000,
- 'message.max.bytes': 1000,
+ 'fetch.message.max.bytes': 1,

milindl reacted with thumbs up emoji
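
And a sketch of the consumer side with only that property kept; fetch.message.max.bytes is a standard librdkafka property, while the createConsumer helper and its signature are assumed here for illustration. The broker still returns at least the first record batch even when it exceeds this limit, which is why a 1-byte value yields a single batch per fetch.

 consumer = createConsumer({ groupId }, {
   /* A 1-byte per-partition fetch limit: the broker still returns at least
    * one record batch, so each fetch effectively delivers a single batch
    * without also capping fetch.max.bytes or message.max.bytes. */
   'fetch.message.max.bytes': 1,
 });
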
Comment on lines 405 to 410
if (partitionsConsumedConcurrently >= 2) {
/* Given how librdkafka merges partition queues, it's very unlikely that
* we get *three* partitions in the cache at one time given how we've produced
* the messages. So just skip it, it'll be very flaky otherwise. */
return;
}
Contributor

We can verify it with partitionsConsumedConcurrently >= 2. If you set

 const messagesConsumed = [];
 const expectedMaxConcurrentWorkers = Math.min(partitionsConsumedConcurrentlyDiff, partitions);
 const maxConcurrentWorkersReached = new DeferredPromise();

and then

 eachMessage: async event => {
     inProgress++;
     messagesConsumed.push(event);
     inProgressMaxValue = Math.max(inProgress, inProgressMaxValue);
     if (inProgressMaxValue >= expectedMaxConcurrentWorkers) {
         maxConcurrentWorkersReached.resolve();
     } else if (messagesConsumed.length > 30) {
         await sleep(1000);
     }
     inProgress--;
 },

and finally

 await maxConcurrentWorkersReached;
 expect(inProgressMaxValue).toBe(expectedMaxConcurrentWorkers);

the sleep inside the invocation makes sure that worker invocations can overlap and reach that value. The first messages are skipped (no sleep for them) because of the exponential cache growth.

Contributor Author

I got good results making this change, but I changed messagesConsumed.length > 30 to messagesConsumed.length > 2048, as that makes sure we've completely maxed out the cache capacity by that point.

emasab reacted with thumbs up emoji
Contributor

DeferredPromise can be imported and exported from here for usage by the rest of the tests.

milindl reacted with thumbs up emoji
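
A minimal sketch of what such a shared DeferredPromise test utility could look like (the repository's actual implementation may differ): a thenable that exposes its resolve and reject, so a test can await it in one place and settle it from another, as in the eachMessage callback above.

 /* Sketch of a shared test utility; actual implementation may differ. */
 class DeferredPromise {
   constructor() {
     this.promise = new Promise((resolve, reject) => {
       this.resolve = resolve;
       this.reject = reject;
     });
   }

   /* Being thenable makes `await deferred` work directly. */
   then(onFulfilled, onRejected) {
     return this.promise.then(onFulfilled, onRejected);
   }
 }

 module.exports = { DeferredPromise };
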
if (debug) {
common['debug'] = debug;
} else { /* Turn off info logging unless specifically asked for, otherwise stdout gets very crowded. */
common['log_level'] = 1;
Contributor

Let's keep the logs with level <= 5 (NOTICE). Otherwise we could miss some error or warning. If there are expected errors in the tests, a custom logger could be added later to assert on their messages and avoid logging them.

milindl reacted with thumbs up emoji
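
A sketch of the config block with that suggestion applied; log_level is librdkafka's syslog-style level, where 5 corresponds to NOTICE:

 if (debug) {
   common['debug'] = debug;
 } else {
   /* Keep NOTICE and above (syslog level 5) so unexpected errors and
    * warnings from librdkafka remain visible in the test output. */
   common['log_level'] = 5;
 }
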
isSimpleConsumerGroup: false,
protocolType: 'consumer',
state: ConsumerGroupStates.EMPTY,
isSimpleConsumerGroup: expect.any(Boolean),
Contributor

If it works, this change could be reverted.

milindl reacted with thumbs up emoji
@emasab (Contributor) left a comment

LGTM!

@milindl merged commit 7ec8cda into master on Dec 12, 2024
2 checks passed
@milindl deleted the dev_fix_wrong_thread_issue branch on December 12, 2024 at 13:52
Reviewers

@emasab approved these changes