[フレーム]

Class: Kafka::Protocol::FetchResponse

Inherits:
Object
  • Object
  • Kafka::Protocol::FetchResponse
show all
Defined in:
lib/kafka/protocol/fetch_response.rb

Overview

A response to a fetch request.

API Specification

FetchResponse => ThrottleTimeMS [TopicName [Partition ErrorCode HighwaterMarkOffset LastStableOffset [AbortedTransaction] Records]]
 ThrottleTimeMS => int32
 TopicName => string
 Partition => int32
 ErrorCode => int16
 HighwaterMarkOffset => int64
 LastStableOffset => int64
 MessageSetSize => int32
 AbortedTransaction => [
 ProducerId => int64
 FirstOffset => int64
 ]

Defined Under Namespace

Classes: AbortedTransaction , FetchedPartition , FetchedTopic

Constant Summary collapse

MAGIC_BYTE_OFFSET =
16
MAGIC_BYTE_LENGTH =
1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topics: [], throttle_time_ms: 0) ⇒ FetchResponse

Returns a new instance of FetchResponse.

64
65
66
67
# File 'lib/kafka/protocol/fetch_response.rb', line 64
def initialize(topics: [], throttle_time_ms: 0)
 @topics = topics
 @throttle_time_ms = throttle_time_ms
end

Instance Attribute Details

#topicsObject (readonly)

Returns the value of attribute topics.

62
63
64
# File 'lib/kafka/protocol/fetch_response.rb', line 62
def topics
 @topics
end

Class Method Details

.decode(decoder) ⇒ Object

69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/kafka/protocol/fetch_response.rb', line 69
def self.decode(decoder)
 throttle_time_ms = decoder.int32
 topics = decoder.array do
 topic_name = decoder.string
 partitions = decoder.array do
 partition = decoder.int32
 error_code = decoder.int16
 highwater_mark_offset = decoder.int64
 last_stable_offset = decoder.int64
 aborted_transactions = decoder.array do
 producer_id = decoder.int64
 first_offset = decoder.int64
 AbortedTransaction .new (
 producer_id: producer_id,
 first_offset: first_offset
 )
 end
 messages_raw = decoder.bytes
 messages = []
 if !messages_raw.nil? && !messages_raw.empty?
 messages_decoder = Decoder .from_string (messages_raw)
 magic_byte = messages_decoder.peek(MAGIC_BYTE_OFFSET , MAGIC_BYTE_LENGTH )[0].to_i
 if magic_byte == RecordBatch ::MAGIC_BYTE 
 until messages_decoder.eof?
 begin
 record_batch = RecordBatch .decode (messages_decoder)
 messages << record_batch
 rescue InsufficientDataMessage 
 if messages.length > 0
 break
 else
 raise
 end
 end
 end
 else
 message_set = MessageSet .decode (messages_decoder)
 messages << message_set
 end
 end
 FetchedPartition .new (
 partition: partition,
 error_code: error_code,
 highwater_mark_offset: highwater_mark_offset,
 last_stable_offset: last_stable_offset,
 aborted_transactions: aborted_transactions,
 messages: messages
 )
 end
 FetchedTopic .new (
 name: topic_name,
 partitions: partitions,
 )
 end
 new(topics: topics, throttle_time_ms: throttle_time_ms)
end

AltStyle によって変換されたページ (->オリジナル) /