
Class: Kafka::Consumer

Inherits: Object

Defined in: lib/kafka/consumer.rb

Overview

A client that consumes messages from a Kafka cluster in coordination with other clients.

A Consumer subscribes to one or more Kafka topics; all consumers with the same group id then agree on who should read from the individual topic partitions. When group members join or leave, the group synchronizes, making sure that all partitions are assigned to a single member, and that all members have some partitions to read from.

Example

A simple consumer that writes the messages it consumes to the console.

require "kafka"
kafka = Kafka .new (["kafka1:9092", "kafka2:9092"])
# Create a new Consumer instance in the group `my-group`:
consumer = kafka.consumer(group_id: "my-group")
# Subscribe to a Kafka topic:
consumer.subscribe("messages")
# Loop forever, reading in messages from all topics that have been
# subscribed to.
consumer.each_message do |message|
 puts message.topic
 puts message.partition
 puts message.key
 puts message.headers
 puts message.value
 puts message.offset
end


Constructor Details

#initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0, interceptors: []) ⇒ Consumer

Returns a new instance of Consumer.

# File 'lib/kafka/consumer.rb', line 48
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:,
               session_timeout:, heartbeat:, refresh_topic_interval: 0, interceptors: [])
  @cluster = cluster
  @logger = TaggedLogger.new(logger)
  @instrumenter = instrumenter
  @group = group
  @offset_manager = offset_manager
  @session_timeout = session_timeout
  @fetcher = fetcher
  @heartbeat = heartbeat
  @refresh_topic_interval = refresh_topic_interval
  @interceptors = Interceptors.new(interceptors: interceptors, logger: logger)

  @pauses = Hash.new {|h, k|
    h[k] = Hash.new {|h2, k2|
      h2[k2] = Pause.new
    }
  }

  # Whether or not the consumer is currently consuming messages.
  @running = false

  # Hash containing offsets for each topic and partition that has the
  # automatically_mark_as_processed feature disabled. The offset manager is only
  # active when everything is supposed to happen automatically; otherwise we need
  # to keep track of the offsets manually in memory the whole time.
  # Each key is an array of topic and partition: [topic, partition].
  # Each value is the offset of the last message we've received.
  # @note It won't be updated when the user marks a message as processed, because
  #   if the user commits a message other than the last one in a batch, this would
  #   make ruby-kafka refetch some already consumed messages.
  @current_offsets = Hash.new { |h, k| h[k] = {} }

  # Map storing subscribed topics with their configuration
  @subscribed_topics = Hash.new

  # Set storing topics that matched topics in @subscribed_topics
  @matched_topics = Set.new

  # Whether join_group must be executed again because new topics are added
  @join_group_for_new_topics = false
end

Instance Method Details

#commit_offsets ⇒ Object

# File 'lib/kafka/consumer.rb', line 387
def commit_offsets
  @offset_manager.commit_offsets
end

#each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true) {|batch| ... } ⇒ nil

Fetches and enumerates the messages in the topics that the consumer group subscribes to.

Each batch of messages is yielded to the provided block. If the block returns without raising an exception, the batch will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message batch in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
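
Example

A minimal sketch of batch processing, assuming a consumer set up and subscribed as in the overview example; `process` is a hypothetical application-level handler, not part of this API.

consumer.each_batch(max_wait_time: 5) do |batch|
  puts "Received #{batch.messages.count} messages from #{batch.topic}/#{batch.partition}"

  batch.messages.each do |message|
    process(message) # hypothetical handler; replace with your own logic
  end
end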

Parameters:

  • min_bytes (Integer) (defaults to: 1)

    the minimum number of bytes to read before returning messages from each broker; if max_wait_time is reached, this is ignored.

  • max_bytes (Integer) (defaults to: 10485760)

    the maximum number of bytes to read before returning messages from each broker.

  • max_wait_time (Integer, Float) (defaults to: 1)

    the maximum duration of time to wait before returning messages from each broker, in seconds.

  • automatically_mark_as_processed (Boolean) (defaults to: true)

    whether to automatically mark a batch’s messages as successfully processed when the block returns without an exception. Once marked successful, the offsets of processed messages can be committed to Kafka.

Yield Parameters:

  • batch (Kafka::FetchedBatch)

    a batch of messages fetched from Kafka.

Returns:

  • (nil)

Raises:

  • (Kafka::ProcessingError)

    if the block raises an exception while processing a batch; the error carries the topic, partition, and offset range of the failing batch.

# File 'lib/kafka/consumer.rb', line 304
def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
  @fetcher.configure(
    min_bytes: min_bytes,
    max_bytes: max_bytes,
    max_wait_time: max_wait_time,
  )

  consumer_loop do
    batches = fetch_batches

    batches.each do |batch|
      unless batch.empty?
        raw_messages = batch.messages
        batch.messages = raw_messages.reject(&:is_control_record)
        batch = @interceptors.call(batch)

        notification = {
          topic: batch.topic,
          partition: batch.partition,
          last_offset: batch.last_offset,
          last_create_time: batch.messages.last && batch.messages.last.create_time,
          offset_lag: batch.offset_lag,
          highwater_mark_offset: batch.highwater_mark_offset,
          message_count: batch.messages.count,
        }

        # Instrument an event immediately so that subscribers don't have to wait until
        # the block is completed.
        @instrumenter.instrument("start_process_batch.consumer", notification)

        @instrumenter.instrument("process_batch.consumer", notification) do
          begin
            yield batch
            @current_offsets[batch.topic][batch.partition] = batch.last_offset unless batch.unknown_last_offset?
          rescue => e
            offset_range = (batch.first_offset..batch.last_offset || batch.highwater_mark_offset)
            location = "#{batch.topic}/#{batch.partition} in offset range #{offset_range}"
            backtrace = e.backtrace.join("\n")

            @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

            raise ProcessingError.new(batch.topic, batch.partition, offset_range)
          ensure
            batch.messages = raw_messages
          end
        end

        mark_message_as_processed(batch.messages.last) if automatically_mark_as_processed

        # We've successfully processed a batch from the partition, so we can clear
        # the pause.
        pause_for(batch.topic, batch.partition).reset!
      end

      @offset_manager.commit_offsets_if_necessary

      trigger_heartbeat

      return if shutting_down?
    end

    # We may not have received any messages, but it's still a good idea to
    # commit offsets if we've processed messages in the last set of batches.
    # This also ensures the offsets are retained if we haven't read any messages
    # since the offset retention period has elapsed.
    @offset_manager.commit_offsets_if_necessary
  end
end

#each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true) {|message| ... } ⇒ nil

Fetches and enumerates the messages in the topics that the consumer group subscribes to.

Each message is yielded to the provided block. If the block returns without raising an exception, the message will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
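
Example

A minimal sketch, assuming a consumer set up and subscribed as in the overview example. Automatic marking is disabled here so each message is marked manually once handled; `handle` is a hypothetical application-level method, not part of this API.

consumer.each_message(automatically_mark_as_processed: false) do |message|
  handle(message) # hypothetical handler; replace with your own logic

  # Only mark the message once we're certain it has been processed.
  consumer.mark_message_as_processed(message)
end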

Parameters:

  • min_bytes (Integer) (defaults to: 1)

    the minimum number of bytes to read before returning messages from each broker; if max_wait_time is reached, this is ignored.

  • max_bytes (Integer) (defaults to: 10485760)

    the maximum number of bytes to read before returning messages from each broker.

  • max_wait_time (Integer, Float) (defaults to: 1)

    the maximum duration of time to wait before returning messages from each broker, in seconds.

  • automatically_mark_as_processed (Boolean) (defaults to: true)

    whether to automatically mark a message as successfully processed when the block returns without an exception. Once marked successful, the offsets of processed messages can be committed to Kafka.

Yield Parameters:

  • message (Kafka::FetchedMessage)

    a message fetched from Kafka.

Returns:

  • (nil)

Raises:

  • (Kafka::ProcessingError)

    if the block raises an exception while processing a message; the error carries the topic, partition, and offset of the failing message.

# File 'lib/kafka/consumer.rb', line 215
def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
  @fetcher.configure(
    min_bytes: min_bytes,
    max_bytes: max_bytes,
    max_wait_time: max_wait_time,
  )

  consumer_loop do
    batches = fetch_batches

    batches.each do |batch|
      batch = @interceptors.call(batch)
      batch.messages.each do |message|
        notification = {
          topic: message.topic,
          partition: message.partition,
          offset: message.offset,
          offset_lag: batch.highwater_mark_offset - message.offset - 1,
          create_time: message.create_time,
          key: message.key,
          value: message.value,
          headers: message.headers
        }

        # Instrument an event immediately so that subscribers don't have to wait until
        # the block is completed.
        @instrumenter.instrument("start_process_message.consumer", notification)

        @instrumenter.instrument("process_message.consumer", notification) do
          begin
            yield message unless message.is_control_record
            @current_offsets[message.topic][message.partition] = message.offset
          rescue => e
            location = "#{message.topic}/#{message.partition} at offset #{message.offset}"
            backtrace = e.backtrace.join("\n")

            @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

            raise ProcessingError.new(message.topic, message.partition, message.offset)
          end
        end

        mark_message_as_processed(message) if automatically_mark_as_processed
        @offset_manager.commit_offsets_if_necessary

        trigger_heartbeat

        return if shutting_down?
      end

      # We've successfully processed a batch from the partition, so we can clear
      # the pause.
      pause_for(batch.topic, batch.partition).reset!
    end

    # We may not have received any messages, but it's still a good idea to
    # commit offsets if we've processed messages in the last set of batches.
    # This also ensures the offsets are retained if we haven't read any messages
    # since the offset retention period has elapsed.
    @offset_manager.commit_offsets_if_necessary
  end
end

#mark_message_as_processed(message) ⇒ Object

# File 'lib/kafka/consumer.rb', line 391
def mark_message_as_processed(message)
  @offset_manager.mark_as_processed(message.topic, message.partition, message.offset)
end

#pause(topic, partition, timeout: nil, max_timeout: nil, exponential_backoff: false) ⇒ nil

Pause processing of a specific topic partition.

When a specific message causes the processor code to fail, it can be a good idea to simply pause the partition until the error can be resolved, allowing the rest of the partitions to continue being processed.

If the timeout argument is passed, the partition will automatically be resumed when the timeout expires. If exponential_backoff is enabled, each subsequent pause will cause the timeout to double until a message from the partition has been successfully processed.
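
Example

A minimal sketch of backing off a failing partition, assuming a consumer as in the overview example; `handle` is a hypothetical application-level method, not part of this API.

consumer.each_message do |message|
  begin
    handle(message) # hypothetical handler; replace with your own logic
  rescue
    # Pause this partition for 30 seconds, doubling on each subsequent
    # failure, up to a maximum of 5 minutes.
    consumer.pause(message.topic, message.partition, timeout: 30, max_timeout: 300, exponential_backoff: true)
  end
end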

Parameters:

  • topic (String)
  • partition (Integer)
  • timeout (nil, Integer) (defaults to: nil)

    the number of seconds to pause the partition for, or nil if the partition should not be automatically resumed.

  • max_timeout (nil, Integer) (defaults to: nil)

    the maximum number of seconds to pause for, or nil if no maximum should be enforced.

  • exponential_backoff (Boolean) (defaults to: false)

    whether to enable exponential backoff.

Returns:

  • (nil)
# File 'lib/kafka/consumer.rb', line 152
def pause(topic, partition, timeout: nil, max_timeout: nil, exponential_backoff: false)
  if max_timeout && !exponential_backoff
    raise ArgumentError, "`max_timeout` only makes sense when `exponential_backoff` is enabled"
  end

  pause_for(topic, partition).pause!(
    timeout: timeout,
    max_timeout: max_timeout,
    exponential_backoff: exponential_backoff,
  )
end

#paused?(topic, partition) ⇒ Boolean

Whether the topic partition is currently paused.

Parameters:

  • topic (String)
  • partition (Integer)

Returns:

  • (Boolean)

    true if the partition is paused, false otherwise.

See Also:

  • #pause

# File 'lib/kafka/consumer.rb', line 183
def paused?(topic, partition)
  pause = pause_for(topic, partition)
  pause.paused? && !pause.expired?
end

#resume(topic, partition) ⇒ nil

Resume processing of a topic partition.

Parameters:

  • topic (String)
  • partition (Integer)

Returns:

  • (nil)

See Also:

  • #pause

# File 'lib/kafka/consumer.rb', line 170
def resume(topic, partition)
  pause_for(topic, partition).resume!

  # During re-balancing we might have lost the paused partition. Check if partition is still in group before seek.
  seek_to_next(topic, partition) if @group.assigned_to?(topic, partition)
end

#seek(topic, partition, offset) ⇒ nil

Move the consumer’s position in a topic partition to the specified offset.

Note that this has to be done prior to calling #each_message or #each_batch and only has an effect if the consumer is assigned the partition. Typically, you will want to do this in every consumer group member in order to make sure that the member that’s assigned the partition knows where to start.
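
Example

A minimal sketch, assuming a consumer as in the overview example that is assigned partition 0 of the `messages` topic:

consumer.subscribe("messages")

# Move the position in partition 0 before entering the consume loop.
consumer.seek("messages", 0, 1234)

consumer.each_message do |message|
  puts message.offset
end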

Parameters:

  • topic (String)
  • partition (Integer)
  • offset (Integer)

Returns:

  • (nil)
# File 'lib/kafka/consumer.rb', line 383
def seek(topic, partition, offset)
  @offset_manager.seek_to(topic, partition, offset)
end

#stop ⇒ nil

Stop the consumer.

The consumer will finish any in-progress work and shut down.
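
Example

A minimal sketch of graceful shutdown, assuming a consumer as in the overview example: trapping termination signals lets the each_message loop exit cleanly.

trap("TERM") { consumer.stop }
trap("INT")  { consumer.stop }

consumer.each_message do |message|
  puts message.value
end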

Returns:

  • (nil)
# File 'lib/kafka/consumer.rb', line 128
def stop
  @running = false
  @fetcher.stop
end

#subscribe(topic_or_regex, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576) ⇒ nil

Subscribes the consumer to a topic.

Typically you either want to start reading messages from the very beginning of the topic’s partitions or you simply want to wait for new messages to be written. In the former case, set start_from_beginning to true (the default); in the latter, set it to false.
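
Example

A minimal sketch, assuming a consumer as in the overview example: subscribe to one topic from the beginning, and to every topic matching a regex for new messages only.

consumer.subscribe("messages")
consumer.subscribe(/events\..*/, start_from_beginning: false)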

Parameters:

  • topic_or_regex (String, Regexp)

    subscribe to single topic with a string or multiple topics matching a regex.

  • default_offset (Symbol) (defaults to: nil)

    whether to start from the beginning or the end of the topic’s partitions. Deprecated.

  • start_from_beginning (Boolean) (defaults to: true)

    whether to start from the beginning of the topic or just subscribe to new messages being produced. This only applies when first consuming a topic partition – once the consumer has checkpointed its progress, it will always resume from the last checkpoint.

  • max_bytes_per_partition (Integer) (defaults to: 1048576)

    the maximum amount of data fetched from a single partition at a time.

Returns:

  • (nil)
# File 'lib/kafka/consumer.rb', line 110
def subscribe(topic_or_regex, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576)
  default_offset ||= start_from_beginning ? :earliest : :latest

  @subscribed_topics[topic_or_regex] = {
    default_offset: default_offset,
    start_from_beginning: start_from_beginning,
    max_bytes_per_partition: max_bytes_per_partition
  }
  scan_for_subscribing

  nil
end

#trigger_heartbeat ⇒ Object Also known as: send_heartbeat_if_necessary

# File 'lib/kafka/consumer.rb', line 395
def trigger_heartbeat
  @heartbeat.trigger
end

#trigger_heartbeat! ⇒ Object Also known as: send_heartbeat

# File 'lib/kafka/consumer.rb', line 399
def trigger_heartbeat!
  @heartbeat.trigger!
end
