
I have a Vert.x server that sends a message to all registered streamers whenever an event occurs. From the console everything works as expected. With a Vert.x WebClient, however, the client connects but never receives anything.

The web client options:

val webClientOpt = WebClientOptions()
  .setKeepAlive(true)
  .setUserAgent("Client/2.0")
  .setFollowRedirects(true)
  .setShared(true)

The client call:

client
  .get(port, host, UriTemplate.of(s"${path}event-bus/"))
  .putHeader("content-type", "application/json")
  .bearerTokenAuthentication(UserBuffer.loggedInUser().jwtToken())
  .as(BodyCodec.pipe(writeBuffer)).send()

The write stream and the read stream:

val writeBuffer = ReactiveWriteStream.writeStream[Buffer](vertx)
val readStream = ReactiveReadStream.readStream[Buffer]()

And the code that consumes the data:

readStream.handler(j => {
  println("CONSUMING!!!!")
  println(j.toString("UTF-8"))
})
writeBuffer.subscribe(readStream)

I know I have to use BodyCodec.pipe, but I think this is where my problem lies: I suspect I am not using it correctly.


It was not clear to me how to use the WriteStream correctly. Now I use:

new WriteStream[Buffer]() {
  override def write(buffer: Buffer): io.vertx.core.Future[Void] = {
    println(buffer.toString())
    Future.successful(null).asVertx
  }
  override def end(): io.vertx.core.Future[Void] = {
    Future.successful(null).asVertx
  }
  override def setWriteQueueMaxSize(maxSize: Int): WriteStream[Buffer] = this
  override def writeQueueFull(): Boolean = false
  override def drainHandler(handler: Handler[Void]): WriteStream[Buffer] = this
  override def exceptionHandler(handler: Handler[Throwable]): WriteStream[Buffer] = this
}

That is what I was looking for.
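One thing to keep in mind with a hand-written WriteStream like this: as far as I understand, write receives whatever chunk happens to arrive, and a chunk is not guaranteed to line up with exactly one complete message. If the server ends every message with a newline (an assumption, nothing shown here does that yet), the chunks can be reassembled before parsing. A minimal sketch in plain Scala; LineFramer is a hypothetical helper name, not part of Vert.x:

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper (not part of Vert.x): collects incoming chunks and
// returns only complete, newline-terminated messages.
final class LineFramer {
  // Bytes received so far that are not yet terminated by a newline.
  private val pending = new ByteArrayOutputStream()

  /** Feed one chunk; returns every message completed by this chunk. */
  def feed(chunk: Array[Byte]): List[String] = {
    val complete = ArrayBuffer.empty[String]
    chunk.foreach { b =>
      if (b == '\n'.toByte) {
        // A newline closes the current message; decode it as a whole.
        complete += new String(pending.toByteArray, UTF_8)
        pending.reset()
      } else {
        pending.write(b.toInt)
      }
    }
    complete.toList
  }
}
```

Inside write, something like framer.feed(buffer.getBytes).foreach(handleMessage) would then pass on whole messages only, where handleMessage is a placeholder for the real work. Working on bytes rather than strings avoids cutting a multi-byte UTF-8 character in half at a chunk boundary.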


  • Please clarify your specific problem or provide additional details to highlight exactly what you need. As it's currently written, it's hard to tell exactly what you're asking. Commented Sep 10, 2025 at 22:06
  • For clarity, is this a one-shot stream (HTTP chunked) or Server Side Events or Websockets? Commented Sep 11, 2025 at 5:25
  • I had not understood the WriteBuffer. But now I have found a way. Commented Sep 11, 2025 at 12:28
  • @AndréSchmidt If you found a solution, can you please answer your own question to the benefit of future readers? :) Commented Sep 15, 2025 at 12:52
  • @stefanobaghino Yes, I found a solution. I will add it. Commented Nov 12, 2025 at 20:56

1 Answer

Here is a solution:

private lazy val templatePipe = new WriteStream[Buffer]() {
  // Called for each chunk of the response body; the chunk is parsed
  // as one TemplateEventResponse.
  override def write(buffer: Buffer): io.vertx.core.Future[Void] = {
    val body = readFromString[TemplateEventResponse](buffer.toString())
    getTemplate(body.uniqueName)
      .map(_ => null).asVertx
  }
  override def end(): io.vertx.core.Future[Void] = {
    Future.successful(null).asVertx
  }
  // No back-pressure: the queue is never reported as full.
  override def setWriteQueueMaxSize(maxSize: Int): WriteStream[Buffer] = this
  override def writeQueueFull(): Boolean = false
  override def drainHandler(handler: Handler[Void]): WriteStream[Buffer] = this
  override def exceptionHandler(handler: Handler[Throwable]): WriteStream[Buffer] = this
}

The handler is shown above. The client registers with the server like this:

private def updateStream(): Unit = {
  client
    .get(port, host, UriTemplate.of(s"${path}event-register/"))
    .putHeader("content-type", "application/json")
    .bearerTokenAuthentication(UserBuffer.loggedInUser().jwtToken())
    .as(BodyCodec.pipe(templatePipe))
    .send().asScala
    .onComplete {
      case Success(data) if data.statusCode() >= 200 && data.statusCode() < 300 =>
        // 2xx status: nothing to do
      case Success(value) =>
        // TODO: show a notification
      case Failure(err) =>
        // TODO: show a notification
    }
}

If you call updateStream(), the client registers with the server and waits. On each event the handler does its work. For the server I use this:

private val streamers = mutable.HashSet[HttpServerResponse]()

def registerUpdateEvent(ctx: RoutingContext): Future[?] = {
  val response = ctx.response()
    .putHeader("Content-Type", "text/json")
    .setChunked(true)
  streamers += response
  response.endHandler(_ => {
    streamers -= response
  })
  response.exceptionHandler(_ => {
    streamers -= response
  })
  Future.successful("")
}

override def start(): Unit = {
  super.start()
  vertx.eventBus().consumer[SettlementEvent](classOf[SettlementEvent].toString, event => {
    persistence.getNumberByEntityId(event.body().entityId).map { nr =>
      streamers.foreach { stream =>
        stream.write(
          writeToString(SettlementEventResponse(nr, event.body.getClass.getSimpleName))
        )
      }
    }
  })
}

The start method has to be adapted to your needs. I listen for every event and send the JSON SettlementEventResponse to all listeners. I tested it and it works, and it is very fast. I hope that helps others.
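Two details of the server side may be worth a second look. Depending on the execution context, the map on the persistence future may run on a different thread than the one that registers streamers, and mutable.HashSet is not thread-safe. The written chunks also carry no message boundary, so a client cannot tell where one JSON document ends. A minimal sketch of one way to handle both, in plain Scala; Listener and Broadcaster are hypothetical names standing in for HttpServerResponse and the streamers set, not Vert.x API:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.util.Try

// Hypothetical stand-in for HttpServerResponse: anything that accepts text.
trait Listener {
  def write(text: String): Unit
}

// Hypothetical registry that can be used from several threads.
final class Broadcaster {
  private val listeners = ConcurrentHashMap.newKeySet[Listener]()

  def register(l: Listener): Unit = { listeners.add(l); () }
  def unregister(l: Listener): Unit = { listeners.remove(l); () }
  def size: Int = listeners.size()

  /** Sends one newline-terminated message to every listener and
    * drops listeners whose write throws. */
  def broadcast(json: String): Unit =
    listeners.forEach { l =>
      if (Try(l.write(json + "\n")).isFailure) listeners.remove(l)
    }
}
```

The trailing newline is only one possible framing convention; it works here because the JSON is written on a single line. With a real HttpServerResponse the unregister call would sit in the end and exception handlers, as in the code above.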

DaveL17
answered Nov 12, 2025 at 21:05