Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 81cfb8d

Browse files
committed
Fix header conversion in eachBatch
1 parent a717346 commit 81cfb8d

File tree

2 files changed

+71
-27
lines changed

2 files changed

+71
-27
lines changed

‎lib/kafkajs/_consumer.js‎

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -645,22 +645,15 @@ class Consumer {
645645
}
646646

647647
/**
648-
* Converts a message returned by node-rdkafka into a message that can be used by the eachMessage callback.
649-
* @param {import("../..").Message} message
650-
* @returns {import("../../types/kafkajs").EachMessagePayload}
648+
* Converts headers returned by node-rdkafka into a format that can be used by the eachMessage/eachBatch callback.
649+
* @param {import("../..").MessageHeader[] | undefined} messageHeaders
650+
* @returns {import("../../types/kafkajs").IHeaders}
651651
*/
652-
#createPayload(message) {
653-
let key = message.key;
654-
if (typeof key === 'string') {
655-
key = Buffer.from(key);
656-
}
657-
658-
let timestamp = message.timestamp ? String(message.timestamp) : '';
659-
652+
#createHeaders(messageHeaders) {
660653
let headers;
661-
if (message.headers) {
654+
if (messageHeaders) {
662655
headers = {};
663-
for (const header of message.headers) {
656+
for (const header of messageHeaders) {
664657
for (const [key, value] of Object.entries(header)) {
665658
if (!Object.hasOwn(headers, key)) {
666659
headers[key] = value;
@@ -672,6 +665,22 @@ class Consumer {
672665
}
673666
}
674667
}
668+
return headers;
669+
}
670+
671+
/**
672+
* Converts a message returned by node-rdkafka into a message that can be used by the eachMessage callback.
673+
* @param {import("../..").Message} message
674+
* @returns {import("../../types/kafkajs").EachMessagePayload}
675+
*/
676+
#createPayload(message) {
677+
let key = message.key;
678+
if (typeof key === 'string') {
679+
key = Buffer.from(key);
680+
}
681+
682+
let timestamp = message.timestamp ? String(message.timestamp) : '';
683+
const headers = this.#createHeaders(message.headers);
675684

676685
return {
677686
topic: message.topic,
@@ -788,20 +797,7 @@ class Consumer {
788797
}
789798

790799
let timestamp = message.timestamp ? String(message.timestamp) : '';
791-
792-
let headers;
793-
if (message.headers) {
794-
headers = {};
795-
for (const [key, value] of Object.entries(message.headers)) {
796-
if (!Object.hasOwn(headers, key)) {
797-
headers[key] = value;
798-
} else if (headers[key].constructor === Array) {
799-
headers[key].push(value);
800-
} else {
801-
headers[key] = [headers[key], value];
802-
}
803-
}
804-
}
800+
const headers = this.#createHeaders(message.headers);
805801

806802
const messageConverted = {
807803
key,

‎test/promisified/consumer/consumeMessages.spec.js‎

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,54 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
134134
);
135135
});
136136

137+
it('consume batch of messages with headers', async () => {
138+
await consumer.connect();
139+
await producer.connect();
140+
await consumer.subscribe({ topic: topicName });
141+
142+
const messagesConsumed = [];
143+
consumer.run({
144+
partitionsConsumedConcurrently,
145+
eachBatch: async event => messagesConsumed.push(event)
146+
});
147+
148+
const messages = [{
149+
value: `value-${secureRandom}`,
150+
headers: {
151+
'header-1': 'value-1',
152+
'header-2': 'value-2',
153+
'header-3': ['value-3-1', 'value-3-2', Buffer.from([1, 0, 1, 0, 1])],
154+
'header-4': Buffer.from([1, 0, 1, 0, 1]),
155+
},
156+
partition: 0,
157+
}];
158+
159+
await producer.send({ topic: topicName, messages });
160+
await waitForMessages(messagesConsumed, { number: messages.length });
161+
162+
expect(messagesConsumed[0]).toEqual(
163+
expect.objectContaining({
164+
batch: expect.objectContaining({
165+
topic: topicName,
166+
partition: 0,
167+
messages: [
168+
expect.objectContaining({
169+
value: Buffer.from(messages[0].value),
170+
offset: '0',
171+
headers: {
172+
// Headers are always returned as Buffers from the broker.
173+
'header-1': Buffer.from('value-1'),
174+
'header-2': Buffer.from('value-2'),
175+
'header-3': [Buffer.from('value-3-1'), Buffer.from('value-3-2'), Buffer.from([1, 0, 1, 0, 1])],
176+
'header-4': Buffer.from([1, 0, 1, 0, 1]),
177+
}
178+
}),
179+
]
180+
}),
181+
})
182+
);
183+
});
184+
137185
it.each([[true], [false]])('consumes messages using eachBatch - isAutoResolve: %s', async (isAutoResolve) => {
138186
await consumer.connect();
139187
await producer.connect();

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /