We recently experimented with Flink, in particular BATCH execution mode, to set up an ETL job processing a bounded dataset. It works quite well, but I'd like some clarification on my understanding of Flink, as well as any nuances I may have missed as a beginner.
Our setup
Our logical job graph looks something like this:
FileSource -> map() -> AssignTimestamps() -> filter() -> keyBy -> TumblingWindow -> FileSink (attached below is what the Flink Dashboard shows). We are running Flink 1.20 on AWS KDA, using the DataStream API.
My cursory understanding of Flink under the hood
We overrode the FileEnumerator on the FileSource so that it creates FileSourceSplits from a pre-computed manifest file. On startup, the JobManager (a singleton inside a KDA cluster) calls our custom FileEnumerator on the FileSource to create these FileSourceSplits.
These objects are then inserted into a queue, from which the JobManager hands them out FIFO-style, via the default split assigner, in response to requests coming from SourceReaders on each TaskManager process.
The SourceReaders then take the file path (metadata) and do their own fetching to get the actual content (cross-referenced with https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/).
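If it helps to see the hand-out mechanics, here is a minimal, non-Flink sketch of my understanding of the flow (the class name and manifest paths are made up for illustration): the enumerator turns the manifest into split descriptors once, up front, and the coordinator then serves them first-in-first-out as readers request work.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustrative sketch only -- not the Flink API. In Flink, the manifest
// entries would become FileSourceSplits, and the JobManager-side assigner
// would hand them to SourceReaders on request.
public class SplitQueueSketch {

    // Drain the pending-split queue in FIFO order, the way the default
    // assigner serves SourceReaders polling for their next split.
    static List<String> assignAll(List<String> manifestEntries) {
        Queue<String> pending = new ArrayDeque<>(manifestEntries); // coordinator-side queue
        List<String> handOutOrder = new ArrayList<>();
        while (!pending.isEmpty()) {
            handOutOrder.add(pending.poll()); // FIFO hand-out per reader request
        }
        return handOutOrder;
    }

    public static void main(String[] args) {
        // Hypothetical manifest entries.
        System.out.println(assignAll(List.of(
                "s3://bucket/part-0", "s3://bucket/part-1", "s3://bucket/part-2")));
    }
}
```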
The blue boxes, IIUC, are Tasks, each of which appears to be a chain of logical operators? Then there's BATCH mode, which I don't quite get.
The docs say that in BATCH mode Flink does not use checkpointing or back-pressure, nor does it need RocksDB; that "keys are sorted and processed sequentially"; and that tasks wait until their upstreams have materialized intermediate results.
My questions are:
- Where are the intermediate results stored? On disk, or elsewhere?
- I noticed that the results (after the first Task completes) don't actually get transmitted in one go to the second operator. Instead, they seem to be "streamed" in. I vaguely recall reading that Flink batch is technically streaming over a bounded dataset. Is this that?
- I am having trouble conceptualizing how/why RocksDB isn't needed for batch processing in general, and would like some pointers/clarification on why this is the case.
Thank you so much for any insights or corrections! If I’m misunderstanding anything, or if there are important nuances I’m missing, I’d really appreciate your feedback or pointers to more detailed resources.
1 Answer
- Intermediate results are kept in memory, and will spill to disk if necessary.
- Batch is treated as a special case of streaming, optimized to take advantage of the bounded inputs. So, yes.
- The amount of state Flink needs is determined by the number of distinct keys being handled concurrently. In streaming mode, there's no way to constrain this -- all keys have to be handled all the time -- so in streaming mode Flink needs a state store like RocksDB. By sorting the input by key, in batch mode Flink only has to handle one key at a time, so it only needs a teeny bit of state.
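To make that last point concrete, here is a small non-Flink sketch (the class and method names are invented) of why key-sorted input shrinks the required state to a single accumulator: once records are ordered by key, a per-key aggregate can be computed while holding state for exactly one key at a time, so no RocksDB-style store is needed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only -- not the Flink runtime. Mimics batch-mode keyed
// processing: sort by key, then process each key's records as one contiguous run.
public class SortedKeySketch {

    // Sum values per key over key-sorted input. The only "keyed state" alive
    // at any moment is the accumulator for the current key.
    static Map<String, Long> sumPerKey(List<Map.Entry<String, Long>> records) {
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(records);
        sorted.sort(Map.Entry.comparingByKey()); // batch mode sorts by key first

        Map<String, Long> result = new LinkedHashMap<>();
        String currentKey = null;
        long currentSum = 0; // the single piece of keyed state
        for (Map.Entry<String, Long> r : sorted) {
            if (!r.getKey().equals(currentKey)) {
                if (currentKey != null) {
                    result.put(currentKey, currentSum); // key finished: emit and discard state
                }
                currentKey = r.getKey();
                currentSum = 0;
            }
            currentSum += r.getValue();
        }
        if (currentKey != null) {
            result.put(currentKey, currentSum);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(sumPerKey(List.of(
                Map.entry("b", 1L), Map.entry("a", 2L), Map.entry("a", 3L))));
    }
}
```

In streaming mode the same aggregation would need one accumulator per distinct key, live simultaneously and for an unbounded time, which is exactly the workload a state backend like RocksDB exists to handle.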
Batch mode avoids the things that make Flink complicated: managed state, checkpointing, and watermarks.