Schemarückschluss und -entwicklung in Auto Loader konfigurieren

Sie können den Autoloader so konfigurieren, dass das Schema der geladenen Daten automatisch erkannt wird, was es Ihnen erlaubt, Tabellen zu initialisieren, ohne das Datenschema explizit zu deklarieren und das Tabellenschema zu entwickeln, wenn neue Spalten eingeführt werden. Dadurch wird die Notwendigkeit beseitigt, Schemaänderungen im Laufe der Zeit manuell nachverfolgen und anwenden zu müssen.

Autoloader kann auch unerwartete Daten (z. B. von unterschiedlichen Datentypen) in einer JSON-Blob-Spalte „retten“, auf die Sie später über die APIs für den semistrukturierten Datenzugriff zugreifen können.

Die folgenden Formate werden für Schemarückschlüssen und Evolution unterstützt:

Dateiformat Unterstützte Versionen
JSON Alle Versionen
CSV Alle Versionen
XML Databricks Runtime 14.3 LTS und höher
Avro Databricks Runtime 10.4 LTS und höher
Parquet Databricks Runtime 11.3 LTS und höher
ORC Nicht unterstützt
Text Nicht anwendbar (festes Schema)
Binaryfile Nicht anwendbar (festes Schema)

Syntax für Schemainferenz und -entwicklung

Wenn Sie ein Zielverzeichnis für die Option cloudFiles.schemaLocation angeben, ermöglicht dies Schemarückschlüsse und -entwicklung. Sie können dasselbe Verzeichnis verwenden, das Sie für checkpointLocation angeben. Wenn Sie Delta Live Tables verwenden, verwaltet Azure Databricks den Schemaspeicherort und andere Prüfpunktinformationen automatisch.

Hinweis

Wenn Sie mehr als einen Quelldatenspeicherort haben, der in die Zieltabelle geladen wird, erfordert jede Autoloader-Erfassungsworkload einen separaten Streamingprüfpunkt.

Das folgende Beispiel verwendet parquet für die cloudFiles.format. Verwenden Sie csv, avrooder jsonfür andere Dateiquellen. Alle anderen Einstellungen zum Lesen und Schreiben bleiben für das Standardverhalten für jedes Format gleich.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Funktionsweise des Autoloader-Schemarückschlusses

Um beim ersten Laden von Daten auf das Schema zurückzuschließen, zieht der Autoloader die ersten 50 GB oder 1000 Dateien heran, die er entdeckt, je nachdem, welche Grenze zuerst überschritten wird. Der Autoloader speichert die Schemainformationen in dem Verzeichnis _schemas am konfigurierten cloudFiles.schemaLocation, um Schemaänderungen an den Eingabedaten im Laufe der Zeit nachzuverfolgen.

Hinweis

Um die Größe der verwendeten Stichprobe zu ändern, können Sie folgende SQL-Konfigurationen festlegen:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(Bytezeichenfolge, z. B. 10gb)

and

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(Integer)

Standardmäßig versucht der Autoloader-Schemarückschluss, Schemaentwicklungsprobleme aufgrund von Typkonflikten zu vermeiden. Für Formate, die Datentypen (JSON, CSV und XML) nicht codieren, schließt Autoloader alle Spalten als Zeichenfolgen rück (einschließlich geschachtelter Felder in JSON-Dateien). Bei Formaten mit typisiertem Schema (Parquet und Avro) nimmt Autoloader eine Teilmenge von Dateien als Stichprobe und führt die Schemas einzelner Dateien zusammen. Dieses Verhalten wird in der folgenden Tabelle zusammengefasst:

Dateiformat Standardmäßig rückgeschlossener Datentyp
JSON String
CSV String
XML String
Avro Im Avro-Schema codierte Typen
Parquet Im Parquet-Schema codierte Typen

Der Apache Spark DataFrameReader verwendet verschiedene Verhalten für Schemarückschlüsse, wobei er Datentypen für Spalten in JSON-, CSV- und XML-Quellen auf Grundlage von Beispieldaten auswählt. Um dieses Verhalten im Autoloader zu aktivieren, legen Sie die Option cloudFiles.inferColumnTypes auf true fest.

Hinweis

Beim Ableiten des Schemas für CSV-Daten geht der Autoloader davon aus, dass die Dateien Header enthalten. Wenn Ihre CSV-Dateien keine Header enthalten, geben Sie die Option .option("header", "false") an. Darüber hinaus führt der Autoloader die Schemas aller Dateien im Beispiel zusammen, um ein globales Schema zu erstellen. Anschließend kann der Autoloader jede Datei entsprechend ihrem Header lesen und die CSV-Daten ordnungsgemäß analysieren.

Hinweis

Wenn eine Spalte unterschiedliche Datentypen in zwei Parquetdateien aufweist, wählt Auto Loader den breitesten Typ aus. Sie können schemaHints verwenden, um diese Auswahl außer Kraft zu setzen. Wenn Sie Schema-Hinweise angeben, wandelt Auto Loader die Spalte nicht in den angegebenen Typ um, sondern weist den Parquet-Reader an, die Spalte als den angegebenen Typ zu lesen. Im Falle eines Konflikts wird die Spalte in der geretteten Datenspaltegerettet.

Funktionsweise der Autoloader-Schemaentwicklung

Autoloader erkennt das Hinzufügen neuer Spalten, während er Ihre Daten verarbeitet. Wenn Autoloader eine neue Spalte erkennt, wird der Stream mit einer UnknownFieldException beendet. Bevor Ihr Stream diesen Fehler auslöst, führt Autoloader einen Schemarückschluss für den letzten Mikrobatch von Daten durch und aktualisiert den Schemaspeicherort mit dem neuesten Schema, indem neue Spalten am Ende des Schemas zusammengeführt werden. Die Datentypen vorhandener Spalten bleiben unverändert.

Databricks empfiehlt, Autoloader-Streams mit Workflows so zu konfigurieren, dass sie nach solchen Schemaänderungen automatisch neu gestartet werden.

Autoloader unterstützt die folgenden Modi für die Schemaentwicklung, die Sie in der Option cloudFiles.schemaEvolutionMode einstellen:

Mode Verhalten beim Lesen neuer Spalten
addNewColumns (Standardwert) Stream schlägt fehl. Dem Schema werden neue Spalten hinzugefügt. Vorhandene Spalten erhalten keine Datentypen.
rescue Das Schema wird nie entwickelt, und der Stream schlägt aufgrund von Schemaänderungen nicht fehl. Alle neuen Spalten werden in der „rescued data“-Spalte aufgezeichnet.
failOnNewColumns Stream schlägt fehl. Der Stream wird erst dann neu gestartet, wenn das angegebene Schema aktualisiert oder die problematische Datendatei entfernt wurde.
none Das Schema wird nicht entwickelt, neue Spalten werden ignoriert, und die Daten werden nicht gerettet, es sei denn, die Option rescuedDataColumn ist festgelegt. Der Stream schlägt aufgrund von Schemaänderungen nicht fehl.

Funktionsweise von Partitionen mit Autoloader

Autoloader versucht, Partitionsspalten aus der zugrunde liegenden Verzeichnisstruktur der Daten abzuleiten, wenn die Daten im Hive-Stil partitioniert sind. Zum Beispiel führt der Dateipfad base_path/event=click/date=2021-04-01/f0.json dazu, dass date und event als Partitionsspalten rückgeschlossen werden. Wenn die zugrunde liegende Verzeichnisstruktur widersprüchliche Hive-Partitionen enthält oder keine Partitionierung im Hive-Stil aufweist, werden Partitionsspalten ignoriert.

Binärdateiformate (binaryFile) und text-Dateiformate weisen feste Datenschemata auf, unterstützen aber auch den Rückschluss auf Partitionsspalten. Databricks empfiehlt die Festlegung von cloudFiles.schemaLocation für diese Dateiformate. Dadurch werden potenzielle Fehler oder Informationsverluste vermieden und der Rückschluss von Partitionsspalten bei jedem Start eines Autoloaders verhindert.

Partitionsspalten werden bei der Schemaentwicklung nicht berücksichtigt. Wenn eine anfängliche Verzeichnisstruktur wie base_path/event=click/date=2021-04-01/f0.json vorlag und dann neue Dateien als base_path/event=click/date=2021-04-01/hour=01/f1.json empfangen werden, ignoriert Autoloader die Spalte „Stunde“. Um Informationen für neue Partitionsspalten zu erfassen, legen Sie cloudFiles.partitionColumns auf event,date,hour fest.

Hinweis

Die Option cloudFiles.partitionColumns akzeptiert eine durch Trennzeichen getrennten Liste von Spaltennamen. Nur Spalten, die als key=value-Paare in Ihrer Verzeichnisstruktur vorhanden sind, werden analysiert.

Was ist die Spalte „rescued data“ (gerettete Daten)?

Wenn Autoloader das Schema ableitet, wird dem Schema automatisch eine Spalte für gerettete Daten als _rescued_datahinzugefügt. Sie können die Spalte umbenennen oder sie in solchen Fällen einbeziehen, in denen Sie ein Schema durch Festlegen der Option rescuedDataColumn bereitstellen.

Die „rescued data“-Spalte stellt sicher, dass Spalten, die nicht mit dem Schema übereinstimmen, gerettet werden, anstatt gelöscht zu werden. Die Spalte „rescued data“ enthält alle Daten, die aus folgenden Gründen nicht analysiert werden:

  • Die Spalte fehlt im Schema.
  • Typkonflikte.
  • Groß-/Kleinschreibungskonflikte.

Die „rescued data“-Spalte enthält ein JSON-Blob, das die geretteten Spalten und den Quelldateipfad des Datensatzes enthält.

Hinweis

Die JSON- und CSV-Parser unterstützen drei Modi beim Analysieren von Datensätzen: PERMISSIVE, DROPMALFORMEDund FAILFAST. Bei Verwendung mit rescuedDataColumn führen Datentypkonflikte nicht dazu, dass Datensätze im Modus DROPMALFORMED gelöscht werden oder im Modus FAILFAST einen Fehler auslösen. Nur beschädigte Datensätze werden verworfen oder Fehler ausgegeben, z. B. unvollständige oder fehlerhafte JSON- oder CSV-Dateien. Wenn Sie beim Analysieren von JSON- oder CSV-Daten badRecordsPath verwenden, werden Datentypkonflikte bei Verwendung von rescuedDataColumn nicht als fehlerhafte Datensätze betrachtet. Nur unvollständige und falsch formatierte JSON- oder CSV-Datensätze werden in badRecordsPath gespeichert.

Ändern des Verhaltens zur Beachtung der Groß-/Kleinschreibung

Wenn die Beachtung der Groß- und Kleinschreibung nicht aktiviert ist, werden die Spalten abc, Abc und ABC für den Schemarückschluss als dieselbe Spalte betrachtet. Der gewählte Fall ist willkürlich und hängt von den abgetasteten Daten ab. Mithilfe von Schemahinweisen können Sie die zu verwendende Schreibweise erzwingen. Nachdem eine Auswahl getroffen und das Schema abgeleitet wurde, berücksichtigt Auto Loader nicht die Hüllenvarianten, die nicht im Einklang mit dem Schema ausgewählt wurden.

Wenn Spalte für gerettete Daten aktiviert ist, werden Felder, die in einer anderen Schreibweise (bezogen auf Groß- und Kleinschreibung) benannt sind als das Schema, in die Spalte _rescued_data geladen. Sie können dieses Verhalten ändern, indem Sie die Option readerCaseSensitive auf „False“ festlegen. In diesem Fall werden vom Autoloader Daten gelesen, ohne dass zwischen Groß- und Kleinschreibung unterschieden wird.

Außerkraftsetzung des Schemarückschlusses mit Schemahinweisen

Sie können Schemahinweise verwenden, um die Schemainformationen zu erzwingen, die Sie kennen und für ein rückgeschlossenes Schema erwarten. Wenn Sie wissen, dass eine Spalte einen bestimmten Datentyp hat, oder wenn Sie einen noch allgemeineren Datentyp wählen möchten (z. B. double statt integer), können Sie eine beliebige Anzahl von Hinweisen für die Datentypen von Spalten als eine Zeichenfolge mithilfe der SQL-Schemaspezifikationssyntax wie folgt angeben:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Eine Liste der unterstützten Datentypen finden Sie in der Dokumentation zu Datentypen.

Wenn eine Spalte am Anfang des Streams nicht vorhanden ist, können Sie auch Schemahinweise verwenden, um diese Spalte dem abgeleiteten Schema hinzuzufügen.

Hier ist ein Beispiel für ein abgeleitetes Schema, um das Verhalten mit Schemahinweisen zu veranschaulichen.

Abgeleitetes Schema:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Durch Angeben der folgenden Schemahinweise:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

erhalten Sie:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Hinweis

Die Unterstützung von Array- und Zuordnungsschemahinweisen ist in Databricks Runtime 9.1 LTS und höher verfügbar.

Hier ist ein Beispiel für ein abgeleitetes Schema mit komplexen Datentypen, um das Verhalten mit Schemahinweisen zu veranschaulichen.

Abgeleitetes Schema:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Durch Angeben der folgenden Schemahinweise:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

erhalten Sie:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Hinweis

Schemahinweise werden nur verwendet, wenn Sie dem Autoloader kein Schema bereitstellen. Sie können Schemahinweise verwenden, unabhängig davon, ob cloudFiles.inferColumnTypes aktiviert oder deaktiviert ist.