Class: Kafka::Consumer

Inherits: Object

Defined in: lib/kafka/consumer.rb
Overview
A client that consumes messages from a Kafka cluster in coordination with other clients.
A Consumer subscribes to one or more Kafka topics; all consumers with the same group id then agree on who should read from the individual topic partitions. When group members join or leave, the group synchronizes, making sure that all partitions are assigned to a single member, and that all members have some partitions to read from.
Example
A simple consumer that writes the messages it consumes to the console.
require "kafka"
kafka = Kafka.new(["kafka1:9092", "kafka2:9092"])
# Create a new Consumer instance in the group `my-group`:
consumer = kafka.consumer(group_id: "my-group")
# Subscribe to a Kafka topic:
consumer.subscribe("messages")
# Loop forever, reading in messages from all topics that have been
# subscribed to.
consumer.each_message do |message|
  puts message.topic
  puts message.partition
  puts message.key
  puts message.headers
  puts message.value
  puts message.offset
end
Instance Method Summary
- #commit_offsets ⇒ Object
- #each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true) {|batch| ... } ⇒ nil — Fetches and enumerates the messages in the topics that the consumer group subscribes to.
- #each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true) {|message| ... } ⇒ nil — Fetches and enumerates the messages in the topics that the consumer group subscribes to.
- #initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0, interceptors: []) ⇒ Consumer (constructor) — A new instance of Consumer.
- #mark_message_as_processed(message) ⇒ Object
- #pause(topic, partition, timeout: nil, max_timeout: nil, exponential_backoff: false) ⇒ nil — Pause processing of a specific topic partition.
- #paused?(topic, partition) ⇒ Boolean — Whether the topic partition is currently paused.
- #resume(topic, partition) ⇒ nil — Resume processing of a topic partition.
- #seek(topic, partition, offset) ⇒ nil — Move the consumer's position in a topic partition to the specified offset.
- #stop ⇒ nil — Stop the consumer.
- #subscribe(topic_or_regex, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576) ⇒ nil — Subscribes the consumer to a topic.
- #trigger_heartbeat ⇒ Object (also: #send_heartbeat_if_necessary)
- #trigger_heartbeat! ⇒ Object (also: #send_heartbeat)
Constructor Details
#initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0, interceptors: []) ⇒ Consumer
Returns a new instance of Consumer.
# File 'lib/kafka/consumer.rb', line 48

def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0, interceptors: [])
  @cluster = cluster
  @logger = TaggedLogger.new(logger)
  @instrumenter = instrumenter
  @group = group
  @offset_manager = offset_manager
  @session_timeout = session_timeout
  @fetcher = fetcher
  @heartbeat = heartbeat
  @refresh_topic_interval = refresh_topic_interval
  @interceptors = Interceptors.new(interceptors: interceptors, logger: logger)

  @pauses = Hash.new {|h, k|
    h[k] = Hash.new {|h2, k2|
      h2[k2] = Pause.new
    }
  }

  # Whether or not the consumer is currently consuming messages.
  @running = false

  # Hash containing offsets for each topic and partition that has the
  # automatically_mark_as_processed feature disabled. The offset manager is only
  # active when everything is supposed to happen automatically; otherwise we need
  # to keep track of the offsets manually in memory.
  # The key is an array of topic and partition: [topic, partition].
  # The value is the offset of the last message we've received.
  # @note It won't be updated when the user marks a message as processed, because
  #   if the user commits a message other than the last one in a batch, this would
  #   make ruby-kafka refetch some already consumed messages.
  @current_offsets = Hash.new { |h, k| h[k] = {} }

  # Map storing subscribed topics with their configuration
  @subscribed_topics = Hash.new

  # Set storing topics that matched topics in @subscribed_topics
  @matched_topics = Set.new

  # Whether join_group must be executed again because new topics are added
  @join_group_for_new_topics = false
end
Instance Method Details
#commit_offsets ⇒ Object
# File 'lib/kafka/consumer.rb', line 387

def commit_offsets
  @offset_manager.commit_offsets
end
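Example
A sketch of forcing a commit on a fixed cadence while marking offsets manually; the 100-message interval and the save_to_database method are illustrative, not part of the library.
processed = 0

consumer.each_message(automatically_mark_as_processed: false) do |message|
  save_to_database(message)  # hypothetical application-specific handling
  consumer.mark_message_as_processed(message)

  processed += 1
  # Force an immediate commit every 100 messages instead of waiting for
  # the offset manager's regular commit interval.
  consumer.commit_offsets if (processed % 100).zero?
end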
#each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true) {|batch| ... } ⇒ nil
Fetches and enumerates the messages in the topics that the consumer group subscribes to.
Each batch of messages is yielded to the provided block. If the block returns without raising an exception, the batch will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message batch in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
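Example
A minimal sketch, reusing the consumer from the overview example; the logging is illustrative.
consumer.each_batch(max_wait_time: 5) do |batch|
  puts "Fetched #{batch.messages.count} messages from #{batch.topic}/#{batch.partition}"

  batch.messages.each do |message|
    puts message.value
  end
end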
Parameters:
- min_bytes (Integer) (defaults to: 1) — the minimum number of bytes to read before returning messages from each broker; if max_wait_time is reached, this is ignored.
- max_bytes (Integer) (defaults to: 10485760) — the maximum number of bytes to read before returning messages from each broker.
- max_wait_time (Integer, Float) (defaults to: 1) — the maximum duration of time to wait before returning messages from each broker, in seconds.
- automatically_mark_as_processed (Boolean) (defaults to: true) — whether to automatically mark a batch’s messages as successfully processed when the block returns without an exception. Once marked successful, the offsets of processed messages can be committed to Kafka.

Yield Parameters:
- batch (Kafka::FetchedBatch) — a message batch fetched from Kafka.

Returns:
- (nil)

Raises:
- (Kafka::ProcessingError) — if there was an error processing a batch. The original exception will be returned by calling #cause on the ProcessingError instance.
# File 'lib/kafka/consumer.rb', line 304

def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
  @fetcher.configure(
    min_bytes: min_bytes,
    max_bytes: max_bytes,
    max_wait_time: max_wait_time,
  )

  consumer_loop do
    batches = fetch_batches

    batches.each do |batch|
      unless batch.empty?
        raw_messages = batch.messages
        batch.messages = raw_messages.reject(&:is_control_record)
        batch = @interceptors.call(batch)

        notification = {
          topic: batch.topic,
          partition: batch.partition,
          last_offset: batch.last_offset,
          last_create_time: batch.messages.last && batch.messages.last.create_time,
          offset_lag: batch.offset_lag,
          highwater_mark_offset: batch.highwater_mark_offset,
          message_count: batch.messages.count,
        }

        # Instrument an event immediately so that subscribers don't have to wait until
        # the block is completed.
        @instrumenter.instrument("start_process_batch.consumer", notification)

        @instrumenter.instrument("process_batch.consumer", notification) do
          begin
            yield batch
            @current_offsets[batch.topic][batch.partition] = batch.last_offset unless batch.unknown_last_offset?
          rescue => e
            offset_range = (batch.first_offset..batch.last_offset || batch.highwater_mark_offset)
            location = "#{batch.topic}/#{batch.partition} in offset range #{offset_range}"
            backtrace = e.backtrace.join("\n")

            @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

            raise ProcessingError.new(batch.topic, batch.partition, offset_range)
          ensure
            batch.messages = raw_messages
          end
        end
        mark_message_as_processed(batch.messages.last) if automatically_mark_as_processed

        # We've successfully processed a batch from the partition, so we can clear
        # the pause.
        pause_for(batch.topic, batch.partition).reset!
      end
      @offset_manager.commit_offsets_if_necessary

      trigger_heartbeat

      return if shutting_down?
    end

    # We may not have received any messages, but it's still a good idea to
    # commit offsets if we've processed messages in the last set of batches.
    # This also ensures the offsets are retained if we haven't read any messages
    # since the offset retention period has elapsed.
    @offset_manager.commit_offsets_if_necessary
  end
end
#each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true) {|message| ... } ⇒ nil
Fetches and enumerates the messages in the topics that the consumer group subscribes to.
Each message is yielded to the provided block. If the block returns without raising an exception, the message will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
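Example
A sketch of taking manual control of offset marking by disabling automatically_mark_as_processed; the handle method is a hypothetical application-specific step.
consumer.each_message(automatically_mark_as_processed: false) do |message|
  handle(message)  # hypothetical application-specific processing

  # Only mark the message once we know it has been fully handled, so a
  # crash mid-processing causes the message to be fetched again.
  consumer.mark_message_as_processed(message)
end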
Parameters:
- min_bytes (Integer) (defaults to: 1) — the minimum number of bytes to read before returning messages from each broker; if max_wait_time is reached, this is ignored.
- max_bytes (Integer) (defaults to: 10485760) — the maximum number of bytes to read before returning messages from each broker.
- max_wait_time (Integer, Float) (defaults to: 1) — the maximum duration of time to wait before returning messages from each broker, in seconds.
- automatically_mark_as_processed (Boolean) (defaults to: true) — whether to automatically mark a message as successfully processed when the block returns without an exception. Once marked successful, the offsets of processed messages can be committed to Kafka.

Yield Parameters:
- message (Kafka::FetchedMessage) — a message fetched from Kafka.

Returns:
- (nil)

Raises:
- (Kafka::ProcessingError) — if there was an error processing a message. The original exception will be returned by calling #cause on the ProcessingError instance.
# File 'lib/kafka/consumer.rb', line 215

def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
  @fetcher.configure(
    min_bytes: min_bytes,
    max_bytes: max_bytes,
    max_wait_time: max_wait_time,
  )

  consumer_loop do
    batches = fetch_batches

    batches.each do |batch|
      batch = @interceptors.call(batch)
      batch.messages.each do |message|
        notification = {
          topic: message.topic,
          partition: message.partition,
          offset: message.offset,
          offset_lag: batch.highwater_mark_offset - message.offset - 1,
          create_time: message.create_time,
          key: message.key,
          value: message.value,
          headers: message.headers
        }

        # Instrument an event immediately so that subscribers don't have to wait until
        # the block is completed.
        @instrumenter.instrument("start_process_message.consumer", notification)

        @instrumenter.instrument("process_message.consumer", notification) do
          begin
            yield message unless message.is_control_record
            @current_offsets[message.topic][message.partition] = message.offset
          rescue => e
            location = "#{message.topic}/#{message.partition} at offset #{message.offset}"
            backtrace = e.backtrace.join("\n")

            @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

            raise ProcessingError.new(message.topic, message.partition, message.offset)
          end
        end

        mark_message_as_processed(message) if automatically_mark_as_processed
        @offset_manager.commit_offsets_if_necessary

        trigger_heartbeat

        return if shutting_down?
      end

      # We've successfully processed a batch from the partition, so we can clear
      # the pause.
      pause_for(batch.topic, batch.partition).reset!
    end

    # We may not have received any messages, but it's still a good idea to
    # commit offsets if we've processed messages in the last set of batches.
    # This also ensures the offsets are retained if we haven't read any messages
    # since the offset retention period has elapsed.
    @offset_manager.commit_offsets_if_necessary
  end
end
#mark_message_as_processed(message) ⇒ Object
# File 'lib/kafka/consumer.rb', line 391

def mark_message_as_processed(message)
  @offset_manager.mark_as_processed(message.topic, message.partition, message.offset)
end
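Manual marking is mainly useful when automatic marking is disabled. A sketch of marking each message inside an each_batch loop so that a failure partway through a batch does not discard the progress already made; the process method is hypothetical.
consumer.each_batch(automatically_mark_as_processed: false) do |batch|
  batch.messages.each do |message|
    process(message)  # hypothetical per-message work

    # Record progress message by message rather than once per batch.
    consumer.mark_message_as_processed(message)
  end
end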
#pause(topic, partition, timeout: nil, max_timeout: nil, exponential_backoff: false) ⇒ nil
Pause processing of a specific topic partition.
When a specific message causes the processor code to fail, it can be a good idea to simply pause the partition until the error can be resolved, allowing the rest of the partitions to continue being processed.
If the timeout argument is passed, the partition will automatically be resumed when the timeout expires. If exponential_backoff is enabled, each subsequent pause will cause the timeout to double until a message from the partition has been successfully processed.
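Example
A sketch of pausing a failing partition from within the processing block so the other partitions keep flowing; the process method and the timeout values are illustrative.
consumer.each_message do |message|
  begin
    process(message)  # hypothetical processing step
  rescue
    # Back off this partition for 30 seconds, doubling the pause (up to
    # 10 minutes) each time it fails again before a successful message.
    consumer.pause(message.topic, message.partition, timeout: 30, max_timeout: 600, exponential_backoff: true)
  end
end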
Parameters:
- topic (String)
- partition (Integer)
- timeout (nil, Integer) (defaults to: nil) — the number of seconds to pause the partition for, or nil if the partition should not be automatically resumed.
- max_timeout (nil, Integer) (defaults to: nil) — the maximum number of seconds to pause for, or nil if no maximum should be enforced.
- exponential_backoff (Boolean) (defaults to: false) — whether to enable exponential backoff.

Returns:
- (nil)
# File 'lib/kafka/consumer.rb', line 152

def pause(topic, partition, timeout: nil, max_timeout: nil, exponential_backoff: false)
  if max_timeout && !exponential_backoff
    raise ArgumentError, "`max_timeout` only makes sense when `exponential_backoff` is enabled"
  end

  pause_for(topic, partition).pause!(
    timeout: timeout,
    max_timeout: max_timeout,
    exponential_backoff: exponential_backoff,
  )
end
#paused?(topic, partition) ⇒ Boolean
Whether the topic partition is currently paused.
Parameters:
- topic (String)
- partition (Integer)
Returns:
- (Boolean) — true if the partition is paused, false otherwise.
# File 'lib/kafka/consumer.rb', line 183

def paused?(topic, partition)
  pause = pause_for(topic, partition)
  pause.paused? && !pause.expired?
end
#resume(topic, partition) ⇒ nil
Resume processing of a topic partition.
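Example
A sketch of the pause/resume lifecycle; the topic name and partition number are illustrative.
consumer.pause("messages", 0)      # pause indefinitely
consumer.paused?("messages", 0)    # => true
consumer.resume("messages", 0)     # start fetching from the partition again
consumer.paused?("messages", 0)    # => false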
# File 'lib/kafka/consumer.rb', line 170

def resume(topic, partition)
  pause_for(topic, partition).resume!

  # During re-balancing we might have lost the paused partition. Check if partition is still in group before seek.
  seek_to_next(topic, partition) if @group.assigned_to?(topic, partition)
end
#seek(topic, partition, offset) ⇒ nil
Move the consumer’s position in a topic partition to the specified offset.
Note that this has to be done prior to calling #each_message or #each_batch and only has an effect if the consumer is assigned the partition. Typically, you will want to do this in every consumer group member in order to make sure that the member that’s assigned the partition knows where to start.
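Example
A sketch of rewinding a partition to a fixed offset before starting the consumer loop; the topic, partition, and offset are illustrative.
consumer.subscribe("messages")

# Every group member issues the seek; only the member actually assigned
# partition 0 will have its position moved.
consumer.seek("messages", 0, 42)

consumer.each_message do |message|
  puts message.offset
end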
Parameters:
- topic (String)
- partition (Integer)
- offset (Integer)
Returns:
- (nil)
# File 'lib/kafka/consumer.rb', line 383

def seek(topic, partition, offset)
  @offset_manager.seek_to(topic, partition, offset)
end
#stop ⇒ nil
Stop the consumer.
The consumer will finish any in-progress work and shut down.
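A common shutdown pattern (a sketch, not mandated by the library) is to call #stop from a signal handler so the processing loop exits cleanly.
consumer = kafka.consumer(group_id: "my-group")
consumer.subscribe("messages")

# Let SIGTERM (e.g. from a process supervisor) trigger a graceful
# shutdown; each_message returns once in-flight work is finished.
trap("TERM") { consumer.stop }

consumer.each_message do |message|
  puts message.value
end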
Returns:
- (nil)
# File 'lib/kafka/consumer.rb', line 128

def stop
  @running = false
  @fetcher.stop
end
#subscribe(topic_or_regex, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576) ⇒ nil
Subscribes the consumer to a topic.
Typically you either want to start reading messages from the very beginning of the topic's partitions or you simply want to wait for new messages to be written. In the former case, set start_from_beginning to true (the default); in the latter, set it to false. See the sketch below.
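Example
A sketch of the common subscription variants; the topic names and the regex are illustrative.
# Consume the topic from the very beginning the first time this group sees it:
consumer.subscribe("events", start_from_beginning: true)

# Only consume messages produced from now on:
consumer.subscribe("clickstream", start_from_beginning: false)

# Subscribe to every topic whose name matches the regex:
consumer.subscribe(/events\..*/)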
Parameters:
- topic_or_regex (String, Regexp) — subscribe to a single topic with a string, or to multiple topics matching a regex.
- default_offset (Symbol) (defaults to: nil) — whether to start from the beginning or the end of the topic's partitions. Deprecated.
- start_from_beginning (Boolean) (defaults to: true) — whether to start from the beginning of the topic or just subscribe to new messages being produced. This only applies when first consuming a topic partition – once the consumer has checkpointed its progress, it will always resume from the last checkpoint.
- max_bytes_per_partition (Integer) (defaults to: 1048576) — the maximum amount of data fetched from a single partition at a time.

Returns:
- (nil)
# File 'lib/kafka/consumer.rb', line 110

def subscribe(topic_or_regex, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576)
  default_offset ||= start_from_beginning ? :earliest : :latest

  @subscribed_topics[topic_or_regex] = {
    default_offset: default_offset,
    start_from_beginning: start_from_beginning,
    max_bytes_per_partition: max_bytes_per_partition
  }
  scan_for_subscribing

  nil
end
#trigger_heartbeat ⇒ Object Also known as: send_heartbeat_if_necessary
# File 'lib/kafka/consumer.rb', line 395

def trigger_heartbeat
  @heartbeat.trigger
end
#trigger_heartbeat! ⇒ Object Also known as: send_heartbeat
# File 'lib/kafka/consumer.rb', line 399

def trigger_heartbeat!
  @heartbeat.trigger!
end
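Judging by the aliases, #trigger_heartbeat sends a heartbeat only when one is due, while #trigger_heartbeat! always sends one. A sketch of keeping the group coordinator satisfied while a single message takes a long time to process; the per-line handling is hypothetical.
consumer.each_message do |message|
  message.value.each_line do |line|
    handle_line(line)  # hypothetical slow, per-line work

    # Assumed cheap to call in a tight loop: a heartbeat is only sent
    # when the heartbeat interval has elapsed.
    consumer.trigger_heartbeat
  end
end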