Teilen über


Ableiten und Entwickeln des Schemas in Pipelines mithilfe von from_json

Von Bedeutung

Dieses Feature befindet sich in der Public Preview.

In diesem Artikel wird beschrieben, wie Sie das Schema von JSON-Blobs mit der from_json SQL-Funktion in Lakeflow Spark Declarative Pipelines ableiten und weiterentwickeln.

Überblick

Die from_json SQL-Funktion analysiert eine JSON-Zeichenfolgenspalte und gibt einen Strukturwert zurück. Wenn Sie außerhalb einer Pipeline verwendet werden, müssen Sie das Schema des zurückgegebenen Werts explizit mithilfe des schema Arguments angeben. Wenn Sie Lakeflow Spark Declarative Pipelines verwenden, können Sie Schemaschlussfolgerung und Schemaevolution aktivieren, wodurch das Schema des zurückgegebenen Werts automatisch verwaltet wird. Dieses Feature vereinfacht sowohl das anfängliche Setup (insbesondere, wenn das Schema unbekannt ist) als auch laufende Vorgänge, wenn sich das Schema häufig ändert. Es ermöglicht eine nahtlose Verarbeitung beliebiger JSON-Blobs aus Streamingdatenquellen wie Auto Loader, Kafka oder Kinesis.

Insbesondere kann bei Verwendung in einer Pipeline Schema-Inferenz und Evolution für die from_json SQL-Funktion ermöglichen:

  • Erkennen neuer Felder in eingehenden JSON-Datensätzen (einschließlich geschachtelter JSON-Objekte)
  • Feldtypen ermitteln und diesen entsprechende Spark-Datentypen zuordnen
  • Automatisches Entwickeln des Schemas zur Aufnahme neuer Felder
  • Automatisches Behandeln von Daten, die dem aktuellen Schema nicht entsprechen

Syntax: Automatisches Ableiten und Entwickeln des Schemas

Um die Schema-Inferenz mit from_json in einer Pipeline zu aktivieren, legen Sie das Schema auf NULL fest und setzen Sie die schemaLocationKey Option. Auf diese Weise können Sie das Schema ableiten und nachverfolgen.

SQL

from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))

Python

from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})

Eine Abfrage kann mehrere from_json Ausdrücke enthalten, aber jeder Ausdruck muss über einen eindeutigen schemaLocationKeyAusdruck verfügen. Dies schemaLocationKey muss auch pro Pipeline eindeutig sein.

SQL

SELECT
  value,
  from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
  from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Python

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .load("/databricks-datasets/nyctaxi/sample/json/")
    .select(
      col("value"),
      from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
      from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)

Syntax: Festes Schema

Wenn Sie stattdessen ein bestimmtes Schema erzwingen möchten, können Sie die folgende from_json Syntax verwenden, um die JSON-Zeichenfolge mithilfe dieses Schemas zu analysieren:

from_json(jsonStr, schema, [, options])

Diese Syntax kann in jeder Azure Databricks-Umgebung verwendet werden, einschließlich Lakeflow Spark Declarative Pipelines. Weitere Informationen finden Sie hier.

Schema Inferenz

from_json leitet das Schema aus dem ersten Batch von JSON-Datenspalten ab und indiziert es intern anhand des schemaLocationKey Schemas (erforderlich).

Wenn es sich bei der JSON-Zeichenfolge um ein einzelnes Objekt handelt (z.B. {"id": 123, "name": "John"}), leitet from_json ein Schema vom Typ STRUCT ab und fügt rescuedDataColumn der Liste der Felder hinzu.

STRUCT<id LONG, name STRING, _rescued_data STRING>

Wenn die JSON-Zeichenfolge jedoch ein Array der obersten Ebene, wie ["id": 123, "name": "John"], aufweist, wird from_json das ARRAY in eine STRUKTUR eingebettet. Dieser Ansatz ermöglicht das Retten von Daten, die mit dem abgeleiteten Schema nicht kompatibel sind. Sie haben die Möglichkeit, die Arraywerte nachgelagert in separate Zeilen aufzuteilen.

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Außerkraftsetzen der Schemaausleitung mithilfe von Schemahinweisen

Sie können optional schemaHints angeben, um zu beeinflussen, wie from_json den Typ einer Spalte ableitet. Dies ist hilfreich, wenn Sie wissen, dass eine Spalte einen bestimmten Datentyp aufweist oder wenn Sie einen allgemeineren Datentyp auswählen möchten (z. B. ein Double anstelle einer ganzen Zahl). Sie können eine beliebige Anzahl von Hinweisen für Spaltendatentypen mithilfe der SQL-Schemaspezifikationssyntax bereitstellen. Die Semantik für Schemahinweise ist dieselbe wie für die Auto Loader-Schemahinweise. Beispiel:

SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)

Wenn die JSON-Zeichenfolge ein ARRAY der obersten Ebene enthält, wird sie in eine STRUKTUR eingeschlossen. In diesen Fällen werden Schemahinweise auf das ARRAY-Schema und nicht auf das umschlossene Struktur-Schema angewendet. Ziehen Sie beispielsweise eine JSON-Zeichenfolge mit einem Array der obersten Ebene in Betracht, z. B.:

[{"id": 123, "name": "John"}]

Das abgeleitete ARRAY-Schema wird in eine STRUKTUR eingeschlossen:

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Um den Datentyp von idzu ändern, geben Sie den Schemahinweis als element.id STRING an. Geben Sie element.new_col DOUBLE an, um eine neue Spalte vom Typ DOUBLE hinzuzufügen. Aufgrund dieser Hinweise wird das Schema für das JSON-Array auf oberster Ebene spezifiziert als:

struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>

Entwickeln des Schemas mithilfe von schemaEvolutionMode

from_json erkennt das Hinzufügen neuer Spalten, während sie Ihre Daten verarbeitet. Wenn from_json ein neues Feld erkannt wird, aktualisiert es das abgeleitete Schema mit dem neuesten Schema, indem neue Spalten am Ende des Schemas zusammengeführt werden. Die Datentypen vorhandener Spalten bleiben unverändert. Nach der Schemaaktualisierung wird die Pipeline automatisch mit dem aktualisierten Schema neu gestartet.

from_json unterstützt die folgenden Modi für die Schemaentwicklung, die Sie mit der optionalen schemaEvolutionMode Einstellung festlegen. Diese Modi sind mit dem automatischen Laden konsistent.

schemaEvolutionMode Verhalten beim Lesen einer neuen Spalte
addNewColumns (Standardwert) Stream schlägt fehl. Dem Schema werden neue Spalten hinzugefügt. Vorhandene Spalten entwickeln keine Datentypen.
rescue Das Schema wird nie weiterentwickelt, und der Datenstrom schlägt aufgrund von Schemaänderungen nicht fehl. Alle neuen Spalten werden in der Spalte "Gerettete Daten" aufgezeichnet.
failOnNewColumns Stream schlägt fehl. Stream wird nicht neu gestartet, es sei denn, die schemaHints werden aktualisiert oder die problematischen Daten werden entfernt.
none Das Schema wird nicht weiterentwickelt, neue Spalten werden ignoriert, und Die Daten werden nur dann wiederhergestellt, wenn die rescuedDataColumn Option festgelegt ist. Stream schlägt aufgrund von Schemaänderungen nicht fehl.

Beispiel:

SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)

Gerettete Datenspalte

Dem Schema wird automatisch eine gerettete Datenspalte als _rescued_data hinzugefügt. Sie können die Spalte umbenennen, indem Sie die rescuedDataColumn Option festlegen. Beispiel:

from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})

Wenn Sie die Spalte für gerettete Daten verwenden, werden alle Spalten, die nicht mit dem abgeleiteten Schema übereinstimmen, gerettet, anstatt gelöscht zu werden. Dies kann aufgrund eines Datentypkonflikts, einer fehlenden Spalte im Schema oder aufgrund von Unterschieden in der Groß-/Kleinschreibung bei Spaltennamen auftreten.

Behandeln beschädigter Datensätze

Um Datensätze zu speichern, die falsch formatiert sind und nicht analysiert werden können, fügen Sie eine _corrupt_record Spalte hinzu, indem Sie Schemahinweise festlegen, z. B. im folgenden Beispiel:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL,
      map('schemaLocationKey', 'nycTaxi',
          'schemaHints', '_corrupt_record STRING',
          'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Legen Sie die columnNameOfCorruptRecord Option fest, um die Beschädigte Datensatzspalte umzubenennen.

Der JSON-Parser unterstützt drei Modi für die Behandlung beschädigter Datensätze:

Modus Description
PERMISSIVE Bei beschädigten Datensätzen wird die falsch formatierte Zeichenfolge in ein Feld eingefügt, das von columnNameOfCorruptRecord konfiguriert ist, und die falsch formatierten Felder werden auf null gesetzt. Um beschädigte Datensätze beizubehalten, können Sie ein Zeichenfolgentypfeld festlegen, das in einem benutzerdefinierten Schema benannt ist columnNameOfCorruptRecord . Wenn ein Schema nicht über das Feld verfügt, werden beschädigte Datensätze beim Parsen gelöscht. Beim Ableiten eines Schemas fügt der Parser implizit ein columnNameOfCorruptRecord Feld im Ausgabeschema hinzu.
DROPMALFORMED Ignoriert beschädigte Datensätze.
Wenn Sie den DROPMALFORMED-Modus mit rescuedDataColumn verwenden, führen Datentypkonflikte nicht dazu, dass Datensätze ausgeschlossen werden. Es werden nur beschädigte Datensätze gelöscht, z. B. unvollständige oder falsch formatierte JSON-Daten.
FAILFAST Löst eine Ausnahme aus, wenn der Parser beschädigte Datensätze antrifft.
Wenn Sie FAILFAST-Modus zusammen mit rescuedDataColumn verwenden, werfen Datentypkonflikte keinen Fehler. Nur beschädigte Datensätze lösen Fehler aus, z. B. unvollständige oder falsch formatierte JSON.Only corrupt records throw errors, such as unvollständig or malformed JSON.

Verweisen auf ein Feld in der ausgabe from_json

from_json leitet das Schema während der Pipelineausführung ab. Wenn eine nachgeschaltete Abfrage auf ein from_json Feld verweist, bevor die from_json Funktion mindestens ein Mal erfolgreich ausgeführt wurde, wird das Feld nicht aufgelöst und die Abfrage wird übersprungen. Im folgenden Beispiel wird die Analyse für die Silbertabellenabfrage übersprungen, bis die from_json Funktion in der Bronzeabfrage ausgeführt und das Schema abgeleitet wurde.

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

CREATE STREAMING TABLE silver AS
  SELECT jsonCol.VendorID, jsonCol.total_amount
  FROM bronze

Wenn die from_json Funktion und die Felder, auf die die Funktion folgt, in derselben Abfrage referenziert werden, schlägt die Analyse möglicherweise wie im folgenden Beispiel fehl:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

Sie können dies beheben, indem Sie den Verweis auf das from_json Feld in eine nachgeschaltete Abfrage verschieben (z. B. das Bronze/Silber-Beispiel oben).) Alternativ können Sie angeben schemaHints , dass die verwiesenen from_json Felder enthalten. Beispiel:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

Beispiele: Automatisches Ableiten und Entwickeln des Schemas

Dieser Abschnitt enthält Beispielcode zum Aktivieren der automatischen Schemaableitung und -anpassung mithilfe von Lakeflow Spark Declarative Pipelines mit from_json.

Erstellen einer Streamingtabelle aus dem Cloudobjektspeicher

Im folgenden Beispiel wird die Syntax read_files verwendet, um eine Streamingtabelle aus dem Cloudobjektspeicher zu erstellen.

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Python

@dp.table(comment="from_json autoloader example")
def bronze():
  return (
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "text")
         .load("/databricks-datasets/nyctaxi/sample/json/")
         .select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)

Erstellen einer Streamingtabelle aus Kafka

Im folgenden Beispiel wird die Syntax read_kafka verwendet, um eine Streaming-Tabelle aus Kafka zu erstellen.

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    value,
    from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
  FROM READ_KAFKA(
    bootstrapSevers => '<server:ip>',
    subscribe => 'events',
    "startingOffsets", "latest"
)

Python

@dp.table(comment="from_json kafka example")
def bronze():
  return (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "<server:ip>")
         .option("subscribe", "<topic>")
         .option("startingOffsets", "latest")
         .load()
         .select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)

Beispiele: Festes Schema

Beispielcode mit from_json einem festen Schema finden Sie unter from_json "Funktion".

Häufig gestellte Fragen

In diesem Abschnitt werden häufig gestellte Fragen zu Schemainference und Evolutionsunterstützung in der from_json Funktion beantwortet.

Was ist der Unterschied zwischen from_json und parse_json?

Die parse_json Funktion gibt einen VARIANT Wert aus der JSON-Zeichenfolge zurück.

VARIANT bietet eine flexible und effiziente Möglichkeit zum Speichern halbstrukturierter Daten. Dadurch werden Schemaableitungen und -evolution umgangen, indem strikte Typen vollständig abgeschafft werden. Wenn Sie jedoch ein Schema zur Schreibzeit erzwingen möchten (z. B. weil Sie über ein relativ strenges Schema verfügen), from_json ist dies möglicherweise eine bessere Option.

In der folgenden Tabelle werden die Unterschiede zwischen from_json und parse_json beschrieben:

Funktion Anwendungsfälle Verfügbarkeit
from_json Die Schemaentwicklung mit from_json beibehält das Schema. Dies ist hilfreich, wenn:
  • Sie möchten Ihr Datenschema erzwingen (z. B. überprüfen Sie jede Schemaänderung, bevor Sie es beibehalten).
  • Sie möchten den Speicher optimieren und benötigen niedrige Abfragelatenz sowie geringe Kosten.
  • Sie möchten Daten mit inkompatiblen Typen fehlschlagen lassen.
  • Sie möchten Teilergebnisse aus beschädigten JSON-Datensätzen extrahieren und den falsch formatierten Datensatz in der _corrupt_record Spalte speichern. Im Gegensatz dazu gibt die VARIANT-Eingabe einen Fehler für nicht gültiges JSON zurück.
Verfügbar mit Schemavermutung und Evolution nur in Lakeflow Spark Deklarative Pipelines
parse_json VARIANT eignet sich besonders gut zum Halten von Daten, die nicht schematisiert werden müssen. Beispiel:
  • Sie möchten die Daten halbstrukturiert halten, da sie flexibel sind.
  • Das Schema ändert sich zu schnell, um es ohne häufige Datenstromfehler und Neustarts in ein Schema zu umwandeln.
  • Sie möchten nicht bei Daten mit nicht übereinstimmenden Typen scheitern. (DIE VARIANT-Aufnahme ist für gültige JSON-Datensätze immer erfolgreich – auch wenn Typenkonflikte vorhanden sind.)
  • Ihre Benutzer möchten nicht mit der Spalte für gerettete Daten umgehen, die Felder enthält, die nicht dem Schema entsprechen.
Verfügbar mit und ohne Lakeflow Spark Declarative Pipelines

Kann ich from_json Schema-Inferenz- und Evolutionssyntax außerhalb von Lakeflow Spark Declarative Pipelines verwenden?

Nein, Sie können die Schema-Inferenz- und Evolution-Syntax nicht außerhalb von from_json Lakeflow Spark Declarative Pipelines verwenden.

Wie kann ich auf das von from_json abgeleitete Schema zugreifen?

Zeigen Sie das Schema der Zielstreamingtabelle an.

Kann ich ein Schema übergeben from_json und auch die Entwicklung durchführen?

Nein, Sie können kein Schema übergeben from_json und auch die Weiterentwicklung durchführen. Sie können jedoch Schemahinweise bereitstellen, um einige oder alle Felder zu überschreiben, die von from_json abgeleitet werden.

Was geschieht mit dem Schema, wenn die Tabelle vollständig aktualisiert wird?

Die der Tabelle zugeordneten Schemaspeicherorte werden gelöscht, und das Schema wird von Grund auf neu abgeleitet.