[GLUTEN][FLINK] Nexmark q0 performance #11508
-
Hello community. Following the Flink docs, I built velox4j with Gluten myself and ran a local cluster to test the Nexmark q0 query. On the latest branch the Nexmark datagen source does not work for me (a pure virtual call error appears in the logs), so I switched to the Kafka connector with a local Kafka. In that setup q0 finished after 49 min. I noticed that the conversion between RowData and RowVector is a heavy operation, so I removed the StreamRecordTimestampInserter code in CommonExecSink to avoid the conversion. After that the job ran as a single chained operator (NativeKafka -> gluten_calc -> Writer) and finished in 8 min. Without Gluten, with the same config, the job finishes in 4 min. Why is the performance so terrible?
@shuai-xu @KevinyhZou What do you think?
flink config.yaml
taskmanager.memory.process.size: 4G
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 4G
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
io.tmp.dirs: /tmp/flink/tmp/
jobmanager.bind-host: 0.0.0.0
rest.bind-host: 0.0.0.0
#==============================================================================
# JVM
#==============================================================================
# JVM options for GC
env.java.opts: -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:NewRatio=3 -XX:ConcGCThreads=4 --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
env.java.opts.jobmanager: -Xloggc:$FLINK_LOG_DIR/jobmanager-gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
env.java.opts.taskmanager: -Xloggc:$FLINK_LOG_DIR/taskmanager-gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
#==============================================================================
# State & Checkpoint
#==============================================================================
state.backend.type: rocksdb
# for example, hdfs://benchmark01/checkpoint
execution.checkpointing.dir: file:///tmp/flink/checkpoint
execution.checkpointing.incremental: true
execution.checkpointing.interval: 18000000
execution.checkpointing.mode: EXACTLY_ONCE
state.backend.local-recovery: true
#==============================================================================
# Runtime Others
#==============================================================================
# configuration options for adjusting and tuning table programs.
#table.exec.mini-batch.enabled: true
#table.exec.mini-batch.allow-latency: 2s
#table.exec.mini-batch.size: 50000
table.optimizer.distinct-agg.split.enabled: true
# disable final checkpoint to avoid test waiting for the last checkpoint complete
execution.checkpointing.checkpoints-after-tasks-finish.enabled: false
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
jobstore.expiration-time: 1000000
execution.checkpointing.tolerable-failed-checkpoints: 10
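As a side check on the config above, here is a minimal sketch that converts the checkpoint interval into hours and compares it with the longest run reported later in this thread (2995.692 s). It assumes that a bare number in `execution.checkpointing.interval` is read as milliseconds; the variable names are illustrative only.

```python
from datetime import timedelta

# Value of execution.checkpointing.interval from the config above.
# Assumption: a value without a unit is interpreted as milliseconds.
CHECKPOINT_INTERVAL_MS = 18_000_000

# Longest wall-clock time reported in this thread (Gluten with Kafka), in seconds.
LONGEST_RUN_SECONDS = 2995.692

interval = timedelta(milliseconds=CHECKPOINT_INTERVAL_MS)
longest_run = timedelta(seconds=LONGEST_RUN_SECONDS)

print(f"checkpoint interval: {interval}")
print(f"longest run:         {longest_run}")
print(f"periodic checkpoint due during run: {longest_run >= interval}")
```

Under that assumption the interval is far longer than any of the runs, so periodic checkpointing is unlikely to explain the difference between the Gluten and Flink timings.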
-
I have reproduced this in my local environment, and it seems that PR #11365 causes the performance regression. @lgbo-ustc, could you take a look at this?
-
@ParyshevSergey Can you provide more details about the test settings? Some performance regression is expected with the new version of the code, potentially around 10%. Our current priority is improving functionality; performance optimization in this area will be considered later. However, we have not encountered slowdowns as significant as the one you describe. We have re-run the stress tests, and the results are as follows (without StreamRecordTimestampInserter, which is not supported at present).
datagen source
gluten
-------------------------------- Nexmark Results --------------------------------
+------+-----------------+--------+----------+-----------------+--------------+-----------------+
| Query| Events Num | Cores | Time(s) | Cores * Time(s) | Throughput | Throughput/Cores|
+------+-----------------+--------+----------+-----------------+--------------+-----------------+
|q0 |100,000,000 |1.01 |40.124 |40.460 |2.49 M/s |2.47 M/s |
|Total |100,000,000 |1.008 |40.124 |40.460 |2.49 M/s |2.47 M/s |
+------+-----------------+--------+----------+-----------------+--------------+-----------------+
flink
-------------------------------- Nexmark Results --------------------------------
+------+-----------------+--------+----------+-----------------+--------------+-----------------+
| Query| Events Num | Cores | Time(s) | Cores * Time(s) | Throughput | Throughput/Cores|
+------+-----------------+--------+----------+-----------------+--------------+-----------------+
|q0 |100,000,000 |1.03 |115.690 |119.576 |864.38 K/s |836.29 K/s |
|Total |100,000,000 |1.034 |115.690 |119.576 |864.38 K/s |836.29 K/s |
+------+-----------------+--------+----------+-----------------+--------------+-----------------+
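To make the two datagen tables easier to compare, the sketch below recomputes throughput from the event count and wall-clock time and derives the speedup of Gluten over Flink. The numbers are copied from the tables above; the function and variable names are illustrative.

```python
# Figures copied from the datagen-source Nexmark tables above.
EVENTS = 100_000_000
GLUTEN_SECONDS = 40.124
FLINK_SECONDS = 115.690


def throughput(events: int, seconds: float) -> float:
    """Events processed per second of wall-clock time."""
    return events / seconds


gluten_tps = throughput(EVENTS, GLUTEN_SECONDS)
flink_tps = throughput(EVENTS, FLINK_SECONDS)

# Same event count in both runs, so the speedup is the ratio of the times.
speedup = FLINK_SECONDS / GLUTEN_SECONDS

print(f"gluten: {gluten_tps / 1e6:.2f} M/s")
print(f"flink:  {flink_tps / 1e3:.2f} K/s")
print(f"gluten speedup over flink: {speedup:.2f}x")
```

The recomputed throughputs agree with the Throughput column, and with the datagen source Gluten comes out a little under three times faster than Flink.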
kafka source
gluten
[Image: WeCom screenshot 11538e53-0693-4a0c-b423-62ec7c41d044]
flink
[Image: WeCom screenshot 6a69335c-573b-4ed6-b7a3-ade33c3a081f]
-
@lgbo-ustc Flink config above, results:
Gluten w/ Kafka, 3 of 4 native operators (StreamRecordTimestampInserter is a Java operator)
-------------------------------- Nexmark Results --------------------------------
+------+-----------------+--------+----------+-----------------+--------------+-----------------+
| Query| Events Num | Cores | Time(s) | Cores * Time(s) | Throughput | Throughput/Cores|
+------+-----------------+--------+----------+-----------------+--------------+-----------------+
|q0 |100,000,000 |0 |2995.692 |0.000 |33.38 K/s |9.22 E/s | -- metric monitor is off
|q0 |100,000,000 |1 |2985.706 |2983.679 |33.49 K/s |33.52 K/s |
Gluten w/ Kafka, 3 of 3 native operators (StreamRecordTimestampInserter is deleted in code)
-------------------------------- Nexmark Results --------------------------------
+------+-----------------+--------+----------+-----------------+--------------+-----------------+
| Query| Events Num | Cores | Time(s) | Cores * Time(s) | Throughput | Throughput/Cores|
+------+-----------------+--------+----------+-----------------+--------------+-----------------+
|q0 |100,000,000 |0.36 |525.402 |187.356 |190.33 K/s |533.74 K/s |
|Total |100,000,000 |0.357 |525.402 |187.356 |190.33 K/s |533.74 K/s |
+------+-----------------+--------+----------+-----------------+--------------+-----------------+
2026-01-28 16:42:33,100 INFO org.apache.gluten.client.OffloadedJobGraphGenerator [] - OperatorChainSliceGraph:
Slice ID: 7, offloadable: false
Inputs: []
Outputs: []
Operator Configs: nexmark_q0[3]: Writer(7)
2026-01-28 16:42:33,108 INFO org.apache.gluten.client.OffloadedJobGraphGenerator [] - OperatorChainSliceGraph:
Slice ID: 2, offloadable: true
Inputs: []
Outputs: [3]
Operator Configs: Source: kafka1
Slice ID: 3, offloadable: true
Inputs: [2]
Outputs: [5]
Operator Configs: gluten-calc(3)
Slice ID: 5, offloadable: true
Inputs: [3]
Outputs: []
Operator Configs: Writer(5)
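For longer jobs it can help to summarize the OperatorChainSliceGraph output instead of reading it by eye. The following is a rough sketch that parses the second log entry above and counts offloadable slices. The regular expression is inferred only from the lines shown in this thread, so the real log format may differ, and `parse_slices` is a hypothetical helper, not part of Gluten.

```python
import re

# Log body copied from the second OperatorChainSliceGraph entry above.
LOG = """\
Slice ID: 2, offloadable: true
Inputs: []
Outputs: [3]
Operator Configs: Source: kafka1
Slice ID: 3, offloadable: true
Inputs: [2]
Outputs: [5]
Operator Configs: gluten-calc(3)
Slice ID: 5, offloadable: true
Inputs: [3]
Outputs: []
Operator Configs: Writer(5)
"""

# Assumption: every slice starts with a line of exactly this shape.
SLICE_RE = re.compile(r"Slice ID: (\d+), offloadable: (true|false)")


def parse_slices(text: str) -> list[dict]:
    """Collect slice id, offloadable flag and operator description per slice."""
    slices = []
    for raw in text.splitlines():
        line = raw.strip()
        match = SLICE_RE.match(line)
        if match:
            slices.append({
                "id": int(match.group(1)),
                "offloadable": match.group(2) == "true",
                "operators": None,
            })
        elif line.startswith("Operator Configs:") and slices:
            # Keep everything after the first colon, e.g. "Source: kafka1".
            slices[-1]["operators"] = line.split(":", 1)[1].strip()
    return slices


slices = parse_slices(LOG)
native = [s for s in slices if s["offloadable"]]
print(f"{len(native)} of {len(slices)} slices offloadable")
for s in slices:
    print(f"  slice {s['id']}: {s['operators']} (offloadable={s['offloadable']})")
```

For the log above this reports all three slices as offloadable, which matches the "3 of 3 native operators" description of the run.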
Flink 1.20
-------------------------------- Nexmark Results --------------------------------
+------+-----------------+--------+----------+-----------------+--------------+-----------------+
| Query| Events Num | Cores | Time(s) | Cores * Time(s) | Throughput | Throughput/Cores|
+------+-----------------+--------+----------+-----------------+--------------+-----------------+
|q0 |100,000,000 |0.88 |275.188 |242.630 |363.39 K/s |412.15 K/s |
|Total |100,000,000 |0.882 |275.188 |242.630 |363.39 K/s |412.15 K/s |
+------+-----------------+--------+----------+-----------------+--------------+-----------------+
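The Kafka results above can be put side by side with a short sketch that compares the Gluten run without StreamRecordTimestampInserter against the Flink 1.20 baseline, both in wall-clock time and in throughput per core. The figures are copied from the "q0" rows of those two tables; the dictionary layout and names are illustrative.

```python
# "q0" rows from the Kafka-source tables above: time in seconds,
# throughput per core in K/s as reported by Nexmark.
RUNS = {
    "gluten (3 of 3 native)": {"seconds": 525.402, "per_core_kps": 533.74},
    "flink 1.20": {"seconds": 275.188, "per_core_kps": 412.15},
}

gluten = RUNS["gluten (3 of 3 native)"]
flink = RUNS["flink 1.20"]

# Values above 1.0 mean Gluten took longer than Flink in wall-clock time.
wall_clock_ratio = gluten["seconds"] / flink["seconds"]

# Values above 1.0 mean Gluten reported more events per second per core.
per_core_ratio = gluten["per_core_kps"] / flink["per_core_kps"]

print(f"wall-clock time, gluten / flink:      {wall_clock_ratio:.2f}x")
print(f"throughput per core, gluten / flink:  {per_core_ratio:.2f}x")
```

With these numbers Gluten is slower in wall-clock terms but reports a higher per-core throughput, because it used about 0.36 cores against 0.88 for Flink. One possible reading, which this thread does not confirm, is that the Gluten job spends much of its time waiting rather than computing.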