
I'm using Apache Flink for stream processing.

After subscribing to messages from a source (e.g. Kafka, AWS Kinesis Data Streams) and applying transformations, aggregations, etc. with Flink operators on the streaming data, I want to buffer the final messages (e.g. 1000 at a time) and post each batch in a single request to an external REST API.

How can I implement this buffering mechanism (grouping every 1000 records into a batch) in Apache Flink?

Flink pipeline: streaming source --> transform/reduce using operators --> buffer 1000 messages --> post to REST API
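
For reference, the pipeline so far looks roughly like this (a Kafka source and a trivial map are shown only as placeholders for the real source and operators; the commented-out sink is the missing piece I'm asking about):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class StreamToRestJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "flink-to-rest");

        DataStream<String> results = env
                .addSource(new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), kafkaProps))
                .map(String::toUpperCase); // placeholder for the real transform/reduce logic

        // Missing piece: buffer 1000 results and POST each batch to the REST API in one request
        // results.addSink(...);

        env.execute("stream-to-rest");
    }
}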

Appreciate your help!

asked Nov 7, 2019 at 5:00

1 Answer


I'd create a sink with state that holds on to the messages passed in. When the count gets high enough (1000), the sink sends the batch. The state can live in memory (e.g. an instance variable holding an ArrayList of messages), but you should use checkpoints so that you can recover that state in case of a failure of some kind; see the sketch below.
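
A minimal sketch of that sink, ignoring checkpointing for the moment (HttpBatchingSink and postBatch are names I've made up for illustration, not Flink API), could look like:

import java.util.ArrayList;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Buffers incoming messages and POSTs them to the REST API in batches of batchSize.
public class HttpBatchingSink extends RichSinkFunction<String> {

    private final String endpoint;
    private final int batchSize;

    private ArrayList<String> buffer = new ArrayList<>();

    public HttpBatchingSink(String endpoint, int batchSize) {
        this.endpoint = endpoint;
        this.batchSize = batchSize;
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            postBatch(endpoint, buffer);   // one HTTP request carrying the whole batch
            buffer.clear();
        }
    }

    private void postBatch(String endpoint, ArrayList<String> batch) throws Exception {
        // Build and send a single REST request containing all buffered messages,
        // e.g. with java.net.http.HttpClient or any HTTP client library.
    }
}

You'd wire it in with something like results.addSink(new HttpBatchingSink("https://example.com/api/batch", 1000)) (the URL and threshold are just examples).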

When your sink has checkpointed state, it needs to implement CheckpointedFunction (in org.apache.flink.streaming.api.checkpoint), which means you need to add two methods to your sink:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    checkpointedState.clear();
    // HttpSinkStateItem is a user-written class
    // that just holds a collection of messages (Strings, in this case)
    //
    // buffer is declared as ArrayList<String>
    checkpointedState.add(new HttpSinkStateItem(buffer));
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Mix and match different kinds of state as needed:
    //   - Use context.getOperatorStateStore() to get basic (non-keyed) operator state
    //     - types are list and union
    //   - Use context.getKeyedStateStore() to get state for the current key (only when processing keyed streams)
    //     - types are value, list, reducing, aggregating and map
    //   - Distinguish between pieces of state using the state name (e.g. "HttpSink-State")
    ListStateDescriptor<HttpSinkStateItem> descriptor =
            new ListStateDescriptor<>(
                    "HttpSink-State",
                    HttpSinkStateItem.class);

    checkpointedState = context.getOperatorStateStore().getListState(descriptor);

    if (context.isRestored()) {
        buffer = new ArrayList<>();
        for (HttpSinkStateItem item : checkpointedState.get()) {
            // addAll rather than reassigning, in case the restored list state
            // holds more than one entry (e.g. after rescaling)
            buffer.addAll(item.getPending());
        }
    }
}
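
For completeness, checkpointedState and the HttpSinkStateItem holder referenced above could be declared along these lines (this is my assumption about how the pieces fit together, since they aren't shown; buffer is the ArrayList<String> field from the sink sketch earlier):

// Field on the sink class (which now also implements CheckpointedFunction):
private transient ListState<HttpSinkStateItem> checkpointedState;

// User-written holder for the buffered messages (a separate top-level class):
public class HttpSinkStateItem implements java.io.Serializable {
    private final ArrayList<String> pending;

    public HttpSinkStateItem(ArrayList<String> pending) {
        this.pending = new ArrayList<>(pending);
    }

    public ArrayList<String> getPending() {
        return pending;
    }
}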

You can also flush on a timer (if the input stream is keyed/partitioned), so that a partial batch still gets sent periodically when the count doesn't reach your threshold; see the sketch below.
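
Flink's own timers are only available in keyed contexts (they aren't exposed to plain sinks), so one way to get that periodic flush, assuming processing-time flushing is acceptable, is to do the batching in a KeyedProcessFunction ahead of a simple HTTP sink and emit a batch either when it is full or when a timer fires. All names below are illustrative:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Collects up to batchSize messages per key and emits the batch either when it
// is full or when a processing-time timer fires, whichever happens first.
public class BatchingFunction extends KeyedProcessFunction<String, String, List<String>> {

    private final int batchSize;
    private final long flushIntervalMillis;

    private transient ListState<String> pending;

    public BatchingFunction(int batchSize, long flushIntervalMillis) {
        this.batchSize = batchSize;
        this.flushIntervalMillis = flushIntervalMillis;
    }

    @Override
    public void open(Configuration parameters) {
        pending = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pending-messages", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<List<String>> out) throws Exception {
        pending.add(value);

        // Rebuilding the list on every element keeps the sketch simple;
        // a separate count state would avoid the repeated iteration.
        List<String> buffered = new ArrayList<>();
        pending.get().forEach(buffered::add);

        if (buffered.size() >= batchSize) {
            out.collect(buffered);          // downstream sink POSTs this batch
            pending.clear();
        } else if (buffered.size() == 1) {
            // first element of a new batch: schedule a flush in case the batch never fills up
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + flushIntervalMillis);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<String>> out) throws Exception {
        List<String> buffered = new ArrayList<>();
        pending.get().forEach(buffered::add);
        if (!buffered.isEmpty()) {
            out.collect(buffered);
            pending.clear();
        }
    }
}

Downstream, the sink then just receives a List<String> and posts it in a single request; if the stream has no natural key, keying by a constant works at the cost of parallelism.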

answered Nov 7, 2019 at 7:04

3 Comments

Chris, are you referring to "Keyed State" as the state to use when creating the sink that holds on to messages? If you have a reference example, please link it. I'm new to Flink, but I will read the docs anyway. Thank you!
@Sparkle8 I'll see what I can come up with
Thank you, Chris! Your answer is helpful for implementing session-based stream processing and posting messages to a REST API with Apache Flink.
