Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit e8ebcbc

Browse files
Moved all test classes into same folder
1 parent d82df6e commit e8ebcbc

8 files changed

+229
-190
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package com.github.megachucky.kafka.streams.machinelearning;
2+
3+
import org.apache.kafka.clients.producer.ProducerRecord;
4+
import org.apache.kafka.common.serialization.StringDeserializer;
5+
import org.apache.kafka.common.serialization.StringSerializer;
6+
import org.apache.kafka.streams.KeyValue;
7+
import org.apache.kafka.streams.TopologyTestDriver;
8+
import org.apache.kafka.streams.test.ConsumerRecordFactory;
9+
import org.junit.After;
10+
import org.junit.Before;
11+
import org.junit.Test;
12+
13+
import java.util.Arrays;
14+
import java.util.List;
15+
import java.util.stream.Collectors;
16+
17+
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
18+
19+
/**
20+
* TopologyTestDriver based test about stream processing of
21+
* Kafka_Streams_TensorFlow_Image_Recognition_Example.
22+
*
23+
* @author Jukka Karvanen / jukinimi.com * Unit Test of
24+
* {@link Kafka_Streams_MachineLearning_H2O_DeepLearning_Example}, using
25+
* an TopologyTestDriver and a H2O DeepLearning model.
26+
*
27+
*/
28+
29+
public class Kafka_Streams_MachineLearning_H2O_DeepLearning_ExampleTest {
30+
private TopologyTestDriver testDriver;
31+
32+
private StringDeserializer stringDeserializer = new StringDeserializer();
33+
private ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(
34+
new StringSerializer(), new StringSerializer());
35+
36+
@Before
37+
public void setup() throws IllegalAccessException, ClassNotFoundException, InstantiationException {
38+
testDriver = new TopologyTestDriver(
39+
Kafka_Streams_MachineLearning_H2O_DeepLearning_Example.getStreamTopology(
40+
Kafka_Streams_MachineLearning_H2O_DeepLearning_Example.modelClassName),
41+
Kafka_Streams_MachineLearning_H2O_DeepLearning_Example.getStreamConfiguration(
42+
"localhost:9092",
43+
Kafka_Streams_MachineLearning_H2O_DeepLearning_Example.APPLICATION_ID));
44+
}
45+
46+
@After
47+
public void tearDown() {
48+
try {
49+
testDriver.close();
50+
} catch (RuntimeException e) {
51+
// https://issues.apache.org/jira/browse/KAFKA-6647 causes exception when
52+
// executed in Windows, ignoring it
53+
// Logged stacktrace cannot be avoided
54+
System.out.println("Ignoring exception, test failing in Windows due this exception:"
55+
+ e.getLocalizedMessage());
56+
}
57+
}
58+
59+
private String getOutput() {
60+
ProducerRecord<String, String> output = testDriver.readOutput(
61+
Kafka_Streams_MachineLearning_H2O_DeepLearning_Example.OUTPUT_TOPIC, stringDeserializer,
62+
stringDeserializer);
63+
assertThat(output).isNotNull();
64+
return output.value();
65+
}
66+
67+
/**
68+
* Simple test validating only the prediction part of the output
69+
*/
70+
@Test
71+
public void testOne() {
72+
testDriver.pipeInput(recordFactory.create(
73+
Kafka_Streams_MachineLearning_H2O_DeepLearning_Example.INPUT_TOPIC, null,
74+
"1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA,YES,YES",
75+
1L));
76+
assertThat(getOutput()).isEqualTo("Prediction: Is Airline delayed? => YES");
77+
}
78+
79+
/**
80+
* Test based on
81+
* Kafka_Streams_TensorFlow_Image_Recognition_Example_IntegrationTest
82+
*
83+
*/
84+
@Test
85+
public void testList() {
86+
// Flight data (one single flight) --> We want to predict if it will be
87+
// delayed or not
88+
List<String> inputValues = Arrays.asList(
89+
"1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA,YES,YES",
90+
"1999,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA,YES,YES");
91+
List<KeyValue<String, String>> records = inputValues.stream()
92+
.map(v -> new KeyValue<String, String>(null, v)).collect(Collectors.toList());
93+
94+
testDriver.pipeInput(recordFactory.create(
95+
Kafka_Streams_MachineLearning_H2O_DeepLearning_Example.INPUT_TOPIC, records, 1L, 100L));
96+
assertThat(getOutput()).isEqualTo("Prediction: Is Airline delayed? => YES");
97+
// This model predict also another flight to be delayed
98+
assertThat(getOutput()).isEqualTo("Prediction: Is Airline delayed? => YES");
99+
}
100+
101+
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
package com.github.megachucky.kafka.streams.machinelearning.test;
1+
package com.github.megachucky.kafka.streams.machinelearning;
22

33
import static org.assertj.core.api.Assertions.assertThat;
44

55
import java.util.Arrays;
66
import java.util.List;
77
import java.util.Properties;
88

9-
import com.github.jukkakarvanen.kafka.streams.integration.utils.TestEmbeddedKafkaCluster;
9+
import com.github.megachucky.kafka.streams.machinelearning.TestEmbeddedKafkaCluster;
1010
import org.apache.kafka.clients.consumer.ConsumerConfig;
1111
import org.apache.kafka.clients.producer.ProducerConfig;
1212
import org.apache.kafka.common.serialization.Serdes;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package com.github.megachucky.kafka.streams.machinelearning;
2+
3+
import org.apache.kafka.clients.producer.ProducerRecord;
4+
import org.apache.kafka.common.serialization.StringDeserializer;
5+
import org.apache.kafka.common.serialization.StringSerializer;
6+
import org.apache.kafka.streams.KeyValue;
7+
import org.apache.kafka.streams.TopologyTestDriver;
8+
import org.apache.kafka.streams.test.ConsumerRecordFactory;
9+
import org.junit.After;
10+
import org.junit.Before;
11+
import org.junit.Test;
12+
13+
import java.io.IOException;
14+
import java.util.Arrays;
15+
import java.util.List;
16+
import java.util.stream.Collectors;
17+
18+
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
19+
20+
/**
21+
* TopologyTestDriver based test about stream processing of
22+
* Kafka_Streams_TensorFlow_Image_Recognition_Example.
23+
*
24+
* @author Jukka Karvanen / jukinimi.com * Unit Test of
25+
* {@link Kafka_Streams_MachineLearning_H2O_GBM_Example}, using an
26+
* TopologyTestDriver and a H2O GBM model.
27+
*
28+
*/
29+
30+
public class Kafka_Streams_MachineLearning_H2O_GBM_ExampleTest {
31+
private TopologyTestDriver testDriver;
32+
33+
private StringDeserializer stringDeserializer = new StringDeserializer();
34+
private ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(
35+
new StringSerializer(), new StringSerializer());
36+
37+
@Before
38+
public void setup() throws IllegalAccessException, ClassNotFoundException, InstantiationException {
39+
testDriver = new TopologyTestDriver(
40+
Kafka_Streams_MachineLearning_H2O_GBM_Example.getStreamTopology(
41+
Kafka_Streams_MachineLearning_H2O_GBM_Example.modelClassName),
42+
Kafka_Streams_MachineLearning_H2O_GBM_Example.getStreamConfiguration("localhost:9092",
43+
Kafka_Streams_MachineLearning_H2O_GBM_Example.APPLICATION_ID));
44+
}
45+
46+
@After
47+
public void tearDown() {
48+
try {
49+
testDriver.close();
50+
} catch (RuntimeException e) {
51+
// https://issues.apache.org/jira/browse/KAFKA-6647 causes exception when
52+
// executed in Windows, ignoring it
53+
// Logged stacktrace cannot be avoided
54+
System.out.println("Ignoring exception, test failing in Windows due this exception:"
55+
+ e.getLocalizedMessage());
56+
}
57+
}
58+
59+
private String getOutput() {
60+
ProducerRecord<String, String> output = testDriver.readOutput(
61+
Kafka_Streams_MachineLearning_H2O_GBM_Example.OUTPUT_TOPIC, stringDeserializer,
62+
stringDeserializer);
63+
assertThat(output).isNotNull();
64+
return output.value();
65+
}
66+
67+
/**
68+
* Simple test validating only the prediction part of the output
69+
*/
70+
@Test
71+
public void testOne() {
72+
testDriver.pipeInput(recordFactory.create(Kafka_Streams_MachineLearning_H2O_GBM_Example.INPUT_TOPIC,
73+
null,
74+
"1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA,YES,YES",
75+
1L));
76+
assertThat(getOutput()).isEqualTo("Prediction: Is Airline delayed? => YES");
77+
}
78+
79+
/**
80+
* Test based on
81+
* Kafka_Streams_TensorFlow_Image_Recognition_Example_IntegrationTest
82+
*
83+
*/
84+
@Test
85+
public void testList() {
86+
// Flight data (one single flight) --> We want to predict if it will be
87+
// delayed or not
88+
List<String> inputValues = Arrays.asList(
89+
"1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA,YES,YES",
90+
"1999,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA,YES,YES");
91+
List<KeyValue<String, String>> records = inputValues.stream()
92+
.map(v -> new KeyValue<String, String>(null, v)).collect(Collectors.toList());
93+
94+
testDriver.pipeInput(recordFactory.create(Kafka_Streams_MachineLearning_H2O_GBM_Example.INPUT_TOPIC,
95+
records, 1L, 100L));
96+
assertThat(getOutput()).isEqualTo("Prediction: Is Airline delayed? => YES");
97+
assertThat(getOutput()).isEqualTo("Prediction: Is Airline delayed? => NO");
98+
}
99+
100+
}
Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
package com.github.megachucky.kafka.streams.machinelearning.test;
1+
package com.github.megachucky.kafka.streams.machinelearning;
22

33
import static org.assertj.core.api.Assertions.assertThat;
44

55
import java.util.Arrays;
66
import java.util.List;
77
import java.util.Properties;
88

9-
import com.github.jukkakarvanen.kafka.streams.integration.utils.TestEmbeddedKafkaCluster;
9+
import com.github.megachucky.kafka.streams.machinelearning.TestEmbeddedKafkaCluster;
1010
import org.apache.kafka.clients.consumer.ConsumerConfig;
1111
import org.apache.kafka.clients.producer.ProducerConfig;
1212
import org.apache.kafka.common.serialization.Serdes;
@@ -47,8 +47,9 @@
4747
public class Kafka_Streams_MachineLearning_H2O_GBM_Example_IntegrationTest {
4848

4949
@ClassRule
50-
// public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
51-
50+
// public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new
51+
// EmbeddedSingleNodeKafkaCluster();
52+
5253
public static final EmbeddedKafkaCluster CLUSTER = new TestEmbeddedKafkaCluster(1);
5354

5455
private static final String inputTopic = "AirlineInputTopic";
@@ -184,7 +185,8 @@ public void shouldPredictFlightDelay() throws Exception {
184185
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
185186
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
186187
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
187-
IntegrationTestUtils.<String>produceValuesSynchronously(inputTopic, inputValues, producerConfig, new MockTime());
188+
IntegrationTestUtils.<String>produceValuesSynchronously(inputTopic, inputValues, producerConfig,
189+
new MockTime());
188190

189191
//
190192
// Step 3: Verify the application's output data.
Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
1-
package com.github.jukkakarvanen.kafka.streams.integration.utils;
1+
package com.github.megachucky.kafka.streams.machinelearning;
22

33
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
44
import org.slf4j.Logger;
55
import org.slf4j.LoggerFactory;
66

77
import java.util.Properties;
88

9-
/** This is helper class to workaround for Failing stream tests in Windows environment KAFKA-6647.
9+
/**
10+
* This is helper class to workaround for Failing stream tests in Windows
11+
* environment KAFKA-6647.
1012
*
1113
* @author Jukka Karvanen
1214
*
13-
* The causing issue is https://issues.apache.org/jira/browse/KAFKA-6647
14-
* Replacing EmbeddedKafkaCluster with TestEmbeddedKafkaCluster will catch and ignore the exception
15-
* happening during the tear down of the test
16-
* The exception does not have affect to functionality
17-
*/
15+
* The causing issue is https://issues.apache.org/jira/browse/KAFKA-6647
16+
* Replacing EmbeddedKafkaCluster with TestEmbeddedKafkaCluster will
17+
* catch and ignore the exception happening during the tear down of the
18+
* test The exception does not have affect to functionality
19+
*/
1820

1921
public class TestEmbeddedKafkaCluster extends EmbeddedKafkaCluster {
2022
private static final Logger log = LoggerFactory.getLogger(TestEmbeddedKafkaCluster.class);
@@ -31,7 +33,8 @@ public TestEmbeddedKafkaCluster(int numBrokers, Properties brokerConfig, long mo
3133
super(numBrokers, brokerConfig, mockTimeMillisStart);
3234
}
3335

34-
public TestEmbeddedKafkaCluster(int numBrokers, Properties brokerConfig, long mockTimeMillisStart, long mockTimeNanoStart) {
36+
public TestEmbeddedKafkaCluster(int numBrokers, Properties brokerConfig, long mockTimeMillisStart,
37+
long mockTimeNanoStart) {
3538
super(numBrokers, brokerConfig, mockTimeMillisStart, mockTimeNanoStart);
3639
}
3740

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.jukkakarvanen.kafka.streams.integration.utils;
1+
package com.github.megachucky.kafka.streams.machinelearning;
22

33
import org.apache.kafka.common.utils.Time;
44
import org.apache.kafka.streams.KafkaClientSupplier;
@@ -9,13 +9,16 @@
99

1010
import java.util.Properties;
1111

12-
/** This is helper class to workaround for Failing stream tests in Windows environment KAFKA-6647.
12+
/**
13+
* This is helper class to workaround for Failing stream tests in Windows
14+
* environment KAFKA-6647.
1315
*
1416
* @author Jukka Karvanen
1517
*
16-
* The causing issue is https://issues.apache.org/jira/browse/KAFKA-6647
17-
* Replacing KafkaStreams with TestKafkaStreams will catch and ignore the exception caused by cleanUp
18-
* The exception does not have affect to functionality
18+
* The causing issue is https://issues.apache.org/jira/browse/KAFKA-6647
19+
* Replacing KafkaStreams with TestKafkaStreams will catch and ignore
20+
* the exception caused by cleanUp The exception does not have affect to
21+
* functionality
1922
*/
2023

2124
public class TestKafkaStreams extends KafkaStreams {

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /