My Flink application does the following:

  1. source: read records from Kafka
  2. split: based on certain criteria
  3. window: a time window of 10 seconds to aggregate the records into one bulk record
  4. sink: dump these bulk records to Elasticsearch

I am facing an issue where the Flink consumer is not able to hold the data for 10 seconds and throws the following exception:

Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=18340663 , maxSize=5242880
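
For scale, the limit in the exception is exactly 5 MiB, which matches the default maximum state size of Flink's memory-backed state backend (`MemoryStateBackend`). The window state being checkpointed here is roughly three and a half times that, about 17.5 MiB:

$$
\text{maxSize} = 5242880 = 5 \times 1024^2\ \text{bytes} = 5\ \text{MiB}, \qquad \frac{\text{Size}}{\text{maxSize}} = \frac{18340663}{5242880} \approx 3.5
$$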

I cannot use countWindow, because if records arrive too slowly, writing to the Elasticsearch sink could be deferred for a long time.

My question:

Is it possible to combine a TimeWindow and a CountWindow with an OR condition, along the lines of:

> if (recordCount is 500 OR 10 seconds have elapsed)
> then dump the data to Elasticsearch
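
The requested rule can be pinned down with a small, Flink-free Java sketch. `CountOrTimeoutBatcher` and `batchSizes` are hypothetical names used only for illustration; the sketch assumes records arrive in timestamp order and that the 10-second clock starts at the first record of each batch (fixed 10-second intervals would be another valid reading).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Flink-free illustration: emit a batch when it holds maxCount records
 * OR timeoutMs has elapsed since the batch's first record, whichever comes first.
 */
public class CountOrTimeoutBatcher {
    private final int maxCount;
    private final long timeoutMs;

    public CountOrTimeoutBatcher(int maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    /** Splits sorted arrival times (ms) into batches and returns the size of each emitted batch. */
    public List<Integer> batchSizes(long[] arrivals) {
        List<Integer> sizes = new ArrayList<>();
        int count = 0;
        long deadline = 0;
        for (long t : arrivals) {
            // The timeout expired before this record arrived: flush the pending batch first
            if (count > 0 && t >= deadline) {
                sizes.add(count);
                count = 0;
            }
            if (count == 0) {
                deadline = t + timeoutMs; // the clock starts at the first record of a batch
            }
            count++;
            if (count >= maxCount) { // count condition met before the deadline
                sizes.add(count);
                count = 0;
            }
        }
        if (count > 0) {
            sizes.add(count); // the last pending batch is flushed by its timeout
        }
        return sizes;
    }

    public static void main(String[] args) {
        CountOrTimeoutBatcher b = new CountOrTimeoutBatcher(3, 10_000);
        // A burst of five records, then a slow trickle
        long[] arrivals = {0, 1, 2, 3, 4, 20_000, 35_000};
        System.out.println(b.batchSizes(arrivals)); // [3, 2, 1, 1]
    }
}
```

With `maxCount = 3` for readability, the burst yields one full batch of 3 and, once its deadline passes, a partial batch of 2; the slow records each go out alone after their own timeout. In the question's setting `maxCount` would be 500 and `timeoutMs` 10,000.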
asked Jan 30, 2019 at 12:11

2 Answers


Not directly, but you can use a GlobalWindow with custom triggering logic. Take a look at the source of Flink's built-in CountTrigger for reference.

Your triggering logic will look something like this:

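// Count and timer timestamp are stored via the TriggerContext, so they are scoped to the
// current key and window. A plain field (e.g. "private long triggerTimestamp") would be shared
// by every key this Trigger instance handles and would get overwritten.
// Sum is a ReduceFunction<Long> that adds two values, like the private Sum class in CountTrigger.
private final ReducingStateDescriptor<Long> countDesc =
 new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
private final ValueStateDescriptor<Long> timerDesc =
 new ValueStateDescriptor<>("timer", LongSerializer.INSTANCE);
@Override
public TriggerResult onElement(String element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
 ReducingState<Long> count = ctx.getPartitionedState(countDesc);
 ValueState<Long> timer = ctx.getPartitionedState(timerDesc);
 // Increment the window counter by one when an element is received
 count.add(1L);
 // Start the timer when the first element of a batch is received
 if (count.get() == 1L) {
  long fireAt = ctx.getCurrentProcessingTime() + 10000; // 10 seconds after the first element
  timer.update(fireAt);
  ctx.registerProcessingTimeTimer(fireAt); // handled in onProcessingTime below
 }
 // Or fire the window when the number of elements reaches 500
 if (count.get() >= 500) {
  // Delete the pending timer, reset the state and fire. FIRE_AND_PURGE (rather than FIRE)
  // empties the GlobalWindow; otherwise its contents would keep growing across batches.
  ctx.deleteProcessingTimeTimer(timer.value());
  timer.clear();
  count.clear();
  return TriggerResult.FIRE_AND_PURGE;
 }
 return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
 // 10 seconds passed before 500 elements arrived: reset the state and emit what was collected
 ctx.getPartitionedState(countDesc).clear();
 ctx.getPartitionedState(timerDesc).clear();
 return TriggerResult.FIRE_AND_PURGE;
}
// onEventTime (return TriggerResult.CONTINUE) and clear (delete the timer, clear both states)
// must also be implemented to complete the Trigger<String, GlobalWindow> subclass.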
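
To see the count-or-timeout callback structure without a Flink cluster, here is a plain-Java model of the idea. `KeyedCountOrTimeout` is a hypothetical class: a `HashMap` keyed by the record key stands in for the per-key state that Flink's `getPartitionedState` provides, and the caller advances the clock explicitly instead of Flink's processing-time service.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a count-or-timeout trigger with per-key state (not a Flink API). */
public class KeyedCountOrTimeout {
    private static final class KeyState {
        long count;
        long timer;
    }

    private final int maxCount;
    private final long timeoutMs;
    private final Map<String, KeyState> state = new HashMap<>(); // stands in for keyed state
    private final List<String> fired = new ArrayList<>();

    public KeyedCountOrTimeout(int maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    /** Like onElement: count the record, start the timer on the first one, fire at maxCount. */
    public void onElement(String key, long now) {
        KeyState s = state.computeIfAbsent(key, k -> new KeyState());
        s.count++;
        if (s.count == 1) {
            s.timer = now + timeoutMs;
        }
        if (s.count >= maxCount) {
            fired.add(key + ":count:" + s.count);
            state.remove(key); // clears the count and drops the pending timer
        }
    }

    /** Like onProcessingTime: fire every key whose timer is due at time now. */
    public void advanceTo(long now) {
        state.entrySet().removeIf(e -> {
            boolean due = e.getValue().timer <= now;
            if (due) {
                fired.add(e.getKey() + ":timeout:" + e.getValue().count);
            }
            return due;
        });
    }

    public List<String> fired() {
        return fired;
    }

    public static void main(String[] args) {
        KeyedCountOrTimeout t = new KeyedCountOrTimeout(3, 10_000);
        t.onElement("a", 0);
        t.onElement("b", 0);
        t.onElement("a", 1);
        t.onElement("a", 2);           // third "a" record: fires on count
        t.advanceTo(10_000);           // "b" has waited 10 s: fires on timeout
        System.out.println(t.fired()); // [a:count:3, b:timeout:1]
    }
}
```

Because each key keeps its own count and deadline, key "b" is flushed by its own timeout even though key "a" already fired on count; a single shared field holding one timer timestamp could not track both.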
answered Jan 30, 2019 at 13:00

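You could also switch to the RocksDB state backend, which keeps state on local disk rather than in memory and is therefore not bound by the memory-backed state limit in the exception, but a custom Trigger will perform better.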

answered Jan 30, 2019 at 15:18
