Module: Kafka::Protocol
- Defined in:
- lib/kafka/protocol.rb,
lib/kafka/protocol/record.rb,
lib/kafka/protocol/decoder.rb,
lib/kafka/protocol/encoder.rb,
lib/kafka/protocol/message.rb,
lib/kafka/protocol/message_set.rb,
lib/kafka/protocol/record_batch.rb,
lib/kafka/protocol/fetch_request.rb,
lib/kafka/protocol/fetch_response.rb,
lib/kafka/protocol/end_txn_request.rb,
lib/kafka/protocol/produce_request.rb,
lib/kafka/protocol/request_message.rb,
lib/kafka/protocol/end_txn_response.rb,
lib/kafka/protocol/metadata_request.rb,
lib/kafka/protocol/produce_response.rb,
lib/kafka/protocol/heartbeat_request.rb,
lib/kafka/protocol/member_assignment.rb,
lib/kafka/protocol/metadata_response.rb,
lib/kafka/protocol/heartbeat_response.rb,
lib/kafka/protocol/join_group_request.rb,
lib/kafka/protocol/sync_group_request.rb,
lib/kafka/protocol/join_group_response.rb,
lib/kafka/protocol/leave_group_request.rb,
lib/kafka/protocol/list_groups_request.rb,
lib/kafka/protocol/list_offset_request.rb,
lib/kafka/protocol/sync_group_response.rb,
lib/kafka/protocol/api_versions_request.rb,
lib/kafka/protocol/leave_group_response.rb,
lib/kafka/protocol/list_groups_response.rb,
lib/kafka/protocol/list_offset_response.rb,
lib/kafka/protocol/offset_fetch_request.rb,
lib/kafka/protocol/alter_configs_request.rb,
lib/kafka/protocol/api_versions_response.rb,
lib/kafka/protocol/create_topics_request.rb,
lib/kafka/protocol/delete_topics_request.rb,
lib/kafka/protocol/offset_commit_request.rb,
lib/kafka/protocol/offset_fetch_response.rb,
lib/kafka/protocol/alter_configs_response.rb,
lib/kafka/protocol/create_topics_response.rb,
lib/kafka/protocol/delete_topics_response.rb,
lib/kafka/protocol/offset_commit_response.rb,
lib/kafka/protocol/sasl_handshake_request.rb,
lib/kafka/protocol/consumer_group_protocol.rb,
lib/kafka/protocol/describe_groups_request.rb,
lib/kafka/protocol/sasl_handshake_response.rb,
lib/kafka/protocol/describe_configs_request.rb,
lib/kafka/protocol/describe_groups_response.rb,
lib/kafka/protocol/find_coordinator_request.rb,
lib/kafka/protocol/init_producer_id_request.rb,
lib/kafka/protocol/create_partitions_request.rb,
lib/kafka/protocol/describe_configs_response.rb,
lib/kafka/protocol/find_coordinator_response.rb,
lib/kafka/protocol/init_producer_id_response.rb,
lib/kafka/protocol/txn_offset_commit_request.rb,
lib/kafka/protocol/add_offsets_to_txn_request.rb,
lib/kafka/protocol/create_partitions_response.rb,
lib/kafka/protocol/txn_offset_commit_response.rb,
lib/kafka/protocol/add_offsets_to_txn_response.rb,
lib/kafka/protocol/add_partitions_to_txn_request.rb,
lib/kafka/protocol/add_partitions_to_txn_response.rb
Overview
The protocol layer of the library.
The Kafka protocol (https://kafka.apache.org/protocol) defines a set of API requests, each with a well-known numeric API key, as well as a set of error codes with specific meanings.
This module, and the classes contained in it, implement the client side of the protocol.
Defined Under Namespace
Classes: AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, AlterConfigsRequest, AlterConfigsResponse, ApiVersionsRequest, ApiVersionsResponse, ConsumerGroupProtocol, CreatePartitionsRequest, CreatePartitionsResponse, CreateTopicsRequest, CreateTopicsResponse, Decoder, DeleteTopicsRequest, DeleteTopicsResponse, DescribeConfigsRequest, DescribeConfigsResponse, DescribeGroupsRequest, DescribeGroupsResponse, Encoder, EndTxnRequest, EndTxnResposne, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIDRequest, InitProducerIDResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MemberAssignment, Message, MessageSet, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, Record, RecordBatch, RequestMessage, SaslHandshakeRequest, SaslHandshakeResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse
Constant Summary
- REPLICA_ID =
The replica id of non-brokers is always -1.
-1
- PRODUCE_API =
0
- FETCH_API =
1
- LIST_OFFSET_API =
2
- TOPIC_METADATA_API =
3
- OFFSET_COMMIT_API =
8
- OFFSET_FETCH_API =
9
- FIND_COORDINATOR_API =
10
- JOIN_GROUP_API =
11
- HEARTBEAT_API =
12
- LEAVE_GROUP_API =
13
- SYNC_GROUP_API =
14
- DESCRIBE_GROUPS_API =
15
- LIST_GROUPS_API =
16
- SASL_HANDSHAKE_API =
17
- API_VERSIONS_API =
18
- CREATE_TOPICS_API =
19
- DELETE_TOPICS_API =
20
- INIT_PRODUCER_ID_API =
22
- ADD_PARTITIONS_TO_TXN_API =
24
- ADD_OFFSETS_TO_TXN_API =
25
- END_TXN_API =
26
- TXN_OFFSET_COMMIT_API =
28
- DESCRIBE_CONFIGS_API =
32
- ALTER_CONFIGS_API =
33
- CREATE_PARTITIONS_API =
37
- APIS =
A mapping from numeric API keys to symbolic API names.
{
  PRODUCE_API => :produce,
  FETCH_API => :fetch,
  LIST_OFFSET_API => :list_offset,
  TOPIC_METADATA_API => :topic_metadata,
  OFFSET_COMMIT_API => :offset_commit,
  OFFSET_FETCH_API => :offset_fetch,
  FIND_COORDINATOR_API => :find_coordinator,
  JOIN_GROUP_API => :join_group,
  HEARTBEAT_API => :heartbeat,
  LEAVE_GROUP_API => :leave_group,
  SYNC_GROUP_API => :sync_group,
  SASL_HANDSHAKE_API => :sasl_handshake,
  API_VERSIONS_API => :api_versions,
  CREATE_TOPICS_API => :create_topics,
  DELETE_TOPICS_API => :delete_topics,
  INIT_PRODUCER_ID_API => :init_producer_id_api,
  ADD_PARTITIONS_TO_TXN_API => :add_partitions_to_txn_api,
  ADD_OFFSETS_TO_TXN_API => :add_offsets_to_txn_api,
  END_TXN_API => :end_txn_api,
  TXN_OFFSET_COMMIT_API => :txn_offset_commit_api,
  DESCRIBE_CONFIGS_API => :describe_configs_api,
  CREATE_PARTITIONS_API => :create_partitions
}
- ERRORS =
A mapping from numeric error codes to exception classes.
{
  -1 => UnknownError,
  1 => OffsetOutOfRange,
  2 => CorruptMessage,
  3 => UnknownTopicOrPartition,
  4 => InvalidMessageSize,
  5 => LeaderNotAvailable,
  6 => NotLeaderForPartition,
  7 => RequestTimedOut,
  8 => BrokerNotAvailable,
  9 => ReplicaNotAvailable,
  10 => MessageSizeTooLarge,
  11 => StaleControllerEpoch,
  12 => OffsetMetadataTooLarge,
  13 => NetworkException,
  14 => CoordinatorLoadInProgress,
  15 => CoordinatorNotAvailable,
  16 => NotCoordinatorForGroup,
  17 => InvalidTopic,
  18 => RecordListTooLarge,
  19 => NotEnoughReplicas,
  20 => NotEnoughReplicasAfterAppend,
  21 => InvalidRequiredAcks,
  22 => IllegalGeneration,
  23 => InconsistentGroupProtocol,
  24 => InvalidGroupId,
  25 => UnknownMemberId,
  26 => InvalidSessionTimeout,
  27 => RebalanceInProgress,
  28 => InvalidCommitOffsetSize,
  29 => TopicAuthorizationFailed,
  30 => GroupAuthorizationFailed,
  31 => ClusterAuthorizationFailed,
  32 => InvalidTimestamp,
  33 => UnsupportedSaslMechanism,
  34 => InvalidSaslState,
  35 => UnsupportedVersion,
  36 => TopicAlreadyExists,
  37 => InvalidPartitions,
  38 => InvalidReplicationFactor,
  39 => InvalidReplicaAssignment,
  40 => InvalidConfig,
  41 => NotController,
  42 => InvalidRequest,
  43 => UnsupportedForMessageFormat,
  44 => PolicyViolation,
  45 => OutOfOrderSequenceNumberError,
  46 => DuplicateSequenceNumberError,
  47 => InvalidProducerEpochError,
  48 => InvalidTxnStateError,
  49 => InvalidProducerIDMappingError,
  50 => InvalidTransactionTimeoutError,
  51 => ConcurrentTransactionError,
  52 => TransactionCoordinatorFencedError
}
- RESOURCE_TYPE_UNKNOWN =
Integer codes for resource types; RESOURCE_TYPES maps each code to its symbolic name. See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
0
- RESOURCE_TYPE_ANY =
1
- RESOURCE_TYPE_TOPIC =
2
- RESOURCE_TYPE_GROUP =
3
- RESOURCE_TYPE_CLUSTER =
4
- RESOURCE_TYPE_TRANSACTIONAL_ID =
5
- RESOURCE_TYPE_DELEGATION_TOKEN =
6
- RESOURCE_TYPES =
{
  RESOURCE_TYPE_UNKNOWN => :unknown,
  RESOURCE_TYPE_ANY => :any,
  RESOURCE_TYPE_TOPIC => :topic,
  RESOURCE_TYPE_GROUP => :group,
  RESOURCE_TYPE_CLUSTER => :cluster,
  RESOURCE_TYPE_TRANSACTIONAL_ID => :transactional_id,
  RESOURCE_TYPE_DELEGATION_TOKEN => :delegation_token,
}
- COORDINATOR_TYPE_GROUP =
Coordinator types. Since Kafka 0.11.0, there are two types of coordinators: Group and Transaction.
0
- COORDINATOR_TYPE_TRANSACTION =
1
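The RESOURCE_TYPES mapping listed above can be read in both directions. The following is a minimal sketch, assuming the gem is not loaded: `ResourceSketch` is a hypothetical stand-in module that re-declares only three of the constants, so it is an illustration of the lookup pattern rather than the library's own code.

```ruby
# Stand-in for the resource type constants; ResourceSketch is a hypothetical
# name used only so this example runs without the gem installed.
module ResourceSketch
  RESOURCE_TYPE_UNKNOWN = 0
  RESOURCE_TYPE_TOPIC = 2
  RESOURCE_TYPE_GROUP = 3

  # Subset of the RESOURCE_TYPES mapping from the constant summary.
  RESOURCE_TYPES = {
    RESOURCE_TYPE_UNKNOWN => :unknown,
    RESOURCE_TYPE_TOPIC => :topic,
    RESOURCE_TYPE_GROUP => :group,
  }.freeze
end

# Integer code to symbol: a plain hash lookup (nil for unmapped codes).
topic_symbol = ResourceSketch::RESOURCE_TYPES[ResourceSketch::RESOURCE_TYPE_TOPIC]

# Symbol to integer code: Hash#key returns the first key holding that value.
group_code = ResourceSketch::RESOURCE_TYPES.key(:group)

# A code outside the mapping has no symbol.
unmapped = ResourceSketch::RESOURCE_TYPES[99]
```

With the real module, the same lookups would presumably be written against `Kafka::Protocol::RESOURCE_TYPES`.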
Class Method Summary
-
.api_name(api_key) ⇒ Symbol
Returns the symbolic name for an API key.
-
.handle_error(error_code, error_message = nil) ⇒ nil
Handles an error code by either doing nothing (if there was no error) or by raising an appropriate exception.
Class Method Details
.api_name(api_key) ⇒ Symbol
Returns the symbolic name for an API key.
Parameters:
- api_key (Integer)
Returns:
- (Symbol)
# File 'lib/kafka/protocol.rb', line 170
def self.api_name(api_key)
  APIS.fetch(api_key, :unknown)
end
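As a rough illustration of this lookup, the sketch below re-declares a small subset of the constants in a stand-in module so that it runs without the gem installed. `ProtocolSketch` is a hypothetical name, not part of the library; only the `APIS.fetch(api_key, :unknown)` call is taken from the source shown above.

```ruby
# Stand-in for Kafka::Protocol; ProtocolSketch is a hypothetical name used
# only so this example runs without the gem installed.
module ProtocolSketch
  PRODUCE_API = 0
  FETCH_API = 1
  LIST_GROUPS_API = 16

  # Subset of the APIS mapping. LIST_GROUPS_API is left out on purpose,
  # because the APIS listing in the constant summary does not include it.
  APIS = {
    PRODUCE_API => :produce,
    FETCH_API => :fetch,
  }.freeze

  # Same lookup as the documented method: unmapped keys yield :unknown.
  def self.api_name(api_key)
    APIS.fetch(api_key, :unknown)
  end
end

produce_name = ProtocolSketch.api_name(ProtocolSketch::PRODUCE_API)
unmapped_name = ProtocolSketch.api_name(ProtocolSketch::LIST_GROUPS_API)
missing_name = ProtocolSketch.api_name(999)
```

Because the fallback is `:unknown`, a key that is defined as a constant but absent from `APIS` is reported the same way as a key that does not exist at all.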
.handle_error(error_code, error_message = nil) ⇒ nil
Handles an error code by either doing nothing (if there was no error) or by raising an appropriate exception.
# File 'lib/kafka/protocol.rb', line 156
def self.handle_error(error_code, error_message = nil)
  if error_code == 0
    # No errors, yay!
  elsif error = ERRORS[error_code]
    raise error, error_message
  else
    raise UnknownError, "Unknown error with code #{error_code}#{error_message}"
  end
end
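One way a caller might react to the exceptions raised here is sketched below. Everything under `ErrorSketch` is a hypothetical stand-in (including the shared `Error` base class and the `classify` helper), declared locally so the example runs without the gem; it follows the same three-way control flow as the source above, with a simplified message for unknown codes.

```ruby
# Stand-in for Kafka::Protocol and its error classes; every name under
# ErrorSketch is hypothetical and exists only to keep the example runnable.
module ErrorSketch
  class Error < StandardError; end
  class UnknownError < Error; end
  class OffsetOutOfRange < Error; end
  class LeaderNotAvailable < Error; end

  # Subset of the ERRORS mapping from the constant summary.
  ERRORS = {
    -1 => UnknownError,
    1 => OffsetOutOfRange,
    5 => LeaderNotAvailable,
  }.freeze

  # Code 0 is a no-op, mapped codes raise their class, and anything else
  # raises UnknownError.
  def self.handle_error(error_code, error_message = nil)
    return nil if error_code == 0

    error = ERRORS[error_code]
    raise error, error_message if error

    raise UnknownError, "Unknown error with code #{error_code}"
  end
end

# Hypothetical helper: returns a short label for what happened.
def classify(error_code)
  ErrorSketch.handle_error(error_code, "reported by broker")
  :ok
rescue ErrorSketch::LeaderNotAvailable
  # A caller could choose to retry on this class of error.
  :retry_later
rescue ErrorSketch::Error => e
  e.class.name.split("::").last.to_sym
end

results = [0, 5, 1, 999].map { |code| classify(code) }
```

Rescuing a specific class first and a broader base class second lets a caller single out the errors it wants to treat specially while still catching the rest.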