0

Using kafka version is 2.7.0 for application

Try with kafka stream maven version 2.2.1 kafka stream in pom.xml , then stream is started and getting expected output.

But when updating to maven version 2.3.0(or above) in pom.xml, then getting below error from logs and stream is not starting

Exception in thread "Average-3ded0155-d697-492b-897b-4da5bfec5cf1-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: Error opening store KSTREAM-REDUCE-STATE-STORE-0000000005 at location /kafka/Average/statedir/Average/1_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000005
 at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:87)
 at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:185)
 at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:253)
 at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
 at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:54)
 at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
 at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:74)
 at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
 at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init1ドル(MeteredKeyValueStore.java:120)
 at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
 at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:120)
 at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:201)
 at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
 at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:209)
 at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:473)
 at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:728)
 at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
 at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
 at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
Caused by: org.rocksdb.RocksDBException: Column family not found: keyValueWithTimestamp
 at org.rocksdb.RocksDB.open(Native Method)
 at org.rocksdb.RocksDB.open(RocksDB.java:306)
 at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:75)
Anton Pomieshchenko
2,1972 gold badges14 silver badges26 bronze badges
asked Feb 7, 2021 at 5:11

1 Answer 1

0

I've got the same error/root cause when I include "kafka-streams" and "rocksdbjni" as dependencies in my pom.xml file. After removing the later one, the error vanishes. My aim was to create a jar file with dependencies and use it in a docker container. I included "rocksdbjni" since some dependency errors occured during the development. Finaly I found out that they originated from the used docker image and not from the pom.xml.

Exception in thread "streams-window-sum6-b7f79924-c195-428f-bc02-c7d61d7a0f7d-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: Error opening store aggregate-state-store1.1606723200000 at location /tmp/kafka-streams/streams-window-sum6/0_0/aggregate-state-store1/aggregate-state-store1.1606723200000
 at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:133)
 at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:148)
 at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:147)
 at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process2ドル(ProcessorNode.java:142)
 at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
 at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
 at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
 at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
 at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
 at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
 at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process1ドル(StreamTask.java:679)
 at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
 at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
 at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
 at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
 at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
 at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store aggregate-state-store1.1606723200000 at location /tmp/kafka-streams/streams-window-sum6/0_0/aggregate-state-store1/aggregate-state-store1.1606723200000
 at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:87)
 at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188)
 at org.apache.kafka.streams.state.internals.TimestampedSegment.openDB(TimestampedSegment.java:49)
 at org.apache.kafka.streams.state.internals.TimestampedSegments.getOrCreateSegment(TimestampedSegments.java:50)
 at org.apache.kafka.streams.state.internals.TimestampedSegments.getOrCreateSegment(TimestampedSegments.java:25)
 at org.apache.kafka.streams.state.internals.AbstractSegments.getOrCreateSegmentIfLive(AbstractSegments.java:84)
 at org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.put(AbstractRocksDBSegmentedBytesStore.java:142)
 at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:62)
 at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:27)
 at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:102)
 at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:32)
 at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put2ドル(MeteredWindowStore.java:127)
 at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
 at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:126)
 ... 16 more
Caused by: org.rocksdb.RocksDBException: Column family not found: keyValueWithTimestamp
 at org.rocksdb.RocksDB.open(Native Method)
 at org.rocksdb.RocksDB.open(RocksDB.java:306)
 at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:75)
 ... 29 more

Here is my pom.xml file based on the official documentation (https://kafka.apache.org/27/documentation/streams/tutorial) which works with mvn exec:java -Dexec.mainClass=myapps.MyClass as well as with the jar java -cp app.jar myapps.MyClass.

<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>streams.examples</groupId>
 <artifactId>streams.examples</artifactId>
 <version>0.2</version>
 <packaging>jar</packaging>
 <name>Kafka Streams Quickstart :: Java</name>
 <properties>
 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 <kafka.version>2.6.0</kafka.version>
 <slf4j.version>1.7.7</slf4j.version>
 <log4j.version>1.2.17</log4j.version>
 </properties>
 <repositories>
 <repository>
 <id>apache.snapshots</id>
 <name>Apache Development Snapshot Repository</name>
 <url>https://repository.apache.org/content/repositories/snapshots/</url>
 <releases>
 <enabled>false</enabled>
 </releases>
 <snapshots>
 <enabled>true</enabled>
 </snapshots>
 </repository>
 </repositories>
 <!--
 Execute "mvn clean package -Pbuild-jar"
 to build a jar file out of this project!
 -->
 <build>
 <plugins>
 <plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-compiler-plugin</artifactId>
 <version>3.7.0</version>
 <configuration>
 <source>11</source>
 <target>11</target>
 </configuration>
 </plugin>
 <plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-shade-plugin</artifactId>
 <version>3.2.4</version>
 <executions>
 <execution>
 <phase>package</phase>
 <goals>
 <goal>shade</goal>
 </goals>
 <configuration>
 <filters>
 <!-- This filter is to workaround the problem caused by included signed jars.
 java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
 -->
 <filter>
 <artifact>*:*</artifact>
 <excludes>
 <exclude>META-INF/*.SF</exclude>
 <exclude>META-INF/*.DSA</exclude>
 <exclude>META-INF/*.RSA</exclude>
 </excludes>
 </filter>
 </filters>
 <transformers>
 <transformer
 implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
 <mainClass>myapps.MyClass</mainClass>
 </transformer>
 </transformers>
 </configuration>
 </execution>
 </executions>
 </plugin>
 </plugins>
 </build>
 <dependencies>
 <!-- Apache Kafka dependencies -->
 <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-streams</artifactId>
 <version>${kafka.version}</version>
 <scope>compile</scope>
 </dependency>
 </dependencies>
</project>
answered Feb 7, 2021 at 12:52
Sign up to request clarification or add additional context in comments.

1 Comment

But what exactly is the problem? I use also a local version of rocksdbjni for Mac M1 support.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.