I have a stream of IDs in Apache Flink. I would like to batch them into sets of 500 and for each batch call an external service that will give me additional data for each ID. Then I want to forward each ID with the additional data further downstream. I'm using batching here for performance reasons because 1 request with 500 IDs is much faster than 500 requests with 1 ID.
I tried implementing this with windows, but I either get tiny batches or no batches at all. In BATCH runtime execution mode I also lose the last remaining IDs.
Ideally I would like to:
- Distribute the IDs across a configurable number of workers, e.g. 10 on my local machine
- Each worker accumulates IDs until it has 500, then it makes a request and sends the additional data downstream
- When the upstream data source has finished, each worker should make one last request with what is left in the open batch
I'm a bit lost with the DataSet API: which functions should I use, and how should I structure the program?
1 Answer
With the (recommended) DataStream API, and the goal of having a scalable, reliable workflow, one approach is the following:
- In a map function, convert your incoming record to a `Tuple2<key, record>`. The key would be an `Integer` hash calculated from one or more stable fields in the incoming record. By "stable" I mean they wouldn't change if you re-ran the workflow on the same data, thus it wouldn't be (say) a field where you put the ingest time.
- Key the stream by `Tuple2.f0` (the first field).
- Implement a `KeyedProcessFunction`. This would save incoming records in `ListState` (and also register a timer set to `MAX_WATERMARK`). When you had 500 records, or the timer fired (which would happen when all of the incoming data had been received), then you'd output a record containing the batch of incoming records. See the first sketch after this list.
- Follow that by a `RichAsyncFunction`, where you call the remote service with the batch of records, and use the response to enrich (and then emit) the records. See the second sketch below.
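A minimal sketch of the batching step (Flink 1.x Java API), assuming the incoming IDs are plain `String`s that have already been mapped to `Tuple2<Integer, String>` and keyed by `f0`; the batch size is passed in as a constructor argument:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Collects up to maxBatchSize IDs per key and emits them as one List<String>.
// A timer registered at Long.MAX_VALUE (the MAX_WATERMARK timestamp) fires once
// the bounded input is exhausted, flushing whatever is left in the open batch.
public class IdBatcher extends KeyedProcessFunction<Integer, Tuple2<Integer, String>, List<String>> {

    private final int maxBatchSize;

    private transient ListState<String> pendingIds;

    public IdBatcher(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    @Override
    public void open(Configuration parameters) {
        // Flink 1.x signature; newer releases use open(OpenContext) instead.
        pendingIds = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pendingIds", String.class));
    }

    @Override
    public void processElement(Tuple2<Integer, String> record, Context ctx,
                               Collector<List<String>> out) throws Exception {
        // Registering the same timestamp repeatedly is deduplicated by Flink.
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);

        pendingIds.add(record.f1);

        List<String> batch = new ArrayList<>();
        pendingIds.get().forEach(batch::add);
        if (batch.size() >= maxBatchSize) {
            pendingIds.clear();
            out.collect(batch);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<List<String>> out) throws Exception {
        // End of input: emit the partial batch for this key, if any.
        List<String> batch = new ArrayList<>();
        pendingIds.get().forEach(batch::add);
        if (!batch.isEmpty()) {
            pendingIds.clear();
            out.collect(batch);
        }
    }
}
```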
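And a sketch of the enrichment step, assuming the external service returns a `Map<String, String>` from ID to extra data; `callExternalService` is a hypothetical placeholder you'd replace with your real client:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// One request per batch of IDs; emits one (id, extraData) record per ID.
public class BatchEnricher extends RichAsyncFunction<List<String>, Tuple2<String, String>> {

    @Override
    public void asyncInvoke(List<String> batch, ResultFuture<Tuple2<String, String>> resultFuture) {
        // supplyAsync uses the common pool here; a dedicated executor (created in
        // open() and shut down in close()) is the usual choice in production.
        CompletableFuture
                .supplyAsync(() -> callExternalService(batch))
                .thenAccept(response -> {
                    List<Tuple2<String, String>> enriched = new ArrayList<>();
                    for (String id : batch) {
                        enriched.add(Tuple2.of(id, response.get(id)));
                    }
                    resultFuture.complete(enriched);
                })
                .exceptionally(t -> {
                    resultFuture.completeExceptionally(t);
                    return null;
                });
    }

    // Hypothetical placeholder: replace with the real client for your service.
    private Map<String, String> callExternalService(List<String> ids) {
        throw new UnsupportedOperationException("call the external service here");
    }
}
```

Wired together, the pipeline could look roughly like this (batch size 500, 10 key values, 30-second timeout per request are all illustrative values):

```java
DataStream<String> ids = ...;  // the incoming ID stream

DataStream<Tuple2<String, String>> enriched = AsyncDataStream.unorderedWait(
        ids.map(id -> Tuple2.of(Math.floorMod(id.hashCode(), 10), id))  // stable hash key
           .returns(Types.TUPLE(Types.INT, Types.STRING))
           .keyBy(t -> t.f0, Types.INT)
           .process(new IdBatcher(500)),
        new BatchEnricher(),
        30, TimeUnit.SECONDS,  // timeout per batch request
        10);                   // max requests in flight per subtask
```

Note that with only 10 distinct key values the load won't be spread perfectly evenly across 10 subtasks, since Flink assigns keys to subtasks via key groups; using a larger modulus gives a more even distribution.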
The `DataSet` API has been deprecated for a while, and with Flink 2.0 it has been removed completely.