My Flink application does the following:
- source: reads records from Kafka
- split: splits the stream based on certain criteria
- window: a time window of 10 seconds that aggregates records into one bulk record
- sink: writes these bulk records to Elasticsearch
The problem is that the Flink consumer cannot hold 10 seconds' worth of data, and it throws the following exception:
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=18340663 , maxSize=5242880
I cannot use countWindow on its own, because if records arrive too slowly, the write to Elasticsearch could be delayed for a long time.
My question:
Is it possible to combine a TimeWindow and a CountWindow with an OR condition, along these lines:
> if (recordCount is 500 OR 10 seconds have elapsed)
> then dump the data to Elasticsearch
I guess a GlobalWindow and a custom Trigger (ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/…) should do the job.
– TobiSH, Jan 30, 2019 at 12:50
2 Answers
Not directly, but you can use a GlobalWindow with custom triggering logic. Take a look at the source of Flink's CountTrigger as a starting point.
Your triggering logic will look something like this:
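    import org.apache.flink.api.common.state.*;
    import org.apache.flink.api.common.typeutils.base.LongSerializer;
    import org.apache.flink.streaming.api.windowing.triggers.*;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

    // Members of a class that extends Trigger<String, GlobalWindow>.
    // Sum is a ReduceFunction<Long> that adds two values, as in Flink's CountTrigger.
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
    // The timer timestamp lives in partitioned state, so it is tracked per key
    // instead of being shared by every key handled by this trigger instance.
    private final ValueStateDescriptor<Long> timerDesc =
            new ValueStateDescriptor<>("timer", LongSerializer.INSTANCE);

    @Override
    public TriggerResult onElement(String element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        ValueState<Long> timer = ctx.getPartitionedState(timerDesc);
        // Increment the window counter by one when an element is received
        count.add(1L);
        // Start the timer when the first element is received
        if (count.get() == 1) {
            long triggerTimestamp = ctx.getCurrentProcessingTime() + 10000; // 10 seconds after the first element
            timer.update(triggerTimestamp);
            // Override onProcessingTime to clear both states and fire the window when this timer goes off
            ctx.registerProcessingTimeTimer(triggerTimestamp);
        }
        // Or fire the window as soon as it holds 500 elements
        if (count.get() >= 500) {
            // Delete the pending timer, reset the state, then fire and purge the window
            ctx.deleteProcessingTimeTimer(timer.value());
            timer.clear();
            count.clear();
            // FIRE_AND_PURGE rather than FIRE: a GlobalWindow never ends,
            // so FIRE alone would keep every element in window state
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }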
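To make the "500 elements OR 10 seconds" rule easier to reason about, here is a minimal sketch of the same decision logic in plain Java, with no Flink dependency. The class name CountOrTimePolicy, its Result enum, and the onTimer method are hypothetical names chosen for illustration; they are not part of the Flink API. In a real trigger, onTimer roughly corresponds to what onProcessingTime would have to do.

```java
/** Hypothetical, Flink-free model of a "fire on N elements OR after a timeout" rule. */
class CountOrTimePolicy {
    enum Result { CONTINUE, FIRE_AND_PURGE }

    private final int maxCount;    // e.g. 500 elements
    private final long timeoutMs;  // e.g. 10_000 ms
    private long count = 0;        // elements seen since the last firing
    private long deadline = -1;    // -1 means no timer is pending

    CountOrTimePolicy(int maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    /** Called for every incoming element; nowMs is the current processing time. */
    Result onElement(long nowMs) {
        count++;
        if (count == 1) {
            // The first element of a batch starts the timeout
            deadline = nowMs + timeoutMs;
        }
        if (count >= maxCount) {
            reset();
            return Result.FIRE_AND_PURGE;
        }
        return Result.CONTINUE;
    }

    /** Called when the clock advances; fires if a pending deadline has been reached. */
    Result onTimer(long nowMs) {
        if (deadline >= 0 && nowMs >= deadline) {
            reset();
            return Result.FIRE_AND_PURGE;
        }
        return Result.CONTINUE;
    }

    /** Clears the counter and the pending timer so the next element starts a new batch. */
    private void reset() {
        count = 0;
        deadline = -1;
    }

    public static void main(String[] args) {
        // Small limits so the behaviour is easy to follow: 3 elements or 10 seconds
        CountOrTimePolicy policy = new CountOrTimePolicy(3, 10_000);
        System.out.println(policy.onElement(0));
        System.out.println(policy.onElement(100));
        System.out.println(policy.onElement(200));   // third element reaches the count limit
        System.out.println(policy.onElement(5_000)); // starts a new batch
        System.out.println(policy.onTimer(15_000));  // 10 seconds after that first element
    }
}
```

The point of the sketch is that both paths, count and timeout, must reset the same state. If the timeout path forgets to clear the counter, the next batch never registers a new timer because count.get() == 1 is never true again.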
You could also use the RocksDB state backend, but a custom Trigger will perform better.