
I have a stream of IDs in Apache Flink. I would like to batch them into sets of 500 and for each batch call an external service that will give me additional data for each ID. Then I want to forward each ID with the additional data further downstream. I'm using batching here for performance reasons because 1 request with 500 IDs is much faster than 500 requests with 1 ID.

I tried implementing this using windows, but I'm either getting tiny batches or no batches at all. In runtime execution mode BATCH I'm also losing the last remaining IDs.

Ideally I would like to:

  • Distribute the IDs across a configurable number of workers, e.g. 10 on my local machine
  • Each worker accumulates IDs until it has 500, then it makes a request and sends the additional data downstream
  • When the upstream data source has finished, each worker should make one last request with what is left in the open batch

I'm a bit lost with the DataSet API. Which functions should I use, and how should I structure the program?

asked Feb 26, 2025 at 11:26
  • What version of Flink are you using? The DataSet API has been deprecated for a while, and with Flink 2.0 has been removed completely. Commented Feb 27, 2025 at 21:32

1 Answer


With the (recommended) DataStream API, and the goal of having a scalable, reliable workflow, one approach is the following:

  1. In a map function, convert your incoming record to a Tuple2<key, record>. The key would be an Integer hash calculated from one or more stable fields in the incoming record. By "stable" I mean they wouldn't change if you re-ran the workflow on the same data, thus it wouldn't be (say) a field where you put the ingest time.
  2. Key the stream by the Tuple2.f0 (first field).
  3. Implement a KeyedProcessFunction. This would save incoming records in ListState, and also register an event-time timer for Long.MAX_VALUE, which fires when the final MAX_WATERMARK arrives (i.e. once all of the input has been received). When you had 500 records, or the timer fired, you'd output a record containing the batch of buffered records.
  4. Follow that by a RichAsyncFunction, where you call the remote service with the batch of records, and use the response to enrich (and then emit) the records.
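The batching step (2 and 3 above) could be sketched roughly as follows. This is a minimal illustration assuming the Flink 1.x DataStream API and plain String IDs; the class name `BatchingFunction`, the state names, and `BATCH_SIZE` are all made up for the example:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BatchingFunction
        extends KeyedProcessFunction<Integer, Tuple2<Integer, String>, List<String>> {

    private static final int BATCH_SIZE = 500;

    private transient ListState<String> buffer;
    private transient ValueState<Integer> count;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-ids", String.class));
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("buffered-count", Integer.class));
    }

    @Override
    public void processElement(Tuple2<Integer, String> in, Context ctx,
            Collector<List<String>> out) throws Exception {
        // A timer at Long.MAX_VALUE fires when the final MAX_WATERMARK arrives,
        // i.e. when the bounded input is exhausted; that flushes the open batch.
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);

        buffer.add(in.f1);
        int n = (count.value() == null ? 0 : count.value()) + 1;
        if (n >= BATCH_SIZE) {
            flush(out);
            count.update(0);
        } else {
            count.update(n);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<List<String>> out) throws Exception {
        flush(out); // emit whatever is left in the open batch
    }

    private void flush(Collector<List<String>> out) throws Exception {
        List<String> batch = new ArrayList<>();
        for (String id : buffer.get()) {
            batch.add(id);
        }
        if (!batch.isEmpty()) {
            out.collect(batch);
        }
        buffer.clear();
    }
}
```

Upstream of this, step 1 might look like `ids.map(id -> Tuple2.of(Math.abs(id.hashCode()) % 10, id))` followed by `keyBy(t -> t.f0).process(new BatchingFunction())`, and for step 4 you would wrap the enrichment call with `AsyncDataStream.unorderedWait(...)` around your `RichAsyncFunction` so the remote requests don't block the task thread.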
answered Feb 28, 2025 at 22:19