Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 29b78b5

Browse files
NeQuissimus authored and manub committed
Kafka 2.0.0 (#154)
1 parent a5223a9 commit 29b78b5

File tree

14 files changed

+61
-75
lines changed

14 files changed

+61
-75
lines changed

‎.travis.yml‎

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
language: scala
22

3-
jdk:
3+
jdk:
44
- oraclejdk8
55

66
scala:
@@ -19,6 +19,5 @@ before_cache:
1919
- find $HOME/.ivy2/cache -name "ivydata-*.properties" -print -delete
2020
- find $HOME/.sbt -name "*.lock" -print -delete
2121

22-
script:
22+
script:
2323
- travis_retry sbt ++$TRAVIS_SCALA_VERSION test
24-

‎build.sbt‎

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ import sbtrelease.Version
22

33
parallelExecution in ThisBuild := false
44

5-
val kafkaVersion = "1.1.1"
6-
val confluentVersion = "4.1.1"
5+
val kafkaVersion = "2.0.0"
6+
val confluentVersion = "5.0.0"
77
val akkaVersion = "2.5.14"
88

99
lazy val commonSettings = Seq(
@@ -22,6 +22,7 @@ lazy val commonSettings = Seq(
2222
lazy val commonLibrarySettings = libraryDependencies ++= Seq(
2323
"org.apache.avro" % "avro" % "1.8.2",
2424
"org.apache.kafka" %% "kafka" % kafkaVersion,
25+
"org.slf4j" % "slf4j-log4j12" % "1.7.25" % Test,
2526
"org.scalatest" %% "scalatest" % "3.0.5" % Test,
2627
"com.typesafe.akka" %% "akka-actor" % akkaVersion % Test,
2728
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test
@@ -53,6 +54,13 @@ lazy val releaseSettings = Seq(
5354
releaseCrossBuild := true
5455
)
5556

57+
// https://github.com/sbt/sbt/issues/3618
58+
// [error] (kafkaStreams / update) sbt.librarymanagement.ResolveException: download failed: javax.ws.rs#javax.ws.rs-api;2.1!javax.ws.rs-api.${packaging.type}
59+
val workaround = {
60+
sys.props += "packaging.type" -> "jar"
61+
()
62+
}
63+
5664
lazy val root = (project in file("."))
5765
.settings(name := "scalatest-embedded-kafka-root")
5866
.settings(commonSettings: _*)

‎embedded-kafka/src/main/scala/net/manub/embeddedkafka/Codecs.scala‎

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
11
package net.manub.embeddedkafka
22

3-
import kafka.serializer._
43
import org.apache.kafka.clients.consumer.ConsumerRecord
54
import org.apache.kafka.common.serialization._
65

76
/** useful encoders/serializers, decoders/deserializers and [[ConsumerRecord]] decoders**/
87
object Codecs {
9-
implicit val stringEncoder: Encoder[String] = new StringEncoder()
10-
implicit val nullEncoder: Encoder[Array[Byte]] = new DefaultEncoder()
118
implicit val stringSerializer: Serializer[String] = new StringSerializer()
129
implicit val nullSerializer: Serializer[Array[Byte]] =
1310
new ByteArraySerializer()
1411

15-
implicit val stringDecoder: Decoder[String] = new StringDecoder()
16-
implicit val nullDecoder: Decoder[Array[Byte]] = new DefaultDecoder()
1712
implicit val stringDeserializer: Deserializer[String] =
1813
new StringDeserializer()
1914
implicit val nullDeserializer: Deserializer[Array[Byte]] =

‎embedded-kafka/src/main/scala/net/manub/embeddedkafka/ConsumerExtensions.scala‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ object ConsumerExtensions {
4848
import scala.collection.JavaConverters._
4949
consumer.subscribe(topics.asJava)
5050
topics.foreach(consumer.partitionsFor)
51-
val records = consumer.poll(poll)
51+
val records = consumer.poll(java.time.Duration.ofMillis(poll))
5252
// use toList to force eager evaluation. toSeq is lazy
5353
records.iterator().asScala.toList.map(decoder(_))
5454
}.recover {

‎embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ private[embeddedkafka] trait EmbeddedKafkaSupport[C <: EmbeddedKafkaConfig] {
588588
topics.foreach(consumer.partitionsFor)
589589

590590
while (messagesRead < number && System.nanoTime < timeoutNanoTime) {
591-
val records = consumer.poll(1000)
591+
val records = consumer.poll(java.time.Duration.ofMillis(1000))
592592
val recordIter = records.iterator()
593593
if (resetTimeoutOnEachMessage && recordIter.hasNext) {
594594
timeoutNanoTime = System.nanoTime + timeout.toNanos

‎embedded-kafka/src/main/scala/net/manub/embeddedkafka/avro/avroMarshallers.scala‎

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package net.manub.embeddedkafka.avro
22

33
import java.io.ByteArrayOutputStream
44

5-
import kafka.serializer.{Decoder, Encoder}
65
import kafka.utils.VerifiableProperties
76
import org.apache.avro.Schema
87
import org.apache.avro.io._
@@ -13,58 +12,39 @@ import org.apache.avro.specific.{
1312
}
1413
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
1514

16-
class KafkaAvroDecoder[T <: SpecificRecord](schema: Schema,
17-
props: VerifiableProperties=null)
18-
extends Decoder[T] {
19-
private val NoInstanceReuse = null.asInstanceOf[T]
20-
private val NoDecoderReuse = null.asInstanceOf[BinaryDecoder]
15+
class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema)
16+
extends Deserializer[T]
17+
with NoOpConfiguration
18+
with NoOpClose {
19+
2120
private val reader = new SpecificDatumReader[T](schema)
2221

23-
override def fromBytes(bytes: Array[Byte]): T = {
24-
val decoder = DecoderFactory.get().binaryDecoder(bytes, NoDecoderReuse)
25-
reader.read(NoInstanceReuse, decoder)
22+
override def deserialize(topic: String, data: Array[Byte]): T = {
23+
val decoder = DecoderFactory.get().binaryDecoder(data, null)
24+
reader.read(null.asInstanceOf[T], decoder)
2625
}
2726
}
2827

29-
class KafkaAvroEncoder[T <: SpecificRecord](props: VerifiableProperties = null)
30-
extends Encoder[T] {
31-
private val NoEncoderReuse = null.asInstanceOf[BinaryEncoder]
28+
class KafkaAvroSerializer[T <: SpecificRecord]()
29+
extends Serializer[T]
30+
with NoOpConfiguration
31+
with NoOpClose {
3232

33-
override def toBytes(nullableData: T): Array[Byte] = {
33+
private def toBytes(nullableData: T): Array[Byte] =
3434
Option(nullableData).fold[Array[Byte]](null) { data =>
3535
val writer: DatumWriter[T] = new SpecificDatumWriter[T](data.getSchema)
3636
val out = new ByteArrayOutputStream()
37-
val encoder = EncoderFactory.get.binaryEncoder(out, NoEncoderReuse)
37+
val encoder = EncoderFactory.get.binaryEncoder(out, null)
3838

3939
writer.write(data, encoder)
4040
encoder.flush()
4141
out.close()
4242

4343
out.toByteArray
4444
}
45-
}
46-
}
47-
48-
class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema)
49-
extends Deserializer[T]
50-
with NoOpConfiguration
51-
with NoOpClose {
52-
53-
private val decoder = new KafkaAvroDecoder[T](schema = schema)
54-
55-
override def deserialize(topic: String, data: Array[Byte]): T =
56-
decoder.fromBytes(data)
57-
}
58-
59-
class KafkaAvroSerializer[T <: SpecificRecord]()
60-
extends Serializer[T]
61-
with NoOpConfiguration
62-
with NoOpClose {
63-
64-
private val encoder = new KafkaAvroEncoder[T]()
6545

6646
override def serialize(topic: String, data: T): Array[Byte] =
67-
encoder.toBytes(data)
47+
toBytes(data)
6848
}
6949

7050
sealed trait NoOpConfiguration {
Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,15 @@
11
package net.manub.embeddedkafka
22

3-
import kafka.serializer.{Decoder, Encoder}
43
import kafka.utils.VerifiableProperties
54
import org.apache.avro.Schema
65
import org.apache.avro.specific.SpecificRecord
76
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
87

98
package object avro {
10-
119
implicit def specificAvroSerializer[T <: SpecificRecord]: Serializer[T] =
1210
new KafkaAvroSerializer[T]
13-
implicit def specificAvroEncoder[T <: SpecificRecord]: Encoder[T] =
14-
new KafkaAvroEncoder[T]
1511

1612
def specificAvroDeserializer[T <: SpecificRecord](
1713
schema: Schema): Deserializer[T] =
1814
new KafkaAvroDeserializer[T](schema)
19-
20-
def specificAvroDecoder[T <: SpecificRecord](schema: Schema,
21-
props: VerifiableProperties =
22-
null): Decoder[T] =
23-
new KafkaAvroDecoder[T](schema, props)
2415
}

‎embedded-kafka/src/test/scala/net/manub/embeddedkafka/ConsumerExtensionsSpec.scala‎

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@ class ConsumerExtensionsSpec
3030
.empty[TopicPartition, java.util.List[ConsumerRecord[String, String]]]
3131
.asJava)
3232

33-
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)
33+
when(consumer.poll(java.time.Duration.ofMillis(retryConf.poll)))
34+
.thenReturn(consumerRecords)
3435

3536
consumer.consumeLazily[String]("topic")
3637

37-
verify(consumer, times(retryConf.maximumAttempts)).poll(retryConf.poll)
38+
verify(consumer, times(retryConf.maximumAttempts))
39+
.poll(java.time.Duration.ofMillis(retryConf.poll))
3840
}
3941

4042
"not retry to get messages with the configured maximum number of attempts when poll succeeds" in {
@@ -48,11 +50,12 @@ class ConsumerExtensionsSpec
4850
new TopicPartition("topic", 1) -> List(consumerRecord).asJava).asJava
4951
)
5052

51-
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)
53+
when(consumer.poll(java.time.Duration.ofMillis(retryConf.poll)))
54+
.thenReturn(consumerRecords)
5255

5356
consumer.consumeLazily[String]("topic")
5457

55-
verify(consumer).poll(retryConf.poll)
58+
verify(consumer).poll(java.time.Duration.ofMillis(retryConf.poll))
5659
}
5760

5861
"poll to get messages with the configured poll timeout" in {
@@ -65,11 +68,12 @@ class ConsumerExtensionsSpec
6568
.empty[TopicPartition, java.util.List[ConsumerRecord[String, String]]]
6669
.asJava)
6770

68-
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)
71+
when(consumer.poll(java.time.Duration.ofMillis(retryConf.poll)))
72+
.thenReturn(consumerRecords)
6973

7074
consumer.consumeLazily[String]("topic")
7175

72-
verify(consumer).poll(retryConf.poll)
76+
verify(consumer).poll(java.time.Duration.ofMillis(retryConf.poll))
7377
}
7478
}
7579

‎embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaMethodsSpec.scala‎

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ class EmbeddedKafkaMethodsSpec
5151
val consumer = kafkaConsumer
5252
consumer.subscribe(List(topic).asJava)
5353

54-
val records = consumer.poll(consumerPollTimeout)
54+
val records =
55+
consumer.poll(java.time.Duration.ofMillis(consumerPollTimeout))
5556

5657
records.iterator().hasNext shouldBe true
5758
val record = records.iterator().next()
@@ -77,7 +78,8 @@ class EmbeddedKafkaMethodsSpec
7778
val consumer = kafkaConsumer
7879
consumer.subscribe(List(topic).asJava)
7980

80-
val records = consumer.poll(consumerPollTimeout)
81+
val records =
82+
consumer.poll(java.time.Duration.ofMillis(consumerPollTimeout))
8183

8284
records.iterator().hasNext shouldBe true
8385
val record = records.iterator().next()
@@ -102,7 +104,8 @@ class EmbeddedKafkaMethodsSpec
102104
val consumer = kafkaConsumer
103105
consumer.subscribe(List(topic).asJava)
104106

105-
val records = consumer.poll(consumerPollTimeout)
107+
val records =
108+
consumer.poll(java.time.Duration.ofMillis(consumerPollTimeout))
106109

107110
records.iterator().hasNext shouldBe true
108111
val record = records.iterator().next()
@@ -129,7 +132,9 @@ class EmbeddedKafkaMethodsSpec
129132
val consumer = kafkaConsumer
130133
consumer.subscribe(List(topic).asJava)
131134

132-
val records = consumer.poll(consumerPollTimeout).iterator()
135+
val records = consumer
136+
.poll(java.time.Duration.ofMillis(consumerPollTimeout))
137+
.iterator()
133138

134139
records.hasNext shouldBe true
135140

‎embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaObjectSpec.scala‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
110110
kafkaConsumer(someOtherConfig, deserializer, deserializer)
111111
anotherConsumer.subscribe(List(topic).asJava)
112112

113-
val moreRecords = anotherConsumer.poll(consumerPollTimeout)
113+
val moreRecords =
114+
anotherConsumer.poll(java.time.Duration.ofMillis(consumerPollTimeout))
114115
moreRecords.count shouldBe 1
115116

116117
val someOtherRecord = moreRecords.iterator().next

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /