This is my code:
basePath = f"/mnt/raw/{system_name}/"
baseCheckpointPath = f"{basePath}_____checkpoints/"
baseSchemasPath = f"{basePath}_____autoloaderSchemas/"
# COMMAND ----------
def stream_csv_table_from_tablename(tableName):
    tableDf = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        # The schema location directory keeps track of your data schema over time
        .option("cloudFiles.schemaLocation", f"{baseSchemasPath}{tableName}")
        .option("cloudFiles.inferColumnTypes", True)
        .option("header", True)
        .option("delimiter", ",")
        # basePath already ends with "/", so no extra slash is needed here
        .load(f"{basePath}{tableName}/")
    )
    return tableDf
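For completeness, this is roughly the write side I am using, simplified with placeholder names (the tableName value and target path below are illustrative, not my actual values):

# COMMAND ----------
tableName = "customers"  # placeholder table name
streamDf = stream_csv_table_from_tablename(tableName)

(streamDf.writeStream
    # Per-table checkpoint directory under the base checkpoint path
    .option("checkpointLocation", f"{baseCheckpointPath}{tableName}")
    # Placeholder target path; my real target directory differs
    .start(f"/mnt/delta/{system_name}/{tableName}/")
)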
This is the sample/documentation code:
(spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    # The schema location directory keeps track of your data schema over time
    .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
    .load("<path_to_source_data>")
    .writeStream
    .option("checkpointLocation", "<path_to_checkpoint>")
    .start("<path_to_target>")
)
The <path_to_target> (the CSV table directory) is an existing location, and I have the necessary permissions for that folder/directory. The <path_to_checkpoint> (the schema information location), however, does not exist yet; as far as I have understood, it should be created automatically when the notebook runs. Could this be the cause of the error?
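One thing I considered, to rule this out, is pre-creating the directories before starting the stream. A minimal sketch using dbutils.fs.mkdirs (tableName is a placeholder as above):

# Pre-create the schema and checkpoint directories before starting the stream;
# mkdirs is a no-op if the directory already exists
dbutils.fs.mkdirs(f"{baseSchemasPath}{tableName}")
dbutils.fs.mkdirs(f"{baseCheckpointPath}{tableName}")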
Our Databricks Runtime version is higher than the suggested one.