744 questions
0 votes · 1 answer · 56 views
How to emit keyed records for a compacted topic (SimpleStringSchema ClassCastException)?
I'm upgrading a PyFlink job to 2.0 and want to write to a Kafka compacted topic using the new KafkaSink. The stream produces (key, value) tuples (key is a string, value is a JSON payload). I configure ...
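The excerpt suggests a (key, value) tuple is being handed to a plain SimpleStringSchema, which only accepts strings — hence the ClassCastException. One alternative worth noting: in Flink SQL, the upsert-kafka connector writes the PRIMARY KEY as the Kafka record key, which is exactly what a compacted topic expects. A minimal sketch (table, topic, and server names are hypothetical):

```sql
-- upsert-kafka emits the PRIMARY KEY columns as the record key,
-- so a compacted topic retains the latest value per key.
CREATE TABLE orders_compacted (
  order_id STRING,
  payload  STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders-compacted',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);
```

In the DataStream API the equivalent is a KafkaRecordSerializationSchema with separate key and value serializers, rather than a single SimpleStringSchema over the whole tuple.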
0 votes · 0 answers · 28 views
Running SQL from a test jar in an org.apache.flink.test.util.MiniClusterWithClientResource minicluster
I have an application that:
- Streams data from Kafka
- Inserts the received data into Flink Table API tables
- Performs a join on the tables and emits an event
StreamExecutionEnvironment and StreamTableEnvironment are used ...
0 votes · 1 answer · 53 views
High CPU usage from RowData serialization in Flink Table API despite ObjectReuse optimization
I have a Table API pipeline that does a 1-minute Tumbling Count aggregation over a set of 15 columns. FlameGraph shows that most of the CPU (~40%) goes into serializing each row, despite using ...
0 votes · 1 answer · 55 views
How to add a new column to a Flink SQL job so that it can restore from an existing savepoint or checkpoint?
I have 2 tables, both using the Kafka connector to read data. I join these sources and write the result to another Kafka topic. We checkpoint every 10 minutes, so when the job restarts, we use execution.savepoint.path ...
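For reference, restoring a changed query from a savepoint typically involves two options: the savepoint path itself, and permission to drop state that the new plan no longer claims. A hedged sketch (the path is illustrative):

```sql
-- Point the next job submission at the savepoint to restore from.
SET 'execution.savepoint.path' = 's3://bucket/savepoints/savepoint-xxxx';
-- Allow the new job graph to skip state that no longer maps to an operator.
SET 'execution.savepoint.ignore-unclaimed-state' = 'true';
```

Note that adding a column can still change the compiled plan and operator state layout, so state compatibility after a SQL change is not guaranteed.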
0 votes · 1 answer · 72 views
Flink SQL with mini-batch seems to trigger only on checkpoint
I have the following config set for my job:
'table.exec.sink.upsert-materialize': 'NONE',
'table.exec.mini-batch.enabled': true,
'table.exec.mini-batch.allow-latency'...
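Mini-batch flushing in Flink SQL is driven by the allow-latency timer and the batch size, not by checkpoints; when mini-batch is enabled, both table.exec.mini-batch.allow-latency and table.exec.mini-batch.size must also be set. A sketch with illustrative values:

```sql
SET 'table.exec.mini-batch.enabled' = 'true';
-- Flush buffered records at least every 5 seconds ...
SET 'table.exec.mini-batch.allow-latency' = '5 s';
-- ... or as soon as 5000 records are buffered, whichever comes first.
SET 'table.exec.mini-batch.size' = '5000';
```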
0 votes · 1 answer · 62 views
Flink DynamoDB Sink: "The provided key element does not match the schema"
I'm trying to write data from a Flink SQL job to a DynamoDB table, but I'm consistently getting the following error:
Caused by: software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The ...
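That DynamoDB error usually means the sink is writing items whose key attributes don't match the table's partition/sort key schema. In the Flink DynamoDB SQL connector, the partition key is declared with PARTITIONED BY; a hedged sketch (table, column names, and region are hypothetical):

```sql
-- PARTITIONED BY must name the column that maps to the
-- DynamoDB table's partition key attribute.
CREATE TABLE dynamo_sink (
  user_id    STRING,
  event_time STRING,
  payload    STRING
) PARTITIONED BY (user_id) WITH (
  'connector'  = 'dynamodb',
  'table-name' = 'my-dynamo-table',
  'aws.region' = 'us-east-1'
);
```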
0 votes · 0 answers · 77 views
Flink SQL Job: com.starrocks.data.load.stream.exception.StreamLoadFailException: Could not get load state because
I'm encountering a Flink job failure and would appreciate any input on what might be misconfigured:
2025-07-28 17:30:52
org.apache.flink.runtime.JobException: Recovery is suppressed by ...
0 votes · 1 answer · 34 views
How to Log State Access (get/put) in Flink SQL Join Operators with Operator Metadata?
I'm using Flink SQL to join data from multiple Kafka topics. Sometimes the resulting join is non-trivial to debug, and I want to log the state access involved in the join — specifically, I’d like to ...
2 votes · 1 answer · 63 views
Files stuck as .inprogress, not rolling into final Parquet files
I'm running a Flink streaming job using the Table API, which reads from Kafka and writes to S3 (for now I'm using a local path to simulate S3). The job uses a filesystem connector to write data in ...
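With the filesystem connector, in-progress part files are only finalized when a checkpoint completes; if checkpointing is disabled, they stay as .inprogress indefinitely. A sketch of enabling checkpointing from SQL (the interval is illustrative):

```sql
-- Part files are committed on checkpoint completion,
-- so an interval here bounds how long files stay .inprogress.
SET 'execution.checkpointing.interval' = '1 min';
```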
0 votes · 1 answer · 72 views
Temporal join not emitting results
I am struggling with a temporal join that emits no messages.
I'm creating this table for a later join to fetch some IDs; here are my statements.
CREATE TABLE `id_A-id_B`
(
PRIMARY KEY (id_A) NOT ...
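For context, a temporal join against the versioned table above would typically look like the sketch below (the probe-side table and column names are hypothetical; the versioned table name continues the DDL above):

```sql
SELECT o.id_A, v.id_B
FROM orders AS o
JOIN `id_A-id_B` FOR SYSTEM_TIME AS OF o.proc_time AS v
  ON o.id_A = v.id_A;
```

If the join is event-time based instead of processing-time, both sides need watermarks, and results are only emitted as the watermark advances — a common reason for seeing zero output.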
0 votes · 0 answers · 106 views
PyFlink Python UDF Fails in Remote Cluster from Jupyter Notebook – Connection Refused from Python Harness
I'm trying to develop a PyFlink streaming job using a Python UDF. When executing from Jupyter Notebook (local) with execution.target = remote, the job fails at the Python environment initialization ...
0 votes · 1 answer · 69 views
Unable to read Key from Kafka messages in Flink SQL Client
I have an Order Kafka topic with below messages:
Key: {"orderNumber":"1234"}
Value: {"orderDate":"20250528","productId":"Product123"}
I am ...
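In the Kafka SQL connector, key fields are not exposed unless the DDL declares a key format and lists the key columns. A hedged sketch matching the messages above (topic name and bootstrap server are assumptions):

```sql
CREATE TABLE orders (
  orderNumber STRING,
  orderDate   STRING,
  productId   STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'Order',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'key.fields' = 'orderNumber',
  'value.format' = 'json',
  -- orderNumber lives only in the key, so exclude key fields
  -- from what the value format tries to read.
  'value.fields-include' = 'EXCEPT_KEY'
);
```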
0 votes · 1 answer · 72 views
Flink SQL - window aggregation
I'm doing Flink SQL stream processing, trying to do some windowed aggregation ... but the job stops emitting new aggregations after some time (or after it catches up with all the ...
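A frequent cause of window output stalling after catch-up is watermarks: if any Kafka partition goes idle, the overall watermark stops advancing and windows never close. A hedged sketch of the usual mitigation (the timeout value is illustrative):

```sql
-- Mark sources idle after 30 s so quiet partitions
-- stop holding back the watermark.
SET 'table.exec.source.idle-timeout' = '30 s';
```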
1 vote · 1 answer · 77 views
How to perform an Interval Join in Flink SQL on upsert (non-append-only) tables?
I have 2 source tables in my streaming Flink application - sessions and orders. Each order belongs to a certain session. Both tables can be updated (not only appended to), but we know that sessions ...
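For context, Flink SQL interval joins require append-only inputs with time attributes, so they don't apply directly to upsert tables. One commonly suggested workaround is a regular join with a bounded time predicate plus state TTL to keep state from growing without bound; a hedged sketch (table and column names are hypothetical):

```sql
-- Bound state retention since a regular join otherwise keeps state forever.
SET 'table.exec.state.ttl' = '1 h';

SELECT o.order_id, s.session_id
FROM orders AS o
JOIN sessions AS s
  ON o.session_id = s.session_id
-- Time predicate approximates the interval-join semantics.
WHERE o.order_time BETWEEN s.start_time
                       AND s.start_time + INTERVAL '1' HOUR;
```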
0 votes · 1 answer · 51 views
Unable to run Flink Table API locally in version 1.19.2
I am trying to set up a local Flink job that uses the Table API to fetch data from a Kafka source and print it. Below is a snippet of the Flink job:
public static void main(String[] args) {
// ...