Configure schema inference and evolution in Auto Loader
You can configure Auto Loader to automatically detect the schema of loaded data, allowing you to initialize tables without explicitly declaring the data schema and evolve the table schema as new columns are introduced. This eliminates the need to manually track and apply schema changes over time.
Auto Loader can also “rescue” data that was unexpected (for example, data with differing types) into a JSON blob column, which you can choose to access later using the semi-structured data access APIs.
The following formats are supported for schema inference and evolution:
|File format|Supported versions|
|---|---|
|`JSON`|Databricks Runtime 8.2 and above|
|`CSV`|Databricks Runtime 8.3 and above|
|`Avro`|Databricks Runtime 10.2 and above|
|`Parquet`|Databricks Runtime 11.1 and above|
|`text`|Not applicable (fixed-schema)|
|`binaryFile`|Not applicable (fixed-schema)|
Syntax for schema inference and evolution
Specifying a target directory for the option `cloudFiles.schemaLocation` enables schema inference and evolution. You can choose to use the same directory that you specify for the `checkpointLocation`. If you use Delta Live Tables, Azure Databricks manages the schema location and other checkpoint information automatically.
If you have more than one source data location being loaded into the target table, each Auto Loader ingestion workload requires a separate streaming checkpoint.
The following example uses `parquet` for the `cloudFiles.format` option. Use `json` for other file sources. All other settings for read and write stay the same for the default behaviors for each format.
```python
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path-to-target>")
)
```
```scala
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path-to-target>")
```
To infer the schema when first reading data, Auto Loader samples the first 50 GB or 1000 files that it discovers, whichever limit is crossed first. Auto Loader stores the schema information in a directory `_schemas` at the configured `cloudFiles.schemaLocation` to track schema changes to the input data over time.
To change the size of the sample that’s used, you can set the following SQL configurations:

- `spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes` (byte string, for example `10gb`)
- `spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles` (integer)
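As a sketch, you can adjust the sample size before starting the stream by setting these configurations on the Spark session (the configuration keys are as named above; the values here are illustrative, not recommendations):

```python
# Illustrative values; tune them to your data volume before the stream starts.
spark.conf.set(
    "spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes", "10gb")
spark.conf.set(
    "spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles", "1000")
```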
By default, Auto Loader schema inference seeks to avoid schema evolution issues due to type mismatches. For formats that don’t encode data types (JSON and CSV), Auto Loader infers all columns as strings (including nested fields in JSON files). For formats with typed schema (Parquet and Avro), Auto Loader samples a subset of files and merges the schemas of individual files. This behavior is summarized in the following table:
|File format|Default inferred data type|
|---|---|
|`JSON`|String|
|`CSV`|String|
|`Avro`|Types encoded in Avro schema|
|`Parquet`|Types encoded in Parquet schema|
The Apache Spark DataFrameReader uses different behavior for schema inference, selecting data types for columns in JSON and CSV sources based on sample data. To enable this behavior with Auto Loader, set the option `cloudFiles.inferColumnTypes` to `true`.
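A minimal sketch of turning on typed inference for a JSON source (paths are placeholders):

```python
# Sketch: with cloudFiles.inferColumnTypes enabled, Auto Loader infers typed
# columns (for example ints and timestamps) for JSON instead of all strings.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<path-to-source-data>"))
```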
When inferring schema for CSV data, Auto Loader assumes that the files contain headers. If your CSV files do not contain headers, provide the option
.option("header", "false"). In addition, Auto Loader merges the schemas of all the files in the sample to come up with a global schema. Auto Loader can then read each file according to its header and parse the CSV correctly.
When a column has different data types in two Parquet files, Auto Loader attempts to upcast one type to the other. If upcasting is not possible, schema inference fails. See the following table for examples:
|Type 1|Type 2|Upcast type|
|---|---|---|
|`INT`|`BIGINT`|`BIGINT`|
|`FLOAT`|`DOUBLE`|`DOUBLE`|
After merging data types on inference, files containing records of the unselected type are loaded to the rescued data column, because the data type is different from the inferred schema.
Auto Loader detects the addition of new columns as it processes your data. When Auto Loader detects a new column, the stream stops with an
UnknownFieldException. Before your stream throws this error, Auto Loader performs schema inference on the latest micro-batch of data and updates the schema location with the latest schema by merging new columns to the end of the schema. The data types of existing columns remain unchanged.
Databricks recommends configuring Auto Loader streams with workflows to restart automatically after such schema changes.
Auto Loader supports the following modes for schema evolution, which you set in the option `cloudFiles.schemaEvolutionMode`:
|Mode|Behavior on reading new column|
|---|---|
|`addNewColumns` (default)|Stream fails. New columns are added to the schema. Existing columns do not evolve data types.|
|`rescue`|Schema is never evolved and stream does not fail due to schema changes. All new columns are recorded in the rescued data column.|
|`failOnNewColumns`|Stream fails. Stream does not restart unless the provided schema is updated, or the offending data file is removed.|
|`none`|Does not evolve the schema, new columns are ignored, and data is not rescued unless the `rescuedDataColumn` option is set.|
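As a sketch (paths are placeholders), you select a mode by passing the `cloudFiles.schemaEvolutionMode` option on the stream read; for example, to record new columns in the rescued data column instead of failing the stream:

```python
# Sketch: rescue mode keeps the schema fixed and routes new columns to the
# rescued data column, so the stream does not fail on schema changes.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>"))
```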
How do partitions work with Auto Loader?
Auto Loader attempts to infer partition columns from the underlying directory structure of the data if the data is laid out in Hive style partitioning. For example, the file path
`base_path/event=click/date=2021-04-01/f0.json` results in the inference of `date` and `event` as partition columns. If the underlying directory structure contains conflicting Hive partitions or doesn’t contain Hive style partitioning, partition columns are ignored.
Binary file (`binaryFile`) and `text` file formats have fixed data schemas, but support partition column inference. Databricks recommends setting `cloudFiles.schemaLocation` for these file formats. This avoids any potential errors or information loss and prevents inference of partition columns each time an Auto Loader stream begins.
Partition columns are not considered for schema evolution. If you had an initial directory structure like
base_path/event=click/date=2021-04-01/f0.json, and then start receiving new files as
`base_path/event=click/date=2021-04-01/hour=01/f1.json`, Auto Loader ignores the hour column. To capture information for new partition columns, set `cloudFiles.partitionColumns`. This option takes a comma-separated list of column names. Only columns that exist as
key=value pairs in your directory structure are parsed.
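For example, to parse the event, date, and hour columns from a directory structure like the one above (a sketch; paths are placeholders):

```python
# Sketch: explicitly list the partition columns to parse from key=value
# directories such as event=click/date=2021-04-01/hour=01.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.partitionColumns", "event,date,hour")
  .load("<path-to-source-data>"))
```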
When Auto Loader infers the schema, a rescued data column is automatically added to your schema as
`_rescued_data`. You can rename the column, or include it in cases where you provide a schema, by setting the option `rescuedDataColumn`.
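For example, to collect unmatched data when providing your own schema (a sketch; the schema and paths are hypothetical placeholders):

```python
# Sketch: provide an explicit schema and still capture unmatched data
# by naming a rescued data column.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("rescuedDataColumn", "_rescued_data")
  .schema("date DATE, quantity INT")  # hypothetical schema
  .load("<path-to-source-data>"))
```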
The rescued data column ensures that columns that don’t match with the schema are rescued instead of being dropped. The rescued data column contains any data that isn’t parsed for the following reasons:
- The column is missing from the schema.
- Type mismatches.
- Case mismatches.
The rescued data column contains a JSON document holding the rescued columns and the source file path of the record.
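To illustrate the shape only, a rescued value is a JSON string pairing the unparsed columns with the record’s source file path. The value below is hand-written for illustration, not produced by Auto Loader, and the exact keys it writes may differ:

```python
import json

# Hand-written example of a rescued-data value: a column whose value
# did not match the inferred type, plus the record's source file path.
rescued = '{"quantity": "twelve", "_file_path": "dbfs:/source/f0.json"}'

record = json.loads(rescued)
print(sorted(record))  # → ['_file_path', 'quantity']
```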
The JSON and CSV parsers support three modes when parsing records: `PERMISSIVE` (the default), `DROPMALFORMED`, and `FAILFAST`. When used together with
rescuedDataColumn, data type mismatches do not cause records to be dropped in
DROPMALFORMED mode or throw an error in
FAILFAST mode. Only corrupt records are dropped or throw errors, such as incomplete or malformed JSON or CSV. If you use
badRecordsPath when parsing JSON or CSV, data type mismatches are not considered as bad records when using the
`rescuedDataColumn`. Only incomplete and malformed JSON or CSV records are stored in the `badRecordsPath`.
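As a sketch (paths are placeholders), combining a parser mode with schema inference:

```python
# Sketch: with DROPMALFORMED, only corrupt records (incomplete or malformed
# JSON) are dropped; type mismatches still land in the rescued data column.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("mode", "DROPMALFORMED")
  .load("<path-to-source-data>"))
```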
Unless case sensitivity is enabled, the columns `abc`, `Abc`, and `ABC` are considered the same column for the purposes of schema inference. The case that is chosen is arbitrary and depends on the sampled data. You can use schema hints to enforce which case should be used. Once a selection has been made and the schema is inferred, Auto Loader does not consider the casing variants that were not selected to be consistent with the schema.
When the rescued data column is enabled, fields named in a case other than that of the schema are loaded to the
_rescued_data column. Change this behavior by setting the option
readerCaseSensitive to false, in which case Auto Loader reads data in a case-insensitive way.
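A sketch of case-insensitive reading (paths are placeholders):

```python
# Sketch: with readerCaseSensitive disabled, Abc and abc map to the same
# schema column instead of being routed to the rescued data column.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("readerCaseSensitive", "false")
  .load("<path-to-source-data>"))
```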
You can use schema hints to enforce the schema information that you know and expect on an inferred schema. When you know that a column is of a specific data type, or if you want to choose a more general data type (for example, a
double instead of an
integer), you can provide an arbitrary number of hints for column data types as a string using SQL schema specification syntax, such as the following:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
See the documentation on data types for the list of supported data types.
If a column is not present at the start of the stream, you can also use schema hints to add that column to the inferred schema.
Here is an example of an inferred schema to see the behavior with schema hints.
```
 |-- date: string
 |-- quantity: int
 |-- user_info: struct
 |    |-- id: string
 |    |-- name: string
 |    |-- dob: string
 |-- purchase_options: struct
 |    |-- delivery_address: string
```
By specifying the following schema hints:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
```
 |-- date: string -> date
 |-- quantity: int
 |-- user_info: struct
 |    |-- id: string
 |    |-- name: string
 |    |-- dob: string -> date
 |-- purchase_options: struct -> map<string,string>
 |-- time: timestamp
```
Support for Array and Map schema hints is available in Databricks Runtime 9.1 LTS and above.
Here is an example of an inferred schema with complex datatypes to see the behavior with schema hints.
```
 |-- products: array<string>
 |-- locations: array<int>
 |-- users: array<struct>
 |    |-- users.element: struct
 |    |    |-- id: string
 |    |    |-- name: string
 |    |    |-- dob: string
 |-- ids: map<string,string>
 |-- names: map<string,string>
 |-- prices: map<string,string>
 |-- discounts: map<struct,string>
 |    |-- discounts.key: struct
 |    |    |-- id: string
 |    |-- discounts.value: string
 |-- descriptions: map<string,struct>
 |    |-- descriptions.key: string
 |    |-- descriptions.value: struct
 |    |    |-- content: int
```
By specifying the following schema hints:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
```
 |-- products: array<string> -> array<int>
 |-- locations: array<int> -> array<string>
 |-- users: array<struct>
 |    |-- users.element: struct
 |    |    |-- id: string -> int
 |    |    |-- name: string
 |    |    |-- dob: string
 |-- ids: map<string,string> -> map<string,int>
 |-- names: map<string,string> -> map<int,string>
 |-- prices: map<string,string> -> map<string,int>
 |-- discounts: map<struct,string>
 |    |-- discounts.key: struct
 |    |    |-- id: string -> int
 |    |-- discounts.value: string
 |-- descriptions: map<string,struct>
 |    |-- descriptions.key: string
 |    |-- descriptions.value: struct
 |    |    |-- content: int -> string
```
Schema hints are used only if you do not provide a schema to Auto Loader. You can use schema hints whether
cloudFiles.inferColumnTypes is enabled or disabled.