
This is a continuation of: Flink: Handling Keyed Streams with data older than application watermark

Based on the suggestion there, I have been trying to add batch support to the same Flink application, which uses the DataStream API.

The logic is roughly as follows:

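import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
streamExecutionEnvironment.readTextFile("fileName")
    .process(transformFunction) // placeholder: ProcessFunction<String, Event> that parses each line
    .assignTimestampsAndWatermarks(WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(orderness) // orderness is a java.time.Duration
        .withTimestampAssigner(
            (SerializableTimestampAssigner<Event>) (event, l) -> event.getEventTime()))
    .keyBy(keyFunction)
    .window(TumblingEventTimeWindows.of(Time.days(x))) // tumbling event-time window of x days
    .process(processWindowFunction);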
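For reference, a tumbling event-time window with zero offset places each event in the window whose start is the event timestamp rounded down to a multiple of the window size. A minimal plain-Java sketch of that arithmetic, assuming epoch-millisecond timestamps and x = 3 days for illustration (this is not Flink code; `TumblingWindowSketch` and `windowStart` are hypothetical names):

```java
import java.time.Duration;
import java.time.Instant;

public class TumblingWindowSketch {

    /** Start of the tumbling window (zero offset) containing timestampMillis. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        long remainder = timestampMillis % windowSizeMillis;
        // Java's % is negative for negative timestamps; shift it into [0, size).
        if (remainder < 0) {
            remainder += windowSizeMillis;
        }
        return timestampMillis - remainder;
    }

    public static void main(String[] args) {
        long size = Duration.ofDays(3).toMillis(); // assumed x = 3 days
        long ts = Instant.parse("2021-11-27T19:01:00Z").toEpochMilli();
        long start = windowStart(ts, size);
        // Print the half-open window [start, start + size) that contains ts.
        System.out.println(Instant.ofEpochMilli(start) + " .. " + Instant.ofEpochMilli(start + size));
    }
}
```

Because windows are aligned to the epoch in UTC rather than to the first event, a multi-day window can start before the earliest record in the file.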

Based on the public docs, my understanding was that I simply needed to change the source to a bounded one. However, the processing above keeps failing at the trigger after the windowing step, with the following exception:

java.lang.IllegalStateException: Checkpointing is not allowed with sorted inputs.
 at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:99)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:552)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
 at java.base/java.lang.Thread.run(Thread.java:829)

The input file contains historical events for multiple keys. The data for any given key is sorted, but the data as a whole is not. I have also appended an event with timestamp = MAX_WATERMARK at the end of each key to mark the end of that keyed stream. I also tried it with a single key, but the processing failed with the same exception.
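In BATCH mode the whole bounded input is available up front, so per-key sorting and an end-of-key sentinel should not be necessary. The in-memory simulation below illustrates the idea in plain Java; it is not Flink's implementation, and `BatchWindowSimulation`, `Event`, and `countPerWindow` are hypothetical names. Globally unsorted input is grouped by key, ordered by event time within each key, and every window is emitted once the input is exhausted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchWindowSimulation {

    /** Hypothetical event: a key plus an event-time timestamp in epoch millis. */
    static final class Event {
        final String key;
        final long eventTime;

        Event(String key, long eventTime) {
            this.key = key;
            this.eventTime = eventTime;
        }
    }

    /** Start of the tumbling window (zero offset) that contains ts. */
    static long windowStart(long ts, long size) {
        long r = ts % size;
        return ts - (r < 0 ? r + size : r);
    }

    /** Counts events per (key, window); map keys look like "key@windowStart". */
    static Map<String, Integer> countPerWindow(List<Event> input, long size) {
        List<Event> sorted = new ArrayList<>(input);
        // Group by key, then order by event time within each key.
        sorted.sort(Comparator.comparing((Event e) -> e.key)
                .thenComparingLong(e -> e.eventTime));
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Event e : sorted) {
            counts.merge(e.key + "@" + windowStart(e.eventTime, size), 1, Integer::sum);
        }
        // The input is bounded, so reaching its end means every window is complete;
        // no sentinel event is needed to make the last window of a key fire.
        return counts;
    }

    public static void main(String[] args) {
        long day = 86_400_000L;
        // Globally unsorted input for two keys.
        List<Event> input = List.of(
                new Event("b", 2 * day + 5), new Event("a", 10),
                new Event("b", 7), new Event("a", day + 1), new Event("a", 20));
        System.out.println(countPerWindow(input, day));
    }
}
```

Flink does not literally sort everything in memory like this; the sketch only shows why the end of a bounded input is enough to complete every window.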

Note: I have not enabled checkpointing. I have also tried explicitly disabling checkpointing to no avail.

env.getCheckpointConfig().disableCheckpointing();

EDIT - 1

More details: I switched to FileSource to read the files, but I still get the same exception.

environment.fromSource(
    FileSource.forRecordStreamFormat(new TextLineFormat(), path).build(),
    WatermarkStrategy.noWatermarks(),
    "Text File");

The first process step and the keyBy work, but the job fails after that. I tried removing the windowing and adding a simple process step instead, but it still fails. There is no explicit sink; the last process function simply updates a database.

Attached images in case they help: process, exception, timeline.

Is there something I'm missing?

asked Nov 27, 2021 at 19:01
  • There's no need for you to sort the input, or add special watermarks. These are things Flink will do itself when operating in batch mode. Not sure what's wrong; it would help to see the entire job. The problem might be the connectors -- batch mode works best with connectors using the new source and sink interfaces. Commented Nov 27, 2021 at 21:05
  • @DavidAnderson, I've updated the original post with additional details. Commented Nov 28, 2021 at 15:29

1 Answer


That exception can only be thrown if checkpointing is enabled. Perhaps you have a checkpointing interval configured in flink-conf.yaml?
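Flink reads the periodic checkpoint interval from the `execution.checkpointing.interval` key, and setting it enables checkpointing. A minimal sketch of a helper that scans a flat flink-conf.yaml for that key (hypothetical code, not a Flink API); it skips comment lines and keeps the last occurrence if the key appears more than once:

```java
import java.util.Optional;

public class CheckpointConfigScan {

    // flink-conf.yaml key for the periodic checkpoint interval.
    static final String INTERVAL_KEY = "execution.checkpointing.interval";

    /** Returns the configured interval value, if the flat "key: value" file sets one. */
    static Optional<String> checkpointInterval(String flinkConfYaml) {
        Optional<String> found = Optional.empty();
        for (String rawLine : flinkConfYaml.split("\\R")) {
            String line = rawLine.strip();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int colon = line.indexOf(':');
            if (colon < 0) {
                continue; // not a key: value line
            }
            String key = line.substring(0, colon).strip();
            String value = line.substring(colon + 1).strip();
            if (key.equals(INTERVAL_KEY) && !value.isEmpty()) {
                found = Optional.of(value); // a later occurrence replaces an earlier one
            }
        }
        return found;
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
                "jobmanager.rpc.address: localhost",
                "# execution.checkpointing.interval: 10s",
                "execution.checkpointing.interval: 60s");
        checkpointInterval(conf).ifPresentOrElse(
                v -> System.out.println("Checkpoint interval configured: " + v),
                () -> System.out.println("No checkpoint interval configured"));
    }
}
```

On a managed platform the effective checkpoint settings may be applied by the platform itself, so an empty result here does not prove that checkpointing is off.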

answered Nov 28, 2021 at 21:13


It turns out that Kinesis Data Analytics has checkpointing enabled by default (I'm using Kinesis Data Analytics to deploy and manage the Flink application). Turning it off seems to have done the trick, although it's leading to a new set of problems.
