
I'm using a local Flink 1.6 cluster configured to use the flink-table jar (i.e. my program's jar does not include flink-table). With the following code:

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.List;
public class JMain {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(execEnv);
        tableEnv.registerFunction("enlist", new Enlister());

        DataSource<Tuple2<String, String>> source = execEnv.fromElements(
            new Tuple2<>("a", "1"),
            new Tuple2<>("a", "2"),
            new Tuple2<>("b", "3")
        );

        Table table = tableEnv.fromDataSet(source, "a, b")
            .groupBy("a")
            .select("enlist(a, b)");

        tableEnv.toDataSet(table, Row.class)
            .print();
    }

    public static class Enlister
            extends AggregateFunction<List<String>, ArrayList<String>>
            implements ResultTypeQueryable<List<String>> {

        @Override
        public ArrayList<String> createAccumulator() {
            return new ArrayList<>();
        }

        @Override
        public List<String> getValue(ArrayList<String> acc) {
            return acc;
        }

        @SuppressWarnings("unused")
        public void accumulate(ArrayList<String> acc, String a, String b) {
            acc.add(a + ":" + b);
        }

        @SuppressWarnings("unused")
        public void merge(ArrayList<String> acc, Iterable<ArrayList<String>> it) {
            for (ArrayList<String> otherAcc : it) {
                acc.addAll(otherAcc);
            }
        }

        @SuppressWarnings("unused")
        public void resetAccumulator(ArrayList<String> acc) {
            acc.clear();
        }

        @Override
        public TypeInformation<List<String>> getProducedType() {
            return TypeInformation.of(new TypeHint<List<String>>() {});
        }
    }
}

I get this weird exception:

org.apache.flink.table.api.ValidationException: Expression Enlister(List('a, 'b)) failed on input check: Given parameters do not match any signature. 
Actual: (java.lang.String, java.lang.String) 
Expected: (java.lang.String, java.lang.String)

However, if I do not implement ResultTypeQueryable, I get the expected output:

Starting execution of program
[b:3]
[a:1, a:2]
Program execution finished
Job with JobID 20497bd3efe44fab0092a05a8eb7d9de has finished.
Job Runtime: 270 ms
Accumulator Results: 
- 56e0e5a9466b84ae44431c9c4b7aad71 (java.util.ArrayList) [2 elements]

My actual use case seems to require ResultTypeQueryable, because otherwise I get this exception:

The return type of function ... could not be determined automatically,
due to type erasure. You can give type information hints by using the
returns(...) method on the result of the transformation call,
or by letting your function implement the 'ResultTypeQueryable' interface

Any way I can fix this?

asked Sep 13, 2018 at 20:30
  • Hi, I think you might have stumbled upon a bug. Do you really need to implement ResultTypeQueryable, though? If so, could you share why? Isn't the getResultType of AggregateFunction enough for you? Commented Sep 14, 2018 at 8:10
  • @DawidWysakowicz If I override getResultType only, I get: The return type of function ... could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface. Commented Sep 14, 2018 at 8:50

2 Answers


Implementing ResultTypeQueryable is not correct in this case; the exception is misleading. Instead, override getResultType() and getAccumulatorType() on the AggregateFunction itself. The reason is that generics usually cause problems (due to Java's type erasure) when Flink generates the type information for serializers, so declaring both types explicitly avoids the extraction step.
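For illustration, here is a sketch of what that change looks like applied to the question's Enlister (same aggregation logic, but using the AggregateFunction type hooks instead of ResultTypeQueryable; merge and resetAccumulator are omitted for brevity, and this assumes the Flink 1.6 Table API as in the question):

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.AggregateFunction;

import java.util.ArrayList;
import java.util.List;

public class Enlister extends AggregateFunction<List<String>, ArrayList<String>> {

    @Override
    public ArrayList<String> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<String> getValue(ArrayList<String> acc) {
        return acc;
    }

    public void accumulate(ArrayList<String> acc, String a, String b) {
        acc.add(a + ":" + b);
    }

    // Declare the result type explicitly instead of implementing ResultTypeQueryable.
    @Override
    public TypeInformation<List<String>> getResultType() {
        return TypeInformation.of(new TypeHint<List<String>>() {});
    }

    // Declare the accumulator type as well, so the planner does not
    // have to extract it through the erased generics.
    @Override
    public TypeInformation<ArrayList<String>> getAccumulatorType() {
        return TypeInformation.of(new TypeHint<ArrayList<String>>() {});
    }
}
```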

answered Sep 14, 2018 at 12:28


I tried to reproduce the problem in a small program but couldn't; it only happens in my larger project. Unfortunately, overriding getResultType() and getAccumulatorType() didn't help either. In that case I got this exception:

java.lang.IndexOutOfBoundsException
 at org.apache.flink.api.java.typeutils.TupleTypeInfoBase.getTypeAt(TupleTypeInfoBase.java:199)
 at org.apache.flink.api.java.typeutils.RowTypeInfo.getTypeAt(RowTypeInfo.java:179)
 at org.apache.flink.api.common.operators.Keys$ExpressionKeys.isSortKey(Keys.java:444)
 at org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:150)
 at org.apache.flink.api.java.operators.SortPartitionOperator.<init>(SortPartitionOperator.java:75)
 at org.apache.flink.api.java.DataSet.sortPartition(DataSet.java:1414)

I actually also got that exception even without overriding. The only thing that worked for me was essentially:

String[] fieldNames = new String[] {
 "result"
};
TypeInformation<?>[] types = new TypeInformation[] {
 TypeInformation.of(new TypeHint<List<String>>(){})
};
tableEnv.toDataSet(table, Types.ROW(fieldNames, types))...
answered Sep 17, 2018 at 8:10

2 Comments

@twalthr In case this is relevant to you.
Usually, tableEnv.toDataSet(table, Row.class) should be sufficient. If not, this is a bug and should be reported with a little reproducible program.
