I am trying a simple task of reading a CSV file inside flink execution class using CsvReaderFormat as mentioned in the documentation. I have a pojo called subscriberADSR (I know it is a bad practise to start with small letters, but will correct later)
When followed the simple instruction to create a CSVReaderFormat
CsvReaderFormat<subscriberADSR> csvFormat = CsvReaderFormat<subscriberADSR>.
I get nothing after the . on the code assist. If I use the following I get the below compile error
CsvReaderFormat<subscriberADSR> csvFormat = CsvReaderFormat.forPojo(subscriberADSR.class);
The type org.apache.flink.connector.file.src.reader.SimpleStreamFormat cannot be resolved. It is indirectly referenced from required type org.apache.flink.formats.csv.CsvReaderFormat
The following are the pom dependencies for flink and flink csv
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.19.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.19.1</version>
</dependency>
Please help me understand what I am doing wrong
I have gone through most of forum questions if anyone reported similar issues, but could not find anything
-
Maybe if someone could share a working sample java code for flink reading data from a csv file into a pojo would be great. Not able to make this simple thing work as yet by reading the documentationearthdomain– earthdomain2024年07月01日 01:50:38 +00:00Commented Jul 1, 2024 at 1:50
1 Answer 1
Did you try this example?
//Has to match the exact order of columns in the CSV file
@JsonPropertyOrder({"city","lat","lng","country","iso2",
"adminName","capital","population"})
public static class CityPojo {
public String city;
public BigDecimal lat;
public BigDecimal lng;
public String country;
public String iso2;
public String adminName;
public String capital;
public long population;
}
Function<CsvMapper, CsvSchema> schemaGenerator = mapper ->
mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');
CsvReaderFormat<CityPojo> csvFormat =
CsvReaderFormat.forSchema(() -> new CsvMapper(), schemaGenerator, TypeInformation.of(CityPojo.class));
FileSource<CityPojo> source =
FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();