Konfigurera schemainferens och utveckling i Auto Loader

Du kan konfigurera automatisk inläsning för att automatiskt identifiera schemat för inlästa data, så att du kan initiera tabeller utan att uttryckligen deklarera dataschemat och utveckla tabellschemat när nya kolumner introduceras. Detta eliminerar behovet av att manuellt spåra och tillämpa schemaändringar över tid.

Automatisk inläsare kan också "rädda" data som var oväntade (till exempel av olika datatyper) i en JSON-blobkolumn, som du kan välja att komma åt senare med hjälp av api:erna för halvstrukturerad dataåtkomst.

Följande format stöds för schemainferens och utveckling:

File format Versioner som stöds
JSON Alla versioner
CSV Alla versioner
XML Databricks Runtime 14.3 LTS och senare
Avro Databricks Runtime 10.4 LTS och senare
Parquet Databricks Runtime 11.3 LTS och senare
ORC Stöd saknas
Text Ej tillämpligt (fast schema)
Binaryfile Ej tillämpligt (fast schema)

Syntax för schemainferens och utveckling

Om du anger en målkatalog för alternativet cloudFiles.schemaLocation aktiveras schemainferens och utveckling. Du kan välja att använda samma katalog som du anger för checkpointLocation. Om du använder Delta Live Tables hanterar Azure Databricks schemaplats och annan kontrollpunktsinformation automatiskt.

Kommentar

Om du har mer än en källdataplats som läses in i måltabellen kräver varje arbetsbelastning för automatisk inläsning en separat kontrollpunkt för direktuppspelning.

I följande exempel används parquet för cloudFiles.format. Använd csv, avroeller json för andra filkällor. Alla andra inställningar för läsning och skrivning förblir desamma för standardbeteenden för varje format.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Hur fungerar schemainferens för automatisk inläsning?

För att härleda schemat vid första läsningen av data tar Auto Loader exempel på de första 50 GB eller 1 000 filer som identifieras, beroende på vilken gräns som överskrids först. Automatisk inläsning lagrar schemainformationen i en katalog _schemas på den konfigurerade cloudFiles.schemaLocation för att spåra schemaändringar i indata över tid.

Kommentar

Om du vill ändra storleken på det exempel som används kan du ange SQL-konfigurationerna:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(bytesträng, till exempel 10gb)

och

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(heltal)

Som standard försöker schemainferensen för automatisk inläsning undvika problem med schemautveckling på grund av typmatchningar. För format som inte kodar datatyper (JSON, CSV och XML) härleder Auto Loader alla kolumner som strängar (inklusive kapslade fält i JSON-filer). För format med skrivet schema (Parquet och Avro) tar Auto Loader exempel på en delmängd av filer och sammanfogar scheman för enskilda filer. Det här beteendet sammanfattas i följande tabell:

File format Standard härledd datatyp
JSON String
CSV String
XML String
Avro Typer som kodas i Avro-schema
Parquet Typer som kodas i Parquet-schema

Apache Spark DataFrameReader använder olika beteende för schemainferens och väljer datatyper för kolumner i JSON-, CSV- och XML-källor baserat på exempeldata. Om du vill aktivera det här beteendet med Auto Loader anger du alternativet cloudFiles.inferColumnTypes till true.

Kommentar

När schemat för CSV-data härleds förutsätter Auto Loader att filerna innehåller rubriker. Om dina CSV-filer inte innehåller rubriker anger du alternativet .option("header", "false"). Dessutom sammanfogar Auto Loader scheman för alla filer i exemplet för att komma fram till ett globalt schema. Automatisk inläsare kan sedan läsa varje fil enligt rubriken och parsa CSV:en korrekt.

Kommentar

När en kolumn har olika datatyper i två Parquet-filer väljer Auto Loader den bredaste typen. Du kan använda schemaHints för att åsidosätta det här valet. När du anger schematips omvandlar autoinläsaren inte kolumnen till den angivna typen, utan uppmanar i stället Parquet-läsaren att läsa kolumnen som den angivna typen. Vid matchningsfel räddas kolumnen i den räddade datakolumnen.

Hur fungerar schemautvecklingen för automatisk inläsning?

Automatisk inläsning identifierar tillägg av nya kolumner när dina data bearbetas. När Auto Loader identifierar en ny kolumn stoppas strömmen med en UnknownFieldException. Innan strömmen genererar det här felet utför Auto Loader schemainferens på den senaste mikrobatchen med data och uppdaterar schemaplatsen med det senaste schemat genom att slå samman nya kolumner till slutet av schemat. Datatyperna för befintliga kolumner förblir oförändrade.

Databricks rekommenderar att du konfigurerar automatiska inläsningsströmmar med arbetsflöden så att de startas om automatiskt efter sådana schemaändringar.

Auto Loader stöder följande lägen för schemautveckling, som du anger i alternativet cloudFiles.schemaEvolutionMode:

Läge Beteende vid läsning av ny kolumn
addNewColumns (standard) Strömmen misslyckas. Nya kolumner läggs till i schemat. Befintliga kolumner utvecklar inte datatyper.
rescue Schemat har aldrig utvecklats och strömmen misslyckas inte på grund av schemaändringar. Alla nya kolumner registreras i den räddade datakolumnen.
failOnNewColumns Strömmen misslyckas. Stream startar inte om om inte det angivna schemat uppdateras eller om den felaktiga datafilen tas bort.
none Utvecklar inte schemat, nya kolumner ignoreras och data räddas inte om inte rescuedDataColumn alternativet har angetts. Stream misslyckas inte på grund av schemaändringar.

Hur fungerar partitioner med Auto Loader?

Auto loader försöker härleda partitionskolumner från den underliggande katalogstrukturen för data om data anges i Hive-formatpartitionering. Till exempel resulterar filsökvägen base_path/event=click/date=2021-04-01/f0.json i slutsatsdragningen av date och event som partitionskolumner. Om den underliggande katalogstrukturen innehåller hive-partitioner i konflikt eller inte innehåller Hive-formatpartitionering ignoreras partitionskolumner.

Binära filer (binaryFile) och text filformat har fasta datascheman, men stöder inferens för partitionskolumner. Databricks rekommenderar inställning cloudFiles.schemaLocation för dessa filformat. Detta undviker eventuella fel eller informationsförluster och förhindrar slutsatsdragning av partitionskolumner varje gång en automatisk inläsare börjar.

Partitionskolumner beaktas inte för schemautveckling. Om du hade en inledande katalogstruktur som base_path/event=click/date=2021-04-01/f0.json, och sedan börjar ta emot nya filer som base_path/event=click/date=2021-04-01/hour=01/f1.json, ignorerar Auto Loader timkolumnen. Om du vill samla in information om nya partitionskolumner anger du cloudFiles.partitionColumns till event,date,hour.

Kommentar

Alternativet cloudFiles.partitionColumns tar en kommaavgränsad lista med kolumnnamn. Endast kolumner som finns som key=value par i katalogstrukturen parsas.

Vad är den räddade datakolumnen?

När Auto Loader härleder schemat läggs automatiskt en räddad datakolumn till i schemat som _rescued_data. Du kan byta namn på kolumnen eller inkludera den i fall där du anger ett schema genom att ange alternativet rescuedDataColumn.

Den räddade datakolumnen säkerställer att kolumner som inte matchar schemat räddas i stället för att tas bort. Den räddade datakolumnen innehåller alla data som inte parsas av följande skäl:

  • Kolumnen saknas i schemat.
  • Typmatchningsfel.
  • Skiftlägesfel.

Den räddade datakolumnen innehåller en JSON som innehåller de räddade kolumnerna och källfilens sökväg för posten.

Kommentar

JSON- och CSV-parsarna stöder tre lägen vid parsning av poster: PERMISSIVE, DROPMALFORMEDoch FAILFAST. När de används tillsammans med rescuedDataColumn, orsakar inte datatypsmatchningar att poster tas bort i DROPMALFORMED läge eller utlöser ett fel i FAILFAST läge. Endast skadade poster tas bort eller utlöser fel, till exempel ofullständig eller felaktigt JSON eller CSV. Om du använder badRecordsPath när du parsar JSON eller CSV betraktas inte datatypsmatchningar som felaktiga poster när du använder rescuedDataColumn. Endast ofullständiga och felaktiga JSON- eller CSV-poster lagras i badRecordsPath.

Ändra skiftlägeskänsligt beteende

Om inte skiftlägeskänslighet är aktiverat betraktas kolumnerna abc, Abcoch och ABC som samma kolumn för schemainferens. Det valda fallet är godtyckligt och beror på exempeldata. Du kan använda schematips för att framtvinga vilket fall som ska användas. När ett val har gjorts och schemat har härledts tar Auto Loader inte hänsyn till de höljevarianter som inte har valts i enlighet med schemat.

När den räddade datakolumnen är aktiverad läses fält med namnet i ett annat fall än schemat in i _rescued_data kolumnen. Ändra det här beteendet genom att ange alternativet readerCaseSensitive till false, i vilket fall Auto Loader läser data på ett skiftlägesokänsligt sätt.

Åsidosätta schemainferens med schematips

Du kan använda schematips för att framtvinga schemainformationen som du känner till och förväntar dig på ett härledt schema. När du vet att en kolumn är av en specifik datatyp, eller om du vill välja en mer allmän datatyp (till exempel en double i stället för en integer), kan du ange ett godtyckligt antal tips för kolumndatatyper som en sträng med sql-schemaspecifikationssyntax, till exempel följande:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Se dokumentationen om datatyper för listan över datatyper som stöds.

Om en kolumn inte finns i början av strömmen kan du också använda schematips för att lägga till den kolumnen i det här schemat.

Här är ett exempel på ett härledt schema för att se beteendet med schematips.

Härledt schema:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Genom att ange följande schematips:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

du får:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Kommentar

Stöd för matris- och kartschematips finns i Databricks Runtime 9.1 LTS och senare.

Här är ett exempel på ett härledt schema med komplexa datatyper för att se beteendet med schematips.

Härledt schema:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Genom att ange följande schematips:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

du får:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Kommentar

Schematips används endast om du inte anger något schema för automatisk inläsning. Du kan använda schematips om cloudFiles.inferColumnTypes det är aktiverat eller inaktiverat.