I am trying to do a simple task: read a CSV file inside a Flink job class using CsvReaderFormat, as described in the documentation. I have a POJO called subscriberADSR (I know it is bad practice to start a class name with a lowercase letter; I will correct that later).

When I follow the simple instructions to create a CsvReaderFormat:

CsvReaderFormat<subscriberADSR> csvFormat = CsvReaderFormat<subscriberADSR>.

code assist offers no suggestions after the dot. If I use the following instead, I get the compile error shown below it:

CsvReaderFormat<subscriberADSR> csvFormat = CsvReaderFormat.forPojo(subscriberADSR.class);
The type org.apache.flink.connector.file.src.reader.SimpleStreamFormat cannot be resolved. It is indirectly referenced from required type org.apache.flink.formats.csv.CsvReaderFormat

These are the pom dependencies for Flink and Flink CSV:

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>1.19.1</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-csv</artifactId>
 <version>1.19.1</version>
</dependency>

Please help me understand what I am doing wrong.

I have searched the forums for reports of similar issues but could not find anything.

asked Jun 30, 2024 at 17:10
  • It would be great if someone could share working sample Java code for Flink that reads data from a CSV file into a POJO. I have not yet been able to make this simple thing work from the documentation alone. Commented Jul 1, 2024 at 1:50

1 Answer

Did you try this example?

// Has to match the exact order of columns in the CSV file
@JsonPropertyOrder({"city","lat","lng","country","iso2",
                    "adminName","capital","population"})
public static class CityPojo {
    public String city;
    public BigDecimal lat;
    public BigDecimal lng;
    public String country;
    public String iso2;
    public String adminName;
    public String capital;
    public long population;
}

// Builds the CSV schema from the POJO: no quote character, '|' as the column separator
Function<CsvMapper, CsvSchema> schemaGenerator = mapper ->
        mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');

CsvReaderFormat<CityPojo> csvFormat =
        CsvReaderFormat.forSchema(() -> new CsvMapper(), schemaGenerator, TypeInformation.of(CityPojo.class));

FileSource<CityPojo> source =
        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();

https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/formats/csv/#advanced-configuration
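
Note that FileSource, used in the example above, and SimpleStreamFormat, the type named in the compile error, both live in the flink-connector-files artifact, which the pom in the question does not list. The fragment below is a sketch of the extra dependencies that would likely be needed, not a confirmed fix for this exact project. It assumes the same 1.19.1 version as the existing entries, and flink-streaming-java is included only on the assumption that the job uses the DataStream API.

```xml
<!-- Assumption: same Flink version as the other dependencies in the question -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-files</artifactId>
  <version>1.19.1</version>
</dependency>
<!-- Assumption: the job is written against the DataStream API -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>1.19.1</version>
</dependency>
```

Keeping every org.apache.flink artifact on the same version avoids the kind of mismatch that tends to produce "cannot be resolved" errors.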

answered Jul 1, 2024 at 18:28

1 Comment

Jie, thanks for taking the time to suggest this. Although it did not solve my problem right away, it at least made me explore. The major challenge I am facing with Flink is that many things have been deprecated recently, with no proper guidance on the current replacements for someone who is just starting out. Most of my issues came from using the wrong dependencies and the wrong version numbers. It was very painful to solve, but I was finally able to read the CSV file. Thank you anyway.
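
For dependency problems like the one described here, one quick check is whether the class named in the compiler message can actually be loaded from the project's classpath. The following is a minimal sketch using only the Java standard library; ClasspathCheck and isOnClasspath are hypothetical helper names, not part of Flink, and the class name passed in main is simply the one from the error message in the question.

```java
// Hypothetical diagnostic helper; not part of Flink.
final class ClasspathCheck {

    /** Returns true if the named class can be located by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // 'false' means: locate the class without running its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Missing class, or a class whose own dependencies are missing.
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the compile error in the question.
        String name = "org.apache.flink.connector.file.src.reader.SimpleStreamFormat";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

If this prints false when run with the project's dependencies, the artifact that provides the class is missing from the pom or is declared with a scope that excludes it at runtime.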
