[フレーム]

Class: Kafka::Protocol::JoinGroupRequest

Inherits:
Object
  • Object
  • Kafka::Protocol::JoinGroupRequest
show all
Defined in:
lib/kafka/protocol/join_group_request.rb

Constant Summary collapse

PROTOCOL_TYPE =
"consumer"

Instance Method Summary collapse

Constructor Details

#initialize(group_id:, session_timeout:, rebalance_timeout:, member_id:, topics: [], protocol_name:, user_data: nil) ⇒ JoinGroupRequest

Returns a new instance of JoinGroupRequest.

10
11
12
13
14
15
16
17
18
19
# File 'lib/kafka/protocol/join_group_request.rb', line 10
# Builds a JoinGroup request for one consumer group member.
#
# @param group_id [String] stored unchanged in @group_id
# @param session_timeout [Numeric] multiplied by 1000 because Kafka wants
#   milliseconds (presumably given in seconds -- confirm with callers)
# @param rebalance_timeout [Numeric] converted the same way as
#   session_timeout
# @param member_id [String, nil] a nil value is replaced by an empty string
# @param topics [Array] forwarded to ConsumerGroupProtocol.new
# @param protocol_name [Object] key under which the protocol metadata is
#   stored in @group_protocols
# @param user_data [Object, nil] forwarded to ConsumerGroupProtocol.new
def initialize(group_id:, session_timeout:, rebalance_timeout:, member_id:, topics: [], protocol_name:, user_data: nil)
  @group_id = group_id
  @session_timeout = session_timeout * 1000 # Kafka wants ms.
  @rebalance_timeout = rebalance_timeout * 1000 # Kafka wants ms.
  @member_id = member_id || ""
  @protocol_type = PROTOCOL_TYPE

  # No whitespace between `new` and its argument list: with a space the
  # parentheses are read as a grouped expression, not as call arguments.
  @group_protocols = {
    protocol_name => ConsumerGroupProtocol.new(topics: topics, user_data: user_data),
  }
end

Instance Method Details

#api_key ⇒ Object

21
22
23
# File 'lib/kafka/protocol/join_group_request.rb', line 21
# Returns the value of the JOIN_GROUP_API constant.
#
# NOTE(review): presumably the Kafka API key that identifies a JoinGroup
# request -- confirm against where JOIN_GROUP_API is defined.
#
# @return [Object] whatever JOIN_GROUP_API resolves to
def api_key
  JOIN_GROUP_API
end

#api_version ⇒ Object

25
26
27
# File 'lib/kafka/protocol/join_group_request.rb', line 25
# Returns the API version reported for this request.
#
# @return [Integer] always 1
def api_version
  1
end

#encode(encoder) ⇒ Object

33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/kafka/protocol/join_group_request.rb', line 33
# Writes this request's fields to the given encoder.
#
# The fields are written in a fixed order: group id, session timeout,
# rebalance timeout, member id, protocol type, and then the group protocols.
# Each group protocol is written as its name followed by the bytes produced
# by Encoder.encode_with for its metadata.
#
# @param encoder [#write_string, #write_int32, #write_array, #write_bytes]
#   the object that receives the writes
# @return [Object] whatever encoder.write_array returns
def encode(encoder)
  encoder.write_string(@group_id)
  encoder.write_int32(@session_timeout)
  encoder.write_int32(@rebalance_timeout)
  encoder.write_string(@member_id)
  encoder.write_string(@protocol_type)

  encoder.write_array(@group_protocols) do |protocol_name, protocol_metadata|
    encoded_metadata = Encoder.encode_with(protocol_metadata)

    encoder.write_string(protocol_name)
    encoder.write_bytes(encoded_metadata)
  end
end

#response_class ⇒ Object

29
30
31
# File 'lib/kafka/protocol/join_group_request.rb', line 29
# Returns the JoinGroupResponse constant.
#
# NOTE(review): presumably the class used to decode the broker's reply to
# this request -- confirm against the caller.
#
# @return [Object] whatever JoinGroupResponse resolves to
def response_class
  JoinGroupResponse
end

AltStyle によって変換されたページ (->オリジナル) /