Class: Kafka::Protocol::ProduceRequest
- Inherits:
-
Object
- Object
- Kafka::Protocol::ProduceRequest
- Defined in:
- lib/kafka/protocol/produce_request.rb
Overview
A produce request sends a message set to the server.
API Specification
ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
RequiredAcks => int16
Timeout => int32
Partition => int32
MessageSetSize => int32
MessageSet => [Offset MessageSize Message]
Offset => int64
MessageSize => int32
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
Constant Summary collapse
- API_MIN_VERSION =
3
Instance Attribute Summary collapse
-
#compressor ⇒ Object
readonly
Returns the value of attribute compressor.
-
#messages_for_topics ⇒ Object
readonly
Returns the value of attribute messages_for_topics.
-
#required_acks ⇒ Object
readonly
Returns the value of attribute required_acks.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
-
#transactional_id ⇒ Object
readonly
Returns the value of attribute transactional_id.
Instance Method Summary collapse
- #api_key ⇒ Object
- #api_version ⇒ Object
- #encode(encoder) ⇒ Object
-
#initialize(transactional_id: nil, required_acks:, timeout:, messages_for_topics:, compressor: nil) ⇒ ProduceRequest
constructor
A new instance of ProduceRequest.
-
#requires_acks? ⇒ Boolean
Whether this request requires any acknowledgements at all.
- #response_class ⇒ Object
Constructor Details
#initialize(transactional_id: nil, required_acks:, timeout:, messages_for_topics:, compressor: nil) ⇒ ProduceRequest
Returns a new instance of ProduceRequest.
Parameters:
- required_acks (Integer)
- timeout (Integer)
- messages_for_topics (Hash)
37 38 39 40 41 42 43
# File 'lib/kafka/protocol/produce_request.rb', line 37 def initialize(transactional_id: nil, required_acks:, timeout:, messages_for_topics:, compressor: nil) @transactional_id = transactional_id @required_acks = required_acks @timeout = timeout @messages_for_topics = messages_for_topics @compressor = compressor end
Instance Attribute Details
#compressor ⇒ Object (readonly)
Returns the value of attribute compressor.
32 33 34
# File 'lib/kafka/protocol/produce_request.rb', line 32 def compressor @compressor end
#messages_for_topics ⇒ Object (readonly)
Returns the value of attribute messages_for_topics.
32 33 34
# File 'lib/kafka/protocol/produce_request.rb', line 32 def messages_for_topics @messages_for_topics end
#required_acks ⇒ Object (readonly)
Returns the value of attribute required_acks.
32 33 34
# File 'lib/kafka/protocol/produce_request.rb', line 32 def required_acks @required_acks end
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
32 33 34
# File 'lib/kafka/protocol/produce_request.rb', line 32 def timeout @timeout end
#transactional_id ⇒ Object (readonly)
Returns the value of attribute transactional_id.
32 33 34
# File 'lib/kafka/protocol/produce_request.rb', line 32 def transactional_id @transactional_id end
Instance Method Details
#api_key ⇒ Object
45 46 47
# File 'lib/kafka/protocol/produce_request.rb', line 45 def api_key PRODUCE_API end
#api_version ⇒ Object
49 50 51
# File 'lib/kafka/protocol/produce_request.rb', line 49 def api_version compressor.codec.nil? ? API_MIN_VERSION : [compressor.codec.produce_api_min_version, API_MIN_VERSION ].max end
#encode(encoder) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
# File 'lib/kafka/protocol/produce_request.rb', line 65 def encode(encoder) encoder.write_string(@transactional_id) encoder.write_int16(@required_acks) encoder.write_int32(@timeout) encoder.write_array(@messages_for_topics) do |topic, messages_for_partition| encoder.write_string(topic) encoder.write_array(messages_for_partition) do |partition, record_batch| encoder.write_int32(partition) record_batch.fulfill_relative_data encoded_record_batch = compress(record_batch) encoder.write_bytes(encoded_record_batch) end end end
#requires_acks? ⇒ Boolean
Whether this request requires any acknowledgements at all. If no acknowledgements are required, the server will not send back a response at all.
Returns:
-
(Boolean)
—
true if acknowledgements are required, false otherwise.
61 62 63
# File 'lib/kafka/protocol/produce_request.rb', line 61 def requires_acks? @required_acks != 0 end
#response_class ⇒ Object
53 54 55
# File 'lib/kafka/protocol/produce_request.rb', line 53 def response_class requires_acks? ? Protocol ::ProduceResponse : nil end