
I'm reading CSV files and processing them daily so I can append the data to my bronze layer in Databricks using Auto Loader. The code looks like this:

    from pyspark.sql.functions import col as spark_col, current_timestamp, current_date

    def run_autoloader(table_name, checkpoint_path, latest_file_location, new_columns):
        # Configure Auto Loader to ingest parquet data to a Delta table
        (spark.readStream
            .format("cloudFiles")
            #.schema(df_schema)
            .option("cloudFiles.format", "parquet")
            .option("cloudFiles.schemaLocation", checkpoint_path)
            .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
            .load(latest_file_location)
            .toDF(*new_columns)
            .select("*",
                    spark_col("_metadata.file_path").alias("source_file"),
                    current_timestamp().alias("processing_time"),
                    current_date().alias("processing_date"))
            .writeStream
            .option("checkpointLocation", checkpoint_path)
            .trigger(once=True)
            .option("mergeSchema", "true")
            .toTable(table_name))
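
For reference, I call it roughly like this (the table name, paths and column list here are just placeholders):

    # Example invocation (placeholder values only)
    new_columns = ["order_id", "customer_id", "amount", "order_date"]

    run_autoloader(
        table_name="bronze.orders",
        checkpoint_path="/mnt/bronze/orders/_checkpoint",
        latest_file_location="/mnt/landing/orders/",
        new_columns=new_columns,
    )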

Previously this was able to handle evolving schemas, but today, after a new column was introduced in the input CSVs, I got the following error:

 requirement failed: The number of columns doesn't match.

I've read some posts suggesting either editing the schema manually or resetting it by deleting the schema checkpoint path, but the first would require ongoing manual maintenance and the second would mean wiping all our bronze data, so for now neither is an option, especially if it's only a temporary fix.

I don't understand why this suddenly started happening, as handling evolving schemas is specifically what Auto Loader was designed to do.

Any help would be much appreciated.

asked Dec 19, 2024 at 9:42

1 Answer


Can you clarify in your question whether you are attempting to read Parquet or CSV? In the code snippet you provided you specify the format as Parquet: .option("cloudFiles.format", "parquet"). If you are trying to read CSV files with Auto Loader, you should specify the format as csv.

  1. For CSV files, you need to set cloudFiles.inferColumnTypes to true if you want Auto Loader to infer the column data types; it defaults to false, as noted in the documentation linked below.
  2. Double-check that checkpoint_path contains both the inferred schema information and the checkpoint information (see the snippet after this list).
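
For point 2, Auto Loader records the inferred schema (and each evolved version of it) in a _schemas directory under the configured cloudFiles.schemaLocation, so a quick listing like the sketch below (using the same checkpoint_path variable as in your snippet) shows whether schema tracking is actually happening:

    # List the schema versions Auto Loader has stored under the schema location;
    # a new file should appear each time the schema evolves (e.g. a column is added).
    display(dbutils.fs.ls(f"{checkpoint_path}/_schemas"))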

Referencing this documentation:

    (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("cloudFiles.inferColumnTypes", "true")  # check docs for explanation
        .load(latest_file_location)
        .toDF(*new_columns)
        .select("*",
                spark_col("_metadata.file_path").alias("source_file"),
                current_timestamp().alias("processing_time"),
                current_date().alias("processing_date"))
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(once=True)
        .option("mergeSchema", "true")
        .toTable(table_name))
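
Once the format matches and the new column has been picked up (with addNewColumns the stream typically stops once so the schema tracked at the schema location can be updated, then ingests the column on the next run), a quick sanity check that the evolved schema reached the bronze table could be:

    # Confirm the newly added column is now part of the bronze table's schema.
    spark.table(table_name).printSchema()
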
answered Dec 20, 2024 at 5:33