[フレーム]

Class: Kafka::Protocol::FetchRequest

Inherits:
Object
  • Object
  • Kafka::Protocol::FetchRequest
show all
Defined in:
lib/kafka/protocol/fetch_request.rb

Overview

A request to fetch messages from a given partition.

API Specification

FetchRequest => ReplicaId MaxWaitTime MinBytes MaxBytes IsolationLevel [TopicName [Partition FetchOffset MaxBytes]]
 ReplicaId => int32
 MaxWaitTime => int32
 MinBytes => int32
 MaxBytes => int32
 IsolationLevel => int8
 TopicName => string
 Partition => int32
 FetchOffset => int64
 MaxBytes => int32

Constant Summary collapse

ISOLATION_READ_UNCOMMITTED =
0
ISOLATION_READ_COMMITTED =
1

Instance Method Summary collapse

Constructor Details

#initialize(max_wait_time:, min_bytes:, max_bytes:, topics:) ⇒ FetchRequest

Returns a new instance of FetchRequest.

Parameters:

  • max_wait_time (Integer)
  • min_bytes (Integer)
  • topics (Hash)
28
29
30
31
32
33
34
# File 'lib/kafka/protocol/fetch_request.rb', line 28
def initialize(max_wait_time:, min_bytes:, max_bytes:, topics:)
 @replica_id = REPLICA_ID 
 @max_wait_time = max_wait_time
 @min_bytes = min_bytes
 @max_bytes = max_bytes
 @topics = topics
end

Instance Method Details

#api_keyObject

36
37
38
# File 'lib/kafka/protocol/fetch_request.rb', line 36
def api_key
 FETCH_API 
end

#api_versionObject

40
41
42
# File 'lib/kafka/protocol/fetch_request.rb', line 40
def api_version
 4
end

#encode(encoder) ⇒ Object

48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/kafka/protocol/fetch_request.rb', line 48
def encode(encoder)
 encoder.write_int32(@replica_id)
 encoder.write_int32(@max_wait_time)
 encoder.write_int32(@min_bytes)
 encoder.write_int32(@max_bytes)
 encoder.write_int8(ISOLATION_READ_COMMITTED )
 encoder.write_array(@topics) do |topic, partitions|
 encoder.write_string(topic)
 encoder.write_array(partitions) do |partition, config|
 fetch_offset = config.fetch(:fetch_offset)
 max_bytes = config.fetch(:max_bytes)
 encoder.write_int32(partition)
 encoder.write_int64(fetch_offset)
 encoder.write_int32(max_bytes)
 end
 end
end

#response_classObject

44
45
46
# File 'lib/kafka/protocol/fetch_request.rb', line 44
def response_class
 Protocol ::FetchResponse 
end

AltStyle によって変換されたページ (->オリジナル) /