Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit b666dbc

Browse files
Merge branch 'static_bug' of https://github.com/jukkakarvanen/kafka-streams-machine-learning-examples into jukkakarvanen-static_bug
Merge fix for 'static bug' (using static keyword for a variable) which does not allow to scale with more Threads.
2 parents f8545ff + 5974142 commit b666dbc

File tree

1 file changed

+11
-16
lines changed

1 file changed

+11
-16
lines changed

‎tensorflow-image-recognition/src/main/java/com/github/megachucky/kafka/streams/machinelearning/Kafka_Streams_TensorFlow_Image_Recognition_Example.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.kafka.streams.StreamsConfig;
1616
import org.apache.kafka.streams.Topology;
1717
import org.apache.kafka.streams.kstream.KStream;
18+
import org.apache.kafka.streams.kstream.Printed;
1819
import org.tensorflow.DataType;
1920
import org.tensorflow.Graph;
2021
import org.tensorflow.Output;
@@ -35,10 +36,6 @@ public class Kafka_Streams_TensorFlow_Image_Recognition_Example {
3536
static final String imageInputTopic = "ImageInputTopic";
3637
static final String imageOutputTopic = "ImageOutputTopic";
3738

38-
// Prediction Value
39-
private static String imageClassification = "unknown";
40-
private static String imageProbability = "unknown";
41-
4239
public static void main(final String[] args) throws Exception {
4340
// Configure Kafka Streams Application
4441
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
@@ -99,12 +96,16 @@ static Topology getStreamTopology() throws IOException {
9996
// message values represent lines of text
10097
final KStream<String, String> imageInputLines = builder.stream(imageInputTopic);
10198

102-
// Stream Processor (in this case 'foreach' to add custom logic, i.e. apply the
99+
//imageInputLines.print(Printed.toSysOut());
100+
101+
// Stream Processor (in this case inside mapValues to add custom logic, i.e. apply the
103102
// analytic model)
104-
imageInputLines.foreach((key, value) -> {
103+
// Transform message: Add prediction information
104+
KStream<String, Object> transformedMessage =
105+
imageInputLines.mapValues(value -> {
105106

106-
imageClassification = "unknown";
107-
imageProbability = "unknown";
107+
StringimageClassification = "unknown";
108+
StringimageProbability = "unknown";
108109

109110
String imageFile = value;
110111

@@ -128,16 +129,10 @@ static Topology getStreamTopology() throws IOException {
128129
} catch (IOException e) {
129130
e.printStackTrace();
130131
}
131-
132+
return "Prediction: What is the content of this picture? => " + imageClassification
133+
+ ", probability = " + imageProbability;
132134
});
133135

134-
// airlineInputLines.print();
135-
136-
// Transform message: Add prediction information
137-
KStream<String, Object> transformedMessage = imageInputLines
138-
.mapValues(value -> "Prediction: What is the content of this picture? => " + imageClassification
139-
+ ", probability = " + imageProbability);
140-
141136
// Send prediction information to Output Topic
142137
transformedMessage.to(imageOutputTopic);
143138

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /