Cases describe issues related to "auto.commit.enable": "false" and scalability.
First KAFKA REST Proxy 1 locks Second KAFKA REST Proxy 2.
 +---------------------+
 | CURL produce |
 | topic: jsontest |
 +----------+----------+
 | [ok] produce 10 records
 |
 +-------------------+ | +-------------------+
 | CURL consumer 1 | | | CURL consumer 2 |
 | | | | |
 +-------+-----------+ | +------+------------+
[ok] create consumer | | | [ok] create consumer
[ok] subscribe | | | [ok] subscribe
[ok] consume records | | | [hung] consume records
 | | |
 +-----v-------+ | +----v--------+
 | Kafka REST <--------+ | Kafka REST |
 | port:18082 | | port:28082 |
 +------+------+ +------+------+
 | |
 | |
 | |
 +--------v----------------------------v------------+
 | Kafka |
 | port:9092 |
 +----------------+---------------------------------+
 |
 +----------------v---------------------------------+
 | Zookeeper |
 | port:2181 |
 +--------------------------------------------------+
- 1/ Start services
docker-compose.yml
version: "3.5" services: zookeeper: image: confluentinc/cp-zookeeper:5.0.0 ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 kafka: image: confluentinc/cp-kafka:5.0.0 depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 kafka-rest-1: image: confluentinc/cp-kafka-rest:5.0.0 depends_on: - kafka ports: - 18082:8082 environment: KAFKA_REST_ID: "1" KAFKA_REST_HOST_NAME: kafka-rest-1 KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:9092' KAFKA_REST_LISTENERS: "http://0.0.0.0:8082" KAFKA_REST_PRODUCER_THREADS: "10" kafka-rest-2: image: confluentinc/cp-kafka-rest:5.0.0 depends_on: - kafka ports: - 28082:8082 environment: KAFKA_REST_ID: "2" KAFKA_REST_HOST_NAME: kafka-rest-2 KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:9092' KAFKA_REST_LISTENERS: "http://0.0.0.0:8082" KAFKA_REST_PRODUCER_THREADS: "10"
- 
Start services. docker-compose up
- 
2/ Create topic with partitions 
- 
topic + 10 partitions: docker-compose exec kafka bash -c "kafka-topics --zookeeper zookeeper:2181 --topic jsontest --create --partitions 10 --replication-factor 1"
- 
describe to be sure: docker-compose exec kafka bash -c "kafka-topics --zookeeper zookeeper:2181 --topic jsontest --describe"
- 
3/ produce records 
- 
10 simple records: produce 10 records: {v: 0} ... {v: 9} 
- 
KAFKA REST Firstport18082
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \ -H "Accept: application/vnd.kafka.v2+json" \ --data '{"records":[{"value":{"v":"0"}}, {"value":{"v":"1"}}, {"value":{"v":"2"}}, {"value":{"v":"3"}}, {"value":{"v":"4"}}, {"value":{"v":"5"}}, {"value":{"v":"6"}}, {"value":{"v":"7"}}, {"value":{"v":"8"}}, {"value":{"v":"9"}}]}' \ "http://localhost:18082/topics/jsontest"
- 4/ Create CURL consumer 1
- It creates consumer instance and subscribe topic jsontest. Kafka REST 1 port:18082
# create consumer 1 # "auto.commit.enable": "false" curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"name": "my_consumer_instance_1", "auto.commit.enable": "false", "format": "json", "auto.offset.reset": "earliest"}' \ http://localhost:18082/consumers/my_json_consumer printf "\n" # subscribe curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"topics":["jsontest"]}' \ http://localhost:18082/consumers/my_json_consumer/instances/my_consumer_instance_1/subscription
- 5/ Consumer 1 reads records
- It consumes from Kafka REST 1 port 18082
# consume 1 curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \ http://localhost:18082/consumers/my_json_consumer/instances/my_consumer_instance_1/records?max_bytes=10
- Results looks like:
[{"topic":"jsontest","key":null,"value":{"v":"3"},"partition":4,"offset":0}]
[{"topic":"jsontest","key":null,"value":{"v":"2"},"partition":5,"offset":0}]
[{"topic":"jsontest","key":null,"value":{"v":"8"},"partition":6,"offset":0}]
- Or it could read multiple records if max_bytes=20
## Mesages from multiple partitions [{"topic":"jsontest","key":null,"value":{"v":"3"},"partition":4,"offset":0},{"topic":"jsontest","key":null,"value":{"v":"2"},"partition":5,"offset":0}]
- 
What do you think is it ok that consumer reads multiple partitions at once? when we use "auto.commit.enable": "false". Seems it could be the issue.
- 
6/ Create CURL consumer 2 
- 
It creates consumer instance and subscribe topic jsontest. Kafka REST 2 port:28082
# create consumer 2 # "auto.commit.enable": "false" curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"name": "my_consumer_instance_2", "auto.commit.enable": "false", "format": "json", "auto.offset.reset": "earliest"}' \ http://localhost:28082/consumers/my_json_consumer printf "\n" # subscribe curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"topics":["jsontest"]}' \ http://localhost:28082/consumers/my_json_consumer/instances/my_consumer_instance_2/subscription
- 7/ Consumer 2 DOES NOT read records
- It just hung and does not give any answer for long time (~5 mins).
- It seems like the firstkafka instance locked (assigned) all topic partitions andsecondone waits.
- There is a problem with scalability, if we have multiple Kakfa REST proxies it does not bring value.
- And it looks like Kakfa REST Proxy only vertical scalable now.
# consume 2 curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \ http://localhost:28082/consumers/my_json_consumer/instances/my_consumer_instance_2/records?max_bytes=10
- 
Opinion 
- 
I guess it could be wrong kafka + kafka restconfiguration from my side that leads to behaviour described before.
- 
From my observations KAFKA Rest consumer instance 1reads records / messages from multiple partitions, it means that simple consumers (kafka clients) "take" partitions and the secondconsumer instance 2does not have ability read messages because all partitions are "busy".
- 
When I delete consumer instance 1second consumerconsumer instance 2works as expected.
- 
Questions 
- 
If I am wrong with kafka or/and kafka restconfiguration could you suggest or correct this one to fix the issue?
- 
If it's the issue: What information can I add to easily reproduce a case or help?