[フレーム]

Class: Kafka::Protocol::Message

Inherits:
Object
  • Object
  • Kafka::Protocol::Message
show all
Defined in:
lib/kafka/protocol/message.rb

Overview

API Specification

Message => Crc MagicByte Attributes Timestamp Key Value
 Crc => int32
 MagicByte => int8
 Attributes => int8
 Timestamp => int64, in ms
 Key => bytes
 Value => bytes

Constant Summary collapse

MAGIC_BYTE =
1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1) ⇒ Message

Returns a new instance of Message.

26
27
28
29
30
31
32
33
34
# File 'lib/kafka/protocol/message.rb', line 26
# Builds a message from its parts and caches its payload size.
#
# @param value the message value (required)
# @param key the message key, or nil
# @param create_time the creation time; defaults to the time of the call
# @param codec_id the compression codec id; 0 is treated as "not compressed"
#   by #compressed?
# @param offset the message offset; defaults to -1
def initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1)
  @key, @value = key, value
  @codec_id, @offset, @create_time = codec_id, offset, create_time

  # nil.to_s is "", so a missing key or value contributes zero bytes.
  @bytesize = [@key, @value].sum { |part| part.to_s.bytesize }
end

Instance Attribute Details

#bytesize ⇒ Object (readonly)

Returns the value of attribute bytesize.

24
25
26
# File 'lib/kafka/protocol/message.rb', line 24
# Reader for the cached payload size stored in @bytesize.
#
# @return the value of @bytesize
def bytesize
  @bytesize
end

#codec_id ⇒ Object (readonly)

Returns the value of attribute codec_id.

22
23
24
# File 'lib/kafka/protocol/message.rb', line 22
# Reader for the compression codec id stored in @codec_id.
#
# @return the value of @codec_id
def codec_id
  @codec_id
end

#create_time ⇒ Object (readonly)

Returns the value of attribute create_time.

24
25
26
# File 'lib/kafka/protocol/message.rb', line 24
# Reader for the creation time stored in @create_time.
#
# @return the value of @create_time
def create_time
  @create_time
end

#key ⇒ Object (readonly)

Returns the value of attribute key.

22
23
24
# File 'lib/kafka/protocol/message.rb', line 22
# Reader for the message key stored in @key.
#
# @return the value of @key
def key
  @key
end

#offset ⇒ Object (readonly)

Returns the value of attribute offset.

22
23
24
# File 'lib/kafka/protocol/message.rb', line 22
# Reader for the message offset stored in @offset.
#
# @return the value of @offset
def offset
  @offset
end

#value ⇒ Object (readonly)

Returns the value of attribute value.

22
23
24
# File 'lib/kafka/protocol/message.rb', line 22
# Reader for the message value stored in @value.
#
# @return the value of @value
def value
  @value
end

Class Method Details

.decode(decoder) ⇒ Object

66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/kafka/protocol/message.rb', line 66
# Decodes one message entry from +decoder+.
#
# Reads an int64 offset followed by a length-prefixed byte string, then
# parses that byte string as: crc (int32), magic byte (int8), attributes
# (int8), an optional timestamp (int64, milliseconds), key and value.
#
# @param decoder an object responding to #int64 and #bytes
# @return the result of `new(...)` built from the decoded fields
# @raise [Kafka::Error] if the magic byte is neither 0 nor 1
def self.decode(decoder)
  offset = decoder.int64
  message_decoder = Decoder.from_string(decoder.bytes)

  _crc = message_decoder.int32
  magic_byte = message_decoder.int8
  attributes = message_decoder.int8

  # The magic byte indicates the message format version. There are situations
  # where an old message format can be returned from a newer version of Kafka,
  # because old messages are not necessarily rewritten on upgrades.
  case magic_byte
  when 0
    # No timestamp in the pre-0.10 message format.
    timestamp = nil
  when 1
    timestamp = message_decoder.int64

    # If the timestamp is set to zero, it's because the message has been upgraded
    # from the Kafka 0.9 disk format to the Kafka 0.10 format. The former didn't
    # have a timestamp attribute, so we'll just set the timestamp to nil.
    timestamp = nil if timestamp.zero?
  else
    # NOTE: there must be no space before `::` here. `Kafka ::Error` is parsed
    # as the method call `Kafka(::Error, ...)` and raises NoMethodError instead.
    raise Kafka::Error, "Invalid magic byte: #{magic_byte}"
  end

  key = message_decoder.bytes
  value = message_decoder.bytes

  # The codec id is encoded in the three least significant bits of the
  # attributes.
  codec_id = attributes & 0b111

  # The timestamp will be nil if the message was written in the Kafka 0.9 log format.
  # Otherwise it is in milliseconds, so convert to (fractional) seconds.
  create_time = timestamp && Time.at(timestamp / 1000.0)

  new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: create_time)
end

Instance Method Details

#==(other) ⇒ Object

43
44
45
46
47
48
# File 'lib/kafka/protocol/message.rb', line 43
# Compares this message with +other+ by key, value, codec id and offset.
# The creation time and cached byte size are not part of the comparison.
#
# @param other an object responding to #key, #value, #codec_id and #offset
# @return the result of the first failing comparison, or of the offset
#   comparison when all earlier ones succeed
def ==(other)
  same_payload = @key == other.key && @value == other.value
  same_payload && @codec_id == other.codec_id && @offset == other.offset
end

#compressed? ⇒ Boolean

Returns:

  • (Boolean)
50
51
52
# File 'lib/kafka/protocol/message.rb', line 50
# Whether this message carries a compression codec.
#
# @return [Boolean] true when @codec_id is anything other than 0
def compressed?
  @codec_id != 0
end

#decompress ⇒ Array<Kafka::Protocol::Message>

Returns:

55
56
57
58
59
60
61
62
63
64
# File 'lib/kafka/protocol/message.rb', line 55
# Unpacks the messages nested inside this message's compressed value.
#
# Looks up the codec for @codec_id, decompresses #value with it, decodes the
# result as a MessageSet, and passes that set's messages through
# correct_offsets.
#
# NOTE(review): an earlier comment here said the first 20 bytes had to be cut
# out, but no slicing happens in this method - the whole value is passed to
# the codec. Confirm whether that comment was simply stale.
#
# @return the result of correct_offsets for the decoded messages
def decompress
  inflated = Compression.find_codec_by_id(@codec_id).decompress(value)
  nested_set = MessageSet.decode(Decoder.from_string(inflated))

  correct_offsets(nested_set.messages)
end

#encode(encoder) ⇒ Object

36
37
38
39
40
41
# File 'lib/kafka/protocol/message.rb', line 36
# Writes this message to +encoder+ as an int64 offset followed by the
# length-prefixed bytes produced by encode_with_crc.
#
# The payload is built before anything is written, so a failure while
# building it leaves the encoder untouched.
#
# @param encoder an object responding to #write_int64 and #write_bytes
# @return whatever encoder.write_bytes returns
def encode(encoder)
  payload = encode_with_crc

  encoder.write_int64(offset)
  encoder.write_bytes(payload)
end

#headers ⇒ Object

110
111
112
# File 'lib/kafka/protocol/message.rb', line 110
# Always returns a new, empty Hash.
# NOTE(review): presumably this message format carries no headers and the
# method exists so callers can treat it like newer record types - confirm.
#
# @return [Hash] an empty hash
def headers
  {}
end

#is_control_record ⇒ Object

Ensure the backward compatibility of Message format from Kafka 0.11.x

106
107
108
# File 'lib/kafka/protocol/message.rb', line 106
# Always false. Provided for backward compatibility with the message format
# from Kafka 0.11.x.
#
# @return [false]
def is_control_record
  false
end

AltStyle によって変換されたページ (->オリジナル) /