I'm trying to use Flink to consume bounded data from a message queue in a streaming fashion. The data will be in the following format:

{"id":-1,"name":"Start"}
{"id":1,"name":"Foo 1"}
{"id":2,"name":"Foo 2"}
{"id":3,"name":"Foo 3"}
{"id":4,"name":"Foo 4"}
{"id":5,"name":"Foo 5"}
...
{"id":-2,"name":"End"}

The start and end of a batch can be determined using the event id. I want to receive such batches and store only the latest one (overwriting the previous batch) on disk or in memory. I can write a custom window trigger to extract the events using the start and end flags, as shown below (the evictor removes the marker events; a sketch of the trigger follows the snippet):

DataStream<Foo> fooDataStream = ...
AllWindowedStream<Foo, GlobalWindow> fooWindow = fooDataStream
    .windowAll(GlobalWindows.create())
    .trigger(new CustomTrigger<>())
    .evictor(new Evictor<Foo, GlobalWindow>() {
        @Override
        public void evictBefore(Iterable<TimestampedValue<Foo>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {
            // Drop the start/end marker events (negative ids) before the window function sees the batch.
            for (Iterator<TimestampedValue<Foo>> iterator = elements.iterator(); iterator.hasNext(); ) {
                TimestampedValue<Foo> foo = iterator.next();
                if (foo.getValue().getId() < 0) {
                    iterator.remove();
                }
            }
        }

        @Override
        public void evictAfter(Iterable<TimestampedValue<Foo>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {
            // No eviction needed after the window function.
        }
    });
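For completeness, the CustomTrigger I have in mind is roughly the following sketch: it fires and purges the global window when the end marker arrives (treating id == -2 as the end flag is my assumption; adjust to however the markers are actually encoded):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class CustomTrigger<T extends Foo> extends Trigger<T, GlobalWindow> {

    @Override
    public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx) {
        // Emit and purge the accumulated batch once the end marker arrives.
        if (element.getId() == -2) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        // No trigger state to clean up in this sketch.
    }
}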

But how can I persist the output of the latest window? One option would be to use a ProcessAllWindowFunction to receive all the events and write them to disk manually (a rough sketch of that is below), but it feels like a hack. I'm also looking into the Table API with a Flink CEP pattern (like this question), but I couldn't find a way to clear the Table after each batch to discard the events from the previous batch.
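For reference, the ProcessAllWindowFunction hack would look something like this (the output path and the line format are placeholders):

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

fooWindow.process(new ProcessAllWindowFunction<Foo, Foo, GlobalWindow>() {
    @Override
    public void process(Context context, Iterable<Foo> elements, Collector<Foo> out) throws Exception {
        // Collect the whole batch and overwrite the previous snapshot on disk.
        List<String> lines = new ArrayList<>();
        for (Foo foo : elements) {
            lines.add(foo.getId() + "," + foo.getName());
            out.collect(foo);
        }
        Files.write(Paths.get("/tmp/latest-batch.csv"), lines); // placeholder path
    }
});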

asked Nov 3, 2021 at 21:05

1 Answer

There are a couple of things getting in the way of what you want:

(1) Flink's window operators produce append streams, rather than update streams. They're not designed to update previously emitted results. CEP also doesn't produce update streams.

(2) Flink's file system abstraction does not support overwriting files. This is because object stores, like S3, don't support this operation very well.

I think your options are:

(1) Rework your job so that it produces an update (changelog) stream. You can do this with toChangelogStream, or by using Table/SQL operations that create update streams, such as GROUP BY (when it's used without a time window). On top of this, you'll need to choose a sink that supports retractions/updates, such as a database.
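A minimal sketch of option (1), assuming the stream is registered as a table and aggregated with a non-windowed GROUP BY (the view name, column names, and query are placeholders, not a drop-in solution):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register the incoming Foo stream as a table.
tableEnv.createTemporaryView("foos", tableEnv.fromDataStream(fooDataStream));

// A GROUP BY without a time window produces an update stream: each incoming
// event updates the previously emitted aggregate for its key.
Table latest = tableEnv.sqlQuery(
    "SELECT id, LAST_VALUE(name) AS name FROM foos GROUP BY id");

// toChangelogStream exposes the inserts/updates/retractions as Rows, which can
// then be forwarded to a sink that supports upserts (e.g. a JDBC upsert sink).
DataStream<Row> changelog = tableEnv.toChangelogStream(latest);
changelog.print();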

(2) Stick to producing an append stream and use something like the FileSink to write the results to a series of rolling files. Then do some scripting outside of Flink to get what you want out of this.
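And a sketch of option (2), writing one line per event to rolling part files with the FileSink (the output path is a placeholder; the rolling behavior can be tuned with a rolling policy):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

FileSink<String> sink = FileSink
    .forRowFormat(new Path("/tmp/foo-batches"), new SimpleStringEncoder<String>("UTF-8"))
    .build();

// Serialize each event to a line of text and hand it to the sink; part files
// roll according to the sink's rolling policy, and a script outside Flink can
// pick up the newest completed files.
fooDataStream
    .map(foo -> foo.getId() + "," + foo.getName())
    .returns(Types.STRING)
    .sinkTo(sink);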

answered Nov 4, 2021 at 8:56

3 Comments

Thanks. Do you see any issues with using the ProcessAllWindowFunction to write the data manually other than the performance bottlenecks in combining all events in one operator?
Fault tolerance and recovery -- this is where you'll have problems with this approach. Flink is able to provide exactly once guarantees because its sinks participate in checkpointing in a carefully designed manner. You'll be giving this up.
Thanks. That makes sense.
