I have a Vert.x server that sends a message to all connected streamers whenever an event occurs. From a console client everything works as expected. But when I use a Vert.x WebClient, the client connects but does not receive anything.
The web client options:
val webClientOpt = WebClientOptions()
  .setKeepAlive(true)
  .setUserAgent("Client/2.0")
  .setFollowRedirects(true)
  .setShared(true)
The client call:
client
  .get(port, host, UriTemplate.of(s"${path}event-bus/"))
  .putHeader("content-type", "application/json")
  .bearerTokenAuthentication(UserBuffer.loggedInUser().jwtToken())
  .as(BodyCodec.pipe(writeBuffer)).send()
The writeBuffer and the reader:
val writeBuffer = ReactiveWriteStream.writeStream[Buffer](vertx)
val readStream = ReactiveReadStream.readStream[Buffer]()
And the code that consumes the data:
readStream.handler(j => {
  println("CONSUMING!!!!")
  println(j.toString("UTF-8"))
})
writeBuffer.subscribe(readStream)
I know I have to use BodyCodec.pipe, but I think this is where my problem lies: I don't think I am using it correctly.
It was not clear to me how to use the WriteStream correctly. Now I use:
new WriteStream[Buffer]() {
  // Called for each buffer of the response body as it arrives
  override def write(buffer: Buffer): io.vertx.core.Future[Void] = {
    println(buffer.toString())
    Future.successful(null).asVertx
  }
  override def end(): io.vertx.core.Future[Void] = {
    Future.successful(null).asVertx
  }
  // No back-pressure: the write queue is never reported as full
  override def setWriteQueueMaxSize(maxSize: Int): WriteStream[Buffer] = this
  override def writeQueueFull(): Boolean = false
  override def drainHandler(handler: Handler[Void]): WriteStream[Buffer] = this
  override def exceptionHandler(handler: Handler[Throwable]): WriteStream[Buffer] = this
}
That is what I was looking for.
- Please clarify your specific problem or provide additional details to highlight exactly what you need. As it's currently written, it's hard to tell exactly what you're asking. – Community Bot, Sep 10, 2025 at 22:06
- For clarity, is this a one-shot stream (HTTP chunked) or Server-Sent Events or WebSockets? – Gaël J, Sep 11, 2025 at 5:25
- I had not understood the WriteBuffer. But now I have found a way. – André Schmidt, Sep 11, 2025 at 12:28
- @AndréSchmidt If you found a solution, can you please answer your own question to the benefit of future readers? :) – stefanobaghino, Sep 15, 2025 at 12:52
- @stefanobaghino Yes, I found a solution. I will add it. – André Schmidt, Nov 12, 2025 at 20:56
1 Answer
Here is a solution:
private lazy val templatePipe = new WriteStream[Buffer]() {
  // Parse the event from the buffer and fetch the matching template;
  // the write completes when getTemplate completes
  override def write(buffer: Buffer): io.vertx.core.Future[Void] = {
    val body = readFromString[TemplateEventResponse](buffer.toString())
    getTemplate(body.uniqueName)
      .map(_ => null).asVertx
  }
  override def end(): io.vertx.core.Future[Void] = {
    Future.successful(null).asVertx
  }
  // No back-pressure: the write queue is never reported as full
  override def setWriteQueueMaxSize(maxSize: Int): WriteStream[Buffer] = this
  override def writeQueueFull(): Boolean = false
  override def drainHandler(handler: Handler[Void]): WriteStream[Buffer] = this
  override def exceptionHandler(handler: Handler[Throwable]): WriteStream[Buffer] = this
}
Above is the handler. Next is how a client registers with the server:
private def updateStream(): Unit = {
  client
    .get(port, host, UriTemplate.of(s"${path}event-register/"))
    .putHeader("content-type", "application/json")
    .bearerTokenAuthentication(UserBuffer.loggedInUser().jwtToken())
    .as(BodyCodec.pipe(templatePipe))
    .send().asScala
    .onComplete {
      case Success(data) if data.statusCode() >= 200 && data.statusCode() < 300 =>
        // Response completed with a 2xx status; nothing to do
      case Success(value) =>
        // TODO: show a notification
      case Failure(err) =>
        // TODO: show a notification
    }
}
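One caveat about the write method of templatePipe: it parses every buffer as one complete JSON document, but HTTP chunking does not guarantee that a buffer arriving at the client lines up with exactly one write on the server (a large message can be split, and intermediaries may re-chunk). A minimal sketch of one way around this, assuming the server appends a newline after every JSON message, which the server code in this answer does not do. LineFramer is a hypothetical helper, not part of Vert.x:

```scala
// Hypothetical helper (not a Vert.x class): collects text fragments and
// returns every complete, newline-terminated message received so far.
final class LineFramer {
  // Text received after the last newline; it is still an incomplete message.
  private val pending = new StringBuilder

  def feed(fragment: String): List[String] = {
    pending.append(fragment)
    val text = pending.toString
    val lastNewline = text.lastIndexOf('\n')
    if (lastNewline < 0) Nil // no complete message yet
    else {
      // Keep the unfinished tail for the next call.
      pending.clear()
      pending.append(text.substring(lastNewline + 1))
      // Everything before the last newline consists of complete messages.
      text.substring(0, lastNewline).split('\n').toList.filter(_.nonEmpty)
    }
  }
}

@main def framingDemo(): Unit = {
  val framer = new LineFramer
  println(framer.feed("{\"a\":1}\n{\"b\"")) // one complete message, one partial
  println(framer.feed(":2}\n"))              // the partial message is now complete
}
```

Inside write, each string returned by feed(buffer.toString()) could then be passed to readFromString. The sketch works on decoded strings for brevity; a buffer that ends in the middle of a multi-byte character would need byte-level buffering instead. Vert.x also ships a RecordParser for delimiter-based splitting, which may be a better fit than a hand-written helper.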
When you call updateStream(), the client registers with the server and waits. Each time the server sends an event, the handler's write method is invoked and does the work. For the server I use this:
private val streamers = mutable.HashSet[HttpServerResponse]()
def registerUpdateEvent(ctx: RoutingContext): Future[?] = {
  val response = ctx.response()
    .putHeader("Content-Type", "text/json")
    .setChunked(true)
  streamers += response
  response.endHandler(_ => {
    streamers -= response
  })
  response.exceptionHandler(_ => {
    streamers -= response
  })
  Future.successful("")
}
override def start(): Unit = {
  super.start()
  vertx.eventBus().consumer[SettlementEvent](classOf[SettlementEvent].toString, event => {
    persistence.getNumberByEntityId(event.body().entityId).map { nr =>
      streamers.foreach { stream =>
        stream.write(
          writeToString(SettlementEventResponse(nr, event.body.getClass.getSimpleName))
        )
      }
    }
  })
}
The start method has to be adapted to your needs. I listen for every event and send the JSON SettlementEventResponse to all registered listeners. I tested it and it works; it is also very fast. I hope this helps others.
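The register-and-broadcast pattern used for streamers can be sketched on its own. This is only an illustration under assumptions: Broadcaster is a hypothetical class, listeners are plain functions standing in for HttpServerResponse, and a concurrent set is used in case the callback of getNumberByEntityId runs on a different thread than the handlers that add and remove responses (whether it does depends on the execution context, which is not shown here).

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the streamers set. A listener is just a function
// here, where the answer stores HttpServerResponse objects.
final class Broadcaster[A] {
  // Concurrent set, so registration and broadcasting may happen on different threads.
  private val listeners = ConcurrentHashMap.newKeySet[A => Unit]()

  // Registers a listener and returns a function that removes it again.
  def register(listener: A => Unit): () => Unit = {
    listeners.add(listener)
    () => { listeners.remove(listener); () }
  }

  // Delivers the message to every listener currently registered.
  def broadcast(message: A): Unit =
    listeners.forEach(listener => listener(message))

  // Number of listeners currently registered.
  def size: Int = listeners.size
}
```

In the server above, register would correspond to adding the response in registerUpdateEvent, and the returned function to what endHandler and exceptionHandler do when they remove it.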