@@ -997,7 +997,7 @@ public void testDoNotIncludeKey(boolean useHttp) {
997997 }
998998
999999 @ Test
1000- public void testExtractKafkaIngestionTimestampAsField () {
1000+ public void testExtractKafkaIngestionTimestampAsField_designated () {
10011001 connect .kafka ().createTopic (topicName , 1 );
10021002 Map <String , String > props = ConnectTestUtils .baseConnectorProps (questDBContainer , topicName , true );
10031003 props .put (QuestDBSinkConnectorConfig .DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG , "birth" ); // the field is injected via InsertField SMT
@@ -1035,6 +1035,48 @@ public void testExtractKafkaIngestionTimestampAsField() {
10351035 httpPort );
10361036 }
10371037
1038+ @ Test
1039+ public void testExtractKafkaIngestionTimestampAsField_nondesignated_schemaless () {
1040+ connect .kafka ().createTopic (topicName , 1 );
1041+ Map <String , String > props = ConnectTestUtils .baseConnectorProps (questDBContainer , topicName , true );
1042+ props .put (QuestDBSinkConnectorConfig .INCLUDE_KEY_CONFIG , "false" );
1043+ props .put ("value.converter.schemas.enable" , "false" );
1044+ props .put ("transforms" , "InsertField,TimestampConverter" );
1045+ props .put ("transforms.InsertField.type" , "org.apache.kafka.connect.transforms.InsertField$Value" );
1046+ props .put ("transforms.InsertField.timestamp.field" , "birth" );
1047+ props .put ("transforms.TimestampConverter.type" , "org.apache.kafka.connect.transforms.TimestampConverter$Value" );
1048+ props .put ("transforms.TimestampConverter.field" , "birth" );
1049+ props .put ("transforms.TimestampConverter.target.type" , "Timestamp" );
1050+ connect .configureConnector (ConnectTestUtils .CONNECTOR_NAME , props );
1051+ ConnectTestUtils .assertConnectorTaskRunningEventually (connect );
1052+ 1053+ QuestDBUtils .assertSql (
1054+ "{\" ddl\" :\" OK\" }" ,
1055+ "create table " + topicName + " (firstname string, lastname string, birth timestamp, ts timestamp) timestamp(ts) partition by day wal" ,
1056+ httpPort ,
1057+ QuestDBUtils .Endpoint .EXEC );
1058+ 1059+ // note: there is no birth field in the message payload
1060+ String personJson = "{\" firstname\" :\" John\" ,\" lastname\" :\" Doe\" }" ;
1061+ 1062+ Map <String , Object > prodProps = new HashMap <>();
1063+ try (KafkaProducer <byte [], byte []> producer = connect .kafka ().createProducer (prodProps )) {
1064+ java .util .Date birth = new Calendar .Builder ()
1065+ .setTimeZone (TimeZone .getTimeZone ("UTC" ))
1066+ .setDate (2022 , 9 , 23 ) // note: month is 0-based
1067+ .setTimeOfDay (13 , 53 , 59 , 123 )
1068+ .build ().getTime ();
1069+ long kafkaTimestamp = birth .getTime ();
1070+ ProducerRecord <byte [], byte []> producerRecord = new ProducerRecord <>(topicName , null , kafkaTimestamp , "key" .getBytes (), personJson .getBytes ());
1071+ producer .send (producerRecord );
1072+ }
1073+ 1074+ QuestDBUtils .assertSqlEventually ("\" firstname\" ,\" lastname\" ,\" birth\" \r \n "
1075+ + "\" John\" ,\" Doe\" ,\" 2022年10月23日T13:53:59.123000Z\" \r \n " ,
1076+ "select firstname, lastname, birth from " + topicName ,
1077+ httpPort );
1078+ }
1079+ 10381080
10391081 @ ParameterizedTest
10401082 @ ValueSource (booleans = {true , false })
0 commit comments