[フレーム]

Class: Kafka::Protocol::ProduceRequest

Inherits:
Object
  • Object
  • Kafka::Protocol::ProduceRequest
show all
Defined in:
lib/kafka/protocol/produce_request.rb

Overview

A produce request sends a message set to the server.

API Specification

ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
 RequiredAcks => int16
 Timeout => int32
 Partition => int32
 MessageSetSize => int32
MessageSet => [Offset MessageSize Message]
 Offset => int64
 MessageSize => int32
Message => Crc MagicByte Attributes Key Value
 Crc => int32
 MagicByte => int8
 Attributes => int8
 Key => bytes
 Value => bytes

Constant Summary collapse

API_MIN_VERSION =
3

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(transactional_id: nil, required_acks:, timeout:, messages_for_topics:, compressor: nil) ⇒ ProduceRequest

Returns a new instance of ProduceRequest.

Parameters:

  • required_acks (Integer)
  • timeout (Integer)
  • messages_for_topics (Hash)
37
38
39
40
41
42
43
# File 'lib/kafka/protocol/produce_request.rb', line 37
# Builds a produce request from its settings; every argument is stored
# unchanged in the instance variable of the same name.
#
# @param transactional_id [Object, nil] optional; defaults to nil
# @param required_acks [Integer]
# @param timeout [Integer]
# @param messages_for_topics [Hash]
# @param compressor [Object, nil] optional; defaults to nil
def initialize(transactional_id: nil, required_acks:, timeout:, messages_for_topics:, compressor: nil)
  @transactional_id, @required_acks, @timeout = transactional_id, required_acks, timeout
  @messages_for_topics, @compressor = messages_for_topics, compressor
end

Instance Attribute Details

#compressor ⇒ Object (readonly)

Returns the value of attribute compressor.

32
33
34
# File 'lib/kafka/protocol/produce_request.rb', line 32
# Reader for the compressor supplied to the constructor.
#
# @return [Object, nil] the stored compressor; nil when none was given
def compressor
 @compressor
end

#messages_for_topics ⇒ Object (readonly)

Returns the value of attribute messages_for_topics.

32
33
34
# File 'lib/kafka/protocol/produce_request.rb', line 32
# Reader for the messages_for_topics value supplied to the constructor.
#
# @return [Hash] the stored collection (documented as a Hash on the constructor)
def messages_for_topics
 @messages_for_topics
end

#required_acks ⇒ Object (readonly)

Returns the value of attribute required_acks.

32
33
34
# File 'lib/kafka/protocol/produce_request.rb', line 32
# Reader for the required_acks value supplied to the constructor.
#
# @return [Integer] the stored value; 0 means no acknowledgements are required
def required_acks
 @required_acks
end

#timeout ⇒ Object (readonly)

Returns the value of attribute timeout.

32
33
34
# File 'lib/kafka/protocol/produce_request.rb', line 32
# Reader for the timeout value supplied to the constructor.
#
# @return [Integer] the stored timeout (unit not shown here — confirm against callers)
def timeout
 @timeout
end

#transactional_id ⇒ Object (readonly)

Returns the value of attribute transactional_id.

32
33
34
# File 'lib/kafka/protocol/produce_request.rb', line 32
# Reader for the transactional id supplied to the constructor.
#
# @return [Object, nil] the stored id; nil when none was given
def transactional_id
 @transactional_id
end

Instance Method Details

#api_key ⇒ Object

45
46
47
# File 'lib/kafka/protocol/produce_request.rb', line 45
# Identifies the API this request belongs to.
#
# @return [Object] the value of the PRODUCE_API constant
def api_key
  PRODUCE_API
end

#api_version ⇒ Object

49
50
51
# File 'lib/kafka/protocol/produce_request.rb', line 49
# Picks the API version to use for this request.
#
# The result is never lower than API_MIN_VERSION. When a compressor with a
# codec is present, the codec's produce_api_min_version is used if it is
# higher.
#
# The compressor is optional (the constructor defaults it to nil), so a
# missing compressor is treated the same as a compressor without a codec
# instead of raising NoMethodError.
#
# @return [Integer]
def api_version
  # Read the codec once; `compressor` itself may be nil.
  codec = compressor && compressor.codec
  return API_MIN_VERSION if codec.nil?

  [codec.produce_api_min_version, API_MIN_VERSION].max
end

#encode(encoder) ⇒ Object

65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/kafka/protocol/produce_request.rb', line 65
# Writes this request to the given encoder.
#
# Order written: transactional id (string), required acks (int16),
# timeout (int32), then an array of topics. Each topic entry is its name
# followed by an array of partitions; each partition entry is its number
# (int32) followed by the compressed record batch as bytes.
#
# @param encoder [#write_string, #write_int16, #write_int32, #write_array, #write_bytes]
# @return [Object] whatever the outer write_array call returns
def encode(encoder)
  encoder.write_string(@transactional_id)
  encoder.write_int16(@required_acks)
  encoder.write_int32(@timeout)

  encoder.write_array(@messages_for_topics) do |topic_name, batches_by_partition|
    encoder.write_string(topic_name)

    encoder.write_array(batches_by_partition) do |partition_id, batch|
      encoder.write_int32(partition_id)

      # fulfill_relative_data is called on the batch before it is compressed.
      batch.fulfill_relative_data
      encoder.write_bytes(compress(batch))
    end
  end
end

#requires_acks? ⇒ Boolean

Whether this request requires any acknowledgements at all. If no acknowledgements are required, the server will not send back a response at all.

Returns:

  • (Boolean)

    true if acknowledgements are required, false otherwise.

61
62
63
# File 'lib/kafka/protocol/produce_request.rb', line 61
# Tells whether this request asks for any acknowledgement at all.
#
# @return [Boolean] false only when required_acks is 0, true otherwise
def requires_acks?
  no_acks_requested = (@required_acks == 0)
  !no_acks_requested
end

#response_class ⇒ Object

53
54
55
# File 'lib/kafka/protocol/produce_request.rb', line 53
# Names the class that represents the response to this request.
#
# When no acknowledgements are required there is no response class, so nil
# is returned.
#
# Written as `Protocol::ProduceResponse` with no whitespace before `::`.
# With a space, Ruby treats `::ProduceResponse` as a top-level constant
# passed as an argument, not as a lookup inside Protocol.
#
# @return [Class, nil] Protocol::ProduceResponse, or nil when requires_acks? is false
def response_class
  requires_acks? ? Protocol::ProduceResponse : nil
end

AltStyle によって変換されたページ (->オリジナル) /