This case describes an issue related to "auto.commit.enable": "false" and scalability: the first Kafka REST Proxy (instance 1) locks the second Kafka REST Proxy (instance 2).
                    +---------------------+
                    |    CURL produce     |
                    |   topic: jsontest   |
                    +----------+----------+
                               | [ok] produce 10 records
                               |
+-------------------+          |          +-------------------+
|  CURL consumer 1  |          |          |  CURL consumer 2  |
|                   |          |          |                   |
+---------+---------+          |          +---------+---------+
 [ok] create consumer          |           [ok] create consumer
 [ok] subscribe                |           [ok] subscribe
 [ok] consume records          |           [hung] consume records
          |                    |                    |
   +------v------+             |             +------v------+
   | Kafka REST  <-------------+             | Kafka REST  |
   | port:18082  |                           | port:28082  |
   +------+------+                           +------+------+
          |                                         |
          |                                         |
+---------v-----------------------------------------v---------+
|                            Kafka                             |
|                          port:9092                           |
+----------------+---------------------------------------------+
                 |
+----------------v---------------------------------------------+
|                          Zookeeper                           |
|                          port:2181                           |
+--------------------------------------------------------------+
- 1/ Start services

docker-compose.yml:

version: "3.5"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:5.0.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  kafka-rest-1:
    image: confluentinc/cp-kafka-rest:5.0.0
    depends_on:
      - kafka
    ports:
      - 18082:8082
    environment:
      KAFKA_REST_ID: "1"
      KAFKA_REST_HOST_NAME: kafka-rest-1
      KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:9092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_PRODUCER_THREADS: "10"
  kafka-rest-2:
    image: confluentinc/cp-kafka-rest:5.0.0
    depends_on:
      - kafka
    ports:
      - 28082:8082
    environment:
      KAFKA_REST_ID: "2"
      KAFKA_REST_HOST_NAME: kafka-rest-2
      KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:9092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_PRODUCER_THREADS: "10"
- Start services:

docker-compose up
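To be sure everything came up, I check the containers (a quick sanity check; the service names come from the compose file above):

# all four services (zookeeper, kafka, kafka-rest-1, kafka-rest-2) should be "Up"
docker-compose ps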
- 2/ Create a topic with partitions

Create the topic with 10 partitions:

docker-compose exec kafka bash -c "kafka-topics --zookeeper zookeeper:2181 --topic jsontest --create --partitions 10 --replication-factor 1"
- Describe it to be sure:

docker-compose exec kafka bash -c "kafka-topics --zookeeper zookeeper:2181 --topic jsontest --describe"
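The same can be cross-checked through the REST proxy itself with the standard GET /topics endpoint (a sketch; it should report 10 partitions for jsontest):

curl -H "Accept: application/vnd.kafka.v2+json" http://localhost:18082/topics/jsontest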
- 3/ Produce records

Produce 10 simple records, {v: 0} ... {v: 9}, via the first Kafka REST Proxy (port 18082):

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
  -H "Accept: application/vnd.kafka.v2+json" \
  --data '{"records":[{"value":{"v":"0"}}, {"value":{"v":"1"}}, {"value":{"v":"2"}}, {"value":{"v":"3"}}, {"value":{"v":"4"}}, {"value":{"v":"5"}}, {"value":{"v":"6"}}, {"value":{"v":"7"}}, {"value":{"v":"8"}}, {"value":{"v":"9"}}]}' \
  "http://localhost:18082/topics/jsontest"
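For reference, the produce response contains one entry per record in its offsets array; the exact partition/offset values will vary with the assignment, but the shape is roughly:

{"offsets":[{"partition":4,"offset":0,"error_code":null,"error":null}, ...],"key_schema_id":null,"value_schema_id":null}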
- 4/ Create CURL consumer 1

Create a consumer instance and subscribe it to the jsontest topic on Kafka REST 1 (port 18082):

# create consumer 1 with "auto.commit.enable": "false"
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"name": "my_consumer_instance_1", "auto.commit.enable": "false", "format": "json", "auto.offset.reset": "earliest"}' \
  http://localhost:18082/consumers/my_json_consumer
printf "\n"

# subscribe
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"topics":["jsontest"]}' \
  http://localhost:18082/consumers/my_json_consumer/instances/my_consumer_instance_1/subscription
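Consumer creation returns the instance id and a base URI, roughly the following (assuming the base_uri host comes from KAFKA_REST_HOST_NAME set above):

{"instance_id":"my_consumer_instance_1","base_uri":"http://kafka-rest-1:8082/consumers/my_json_consumer/instances/my_consumer_instance_1"}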
- 5/ Consumer 1 reads records

Consume from Kafka REST 1 (port 18082):

# consume 1
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
  http://localhost:18082/consumers/my_json_consumer/instances/my_consumer_instance_1/records?max_bytes=10
- The results look like:
[{"topic":"jsontest","key":null,"value":{"v":"3"},"partition":4,"offset":0}]
[{"topic":"jsontest","key":null,"value":{"v":"2"},"partition":5,"offset":0}]
[{"topic":"jsontest","key":null,"value":{"v":"8"},"partition":6,"offset":0}]
- Or it could read multiple records if max_bytes=20:

# messages from multiple partitions
[{"topic":"jsontest","key":null,"value":{"v":"3"},"partition":4,"offset":0},{"topic":"jsontest","key":null,"value":{"v":"2"},"partition":5,"offset":0}]
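A side note: the records call also accepts a timeout query parameter (the maximum number of milliseconds the proxy spends fetching), which bounds how long the request blocks, e.g.:

# return within ~3 seconds even if little data is available
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
  "http://localhost:18082/consumers/my_json_consumer/instances/my_consumer_instance_1/records?timeout=3000&max_bytes=20"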
- What do you think: is it OK that one consumer reads from multiple partitions at once when we use "auto.commit.enable": "false"? It seems this could be the issue.
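For context: with "auto.commit.enable": "false" I would expect offsets to be committed explicitly via the v2 offsets endpoint; a minimal sketch (an empty body commits all records returned by the instance's previous fetches):

# commit offsets for everything consumer 1 has fetched so far
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  http://localhost:18082/consumers/my_json_consumer/instances/my_consumer_instance_1/offsets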
- 6/ Create CURL consumer 2

Create a consumer instance and subscribe it to the jsontest topic on Kafka REST 2 (port 28082):
# create consumer 2 with "auto.commit.enable": "false"
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"name": "my_consumer_instance_2", "auto.commit.enable": "false", "format": "json", "auto.offset.reset": "earliest"}' \
  http://localhost:28082/consumers/my_json_consumer
printf "\n"

# subscribe
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"topics":["jsontest"]}' \
  http://localhost:28082/consumers/my_json_consumer/instances/my_consumer_instance_2/subscription
- 7/ Consumer 2 DOES NOT read records
- It just hangs and does not return an answer for a long time (~5 mins).
- It seems like the first Kafka REST instance locked (was assigned) all topic partitions and the second one waits.
- There is a problem with scalability: if we run multiple Kafka REST Proxies, it does not bring value.
- It looks like the Kafka REST Proxy is only vertically scalable right now.
# consume 2
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
  http://localhost:28082/consumers/my_json_consumer/instances/my_consumer_instance_2/records?max_bytes=10
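To see how the partitions are actually assigned while consumer 2 hangs, the group can be inspected from the broker container (a diagnostic using the standard kafka-consumer-groups tool):

docker-compose exec kafka bash -c "kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group my_json_consumer"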
- Opinion

- I guess it could be a wrong kafka + kafka rest configuration on my side that leads to the behaviour described above.
- From my observations, Kafka REST consumer instance 1 reads records/messages from multiple partitions; that means the underlying simple consumers (Kafka clients) "take" partitions, and the second consumer instance 2 is not able to read messages because all partitions are "busy".
- When I delete consumer instance 1, the second consumer (consumer instance 2) works as expected.
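For completeness, this is the call I use to delete consumer instance 1 (the standard v2 DELETE on the instance URL):

curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
  http://localhost:18082/consumers/my_json_consumer/instances/my_consumer_instance_1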
- Questions

- If I am wrong with the kafka and/or kafka rest configuration, could you suggest or correct it to fix the issue?
- If it is a real issue: what information can I add to make the case easy to reproduce, or to help otherwise?