Anmerkung
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen, dich anzumelden oder die Verzeichnisse zu wechseln.
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen , die Verzeichnisse zu wechseln.
Ableiten und Entwickeln des Schemas in Pipelines mithilfe von
Von Bedeutung
Dieses Feature befindet sich in der Public Preview.
In diesem Artikel wird beschrieben, wie Sie das Schema von JSON-Blobs mit der from_json SQL-Funktion in Lakeflow Spark Declarative Pipelines ableiten und weiterentwickeln.
Überblick
Die from_json SQL-Funktion analysiert eine JSON-Zeichenfolgenspalte und gibt einen Strukturwert zurück. Wenn Sie außerhalb einer Pipeline verwendet werden, müssen Sie das Schema des zurückgegebenen Werts explizit mithilfe des schema Arguments angeben. Wenn Sie Lakeflow Spark Declarative Pipelines verwenden, können Sie Schemaschlussfolgerung und Schemaevolution aktivieren, wodurch das Schema des zurückgegebenen Werts automatisch verwaltet wird. Dieses Feature vereinfacht sowohl das anfängliche Setup (insbesondere, wenn das Schema unbekannt ist) als auch laufende Vorgänge, wenn sich das Schema häufig ändert. Es ermöglicht eine nahtlose Verarbeitung beliebiger JSON-Blobs aus Streamingdatenquellen wie Auto Loader, Kafka oder Kinesis.
Insbesondere kann bei Verwendung in einer Pipeline Schema-Inferenz und Evolution für die from_json SQL-Funktion ermöglichen:
- Erkennen neuer Felder in eingehenden JSON-Datensätzen (einschließlich geschachtelter JSON-Objekte)
- Feldtypen ermitteln und diesen entsprechende Spark-Datentypen zuordnen
- Automatisches Entwickeln des Schemas zur Aufnahme neuer Felder
- Automatisches Behandeln von Daten, die dem aktuellen Schema nicht entsprechen
Syntax: Automatisches Ableiten und Entwickeln des Schemas
Um die Schema-Inferenz mit from_json in einer Pipeline zu aktivieren, legen Sie das Schema auf NULL fest und setzen Sie die schemaLocationKey Option. Auf diese Weise können Sie das Schema ableiten und nachverfolgen.
SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
Python
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
Eine Abfrage kann mehrere from_json Ausdrücke enthalten, aber jeder Ausdruck muss über einen eindeutigen schemaLocationKeyAusdruck verfügen. Dies schemaLocationKey muss auch pro Pipeline eindeutig sein.
SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
Syntax: Festes Schema
Wenn Sie stattdessen ein bestimmtes Schema erzwingen möchten, können Sie die folgende from_json Syntax verwenden, um die JSON-Zeichenfolge mithilfe dieses Schemas zu analysieren:
from_json(jsonStr, schema, [, options])
Diese Syntax kann in jeder Azure Databricks-Umgebung verwendet werden, einschließlich Lakeflow Spark Declarative Pipelines. Weitere Informationen finden Sie hier.
Schema Inferenz
from_json leitet das Schema aus dem ersten Batch von JSON-Datenspalten ab und indiziert es intern anhand des schemaLocationKey Schemas (erforderlich).
Wenn es sich bei der JSON-Zeichenfolge um ein einzelnes Objekt handelt (z.B. {"id": 123, "name": "John"}), leitet from_json ein Schema vom Typ STRUCT ab und fügt rescuedDataColumn der Liste der Felder hinzu.
STRUCT<id LONG, name STRING, _rescued_data STRING>
Wenn die JSON-Zeichenfolge jedoch ein Array der obersten Ebene, wie ["id": 123, "name": "John"], aufweist, wird from_json das ARRAY in eine STRUKTUR eingebettet. Dieser Ansatz ermöglicht das Retten von Daten, die mit dem abgeleiteten Schema nicht kompatibel sind. Sie haben die Möglichkeit, die Arraywerte nachgelagert in separate Zeilen aufzuteilen.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Außerkraftsetzen der Schemaausleitung mithilfe von Schemahinweisen
Sie können optional schemaHints angeben, um zu beeinflussen, wie from_json den Typ einer Spalte ableitet. Dies ist hilfreich, wenn Sie wissen, dass eine Spalte einen bestimmten Datentyp aufweist oder wenn Sie einen allgemeineren Datentyp auswählen möchten (z. B. ein Double anstelle einer ganzen Zahl). Sie können eine beliebige Anzahl von Hinweisen für Spaltendatentypen mithilfe der SQL-Schemaspezifikationssyntax bereitstellen. Die Semantik für Schemahinweise ist dieselbe wie für die Auto Loader-Schemahinweise. Beispiel:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
Wenn die JSON-Zeichenfolge ein ARRAY der obersten Ebene enthält, wird sie in eine STRUKTUR eingeschlossen. In diesen Fällen werden Schemahinweise auf das ARRAY-Schema und nicht auf das umschlossene Struktur-Schema angewendet. Ziehen Sie beispielsweise eine JSON-Zeichenfolge mit einem Array der obersten Ebene in Betracht, z. B.:
[{"id": 123, "name": "John"}]
Das abgeleitete ARRAY-Schema wird in eine STRUKTUR eingeschlossen:
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Um den Datentyp von idzu ändern, geben Sie den Schemahinweis als element.id STRING an. Geben Sie element.new_col DOUBLE an, um eine neue Spalte vom Typ DOUBLE hinzuzufügen. Aufgrund dieser Hinweise wird das Schema für das JSON-Array auf oberster Ebene spezifiziert als:
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
Entwickeln des Schemas mithilfe von schemaEvolutionMode
from_json erkennt das Hinzufügen neuer Spalten, während sie Ihre Daten verarbeitet. Wenn from_json ein neues Feld erkannt wird, aktualisiert es das abgeleitete Schema mit dem neuesten Schema, indem neue Spalten am Ende des Schemas zusammengeführt werden. Die Datentypen vorhandener Spalten bleiben unverändert. Nach der Schemaaktualisierung wird die Pipeline automatisch mit dem aktualisierten Schema neu gestartet.
from_json unterstützt die folgenden Modi für die Schemaentwicklung, die Sie mit der optionalen schemaEvolutionMode Einstellung festlegen. Diese Modi sind mit dem automatischen Laden konsistent.
schemaEvolutionMode |
Verhalten beim Lesen einer neuen Spalte |
|---|---|
addNewColumns (Standardwert) |
Stream schlägt fehl. Dem Schema werden neue Spalten hinzugefügt. Vorhandene Spalten entwickeln keine Datentypen. |
rescue |
Das Schema wird nie weiterentwickelt, und der Datenstrom schlägt aufgrund von Schemaänderungen nicht fehl. Alle neuen Spalten werden in der Spalte "Gerettete Daten" aufgezeichnet. |
failOnNewColumns |
Stream schlägt fehl. Stream wird nicht neu gestartet, es sei denn, die schemaHints werden aktualisiert oder die problematischen Daten werden entfernt. |
none |
Das Schema wird nicht weiterentwickelt, neue Spalten werden ignoriert, und Die Daten werden nur dann wiederhergestellt, wenn die rescuedDataColumn Option festgelegt ist. Stream schlägt aufgrund von Schemaänderungen nicht fehl. |
Beispiel:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
Gerettete Datenspalte
Dem Schema wird automatisch eine gerettete Datenspalte als _rescued_data hinzugefügt. Sie können die Spalte umbenennen, indem Sie die rescuedDataColumn Option festlegen. Beispiel:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
Wenn Sie die Spalte für gerettete Daten verwenden, werden alle Spalten, die nicht mit dem abgeleiteten Schema übereinstimmen, gerettet, anstatt gelöscht zu werden. Dies kann aufgrund eines Datentypkonflikts, einer fehlenden Spalte im Schema oder aufgrund von Unterschieden in der Groß-/Kleinschreibung bei Spaltennamen auftreten.
Behandeln beschädigter Datensätze
Um Datensätze zu speichern, die falsch formatiert sind und nicht analysiert werden können, fügen Sie eine _corrupt_record Spalte hinzu, indem Sie Schemahinweise festlegen, z. B. im folgenden Beispiel:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Legen Sie die columnNameOfCorruptRecord Option fest, um die Beschädigte Datensatzspalte umzubenennen.
Der JSON-Parser unterstützt drei Modi für die Behandlung beschädigter Datensätze:
| Modus | Description |
|---|---|
PERMISSIVE |
Bei beschädigten Datensätzen wird die falsch formatierte Zeichenfolge in ein Feld eingefügt, das von columnNameOfCorruptRecord konfiguriert ist, und die falsch formatierten Felder werden auf null gesetzt. Um beschädigte Datensätze beizubehalten, können Sie ein Zeichenfolgentypfeld festlegen, das in einem benutzerdefinierten Schema benannt ist columnNameOfCorruptRecord . Wenn ein Schema nicht über das Feld verfügt, werden beschädigte Datensätze beim Parsen gelöscht. Beim Ableiten eines Schemas fügt der Parser implizit ein columnNameOfCorruptRecord Feld im Ausgabeschema hinzu. |
DROPMALFORMED |
Ignoriert beschädigte Datensätze. Wenn Sie den DROPMALFORMED-Modus mit rescuedDataColumn verwenden, führen Datentypkonflikte nicht dazu, dass Datensätze ausgeschlossen werden. Es werden nur beschädigte Datensätze gelöscht, z. B. unvollständige oder falsch formatierte JSON-Daten. |
FAILFAST |
Löst eine Ausnahme aus, wenn der Parser beschädigte Datensätze antrifft. Wenn Sie FAILFAST-Modus zusammen mit rescuedDataColumn verwenden, werfen Datentypkonflikte keinen Fehler. Nur beschädigte Datensätze lösen Fehler aus, z. B. unvollständige oder falsch formatierte JSON.Only corrupt records throw errors, such as unvollständig or malformed JSON. |
Verweisen auf ein Feld in der ausgabe from_json
from_json leitet das Schema während der Pipelineausführung ab. Wenn eine nachgeschaltete Abfrage auf ein from_json Feld verweist, bevor die from_json Funktion mindestens ein Mal erfolgreich ausgeführt wurde, wird das Feld nicht aufgelöst und die Abfrage wird übersprungen. Im folgenden Beispiel wird die Analyse für die Silbertabellenabfrage übersprungen, bis die from_json Funktion in der Bronzeabfrage ausgeführt und das Schema abgeleitet wurde.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
Wenn die from_json Funktion und die Felder, auf die die Funktion folgt, in derselben Abfrage referenziert werden, schlägt die Analyse möglicherweise wie im folgenden Beispiel fehl:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Sie können dies beheben, indem Sie den Verweis auf das from_json Feld in eine nachgeschaltete Abfrage verschieben (z. B. das Bronze/Silber-Beispiel oben).) Alternativ können Sie angeben schemaHints , dass die verwiesenen from_json Felder enthalten. Beispiel:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Beispiele: Automatisches Ableiten und Entwickeln des Schemas
Dieser Abschnitt enthält Beispielcode zum Aktivieren der automatischen Schemaableitung und -anpassung mithilfe von Lakeflow Spark Declarative Pipelines mit from_json.
Erstellen einer Streamingtabelle aus dem Cloudobjektspeicher
Im folgenden Beispiel wird die Syntax read_files verwendet, um eine Streamingtabelle aus dem Cloudobjektspeicher zu erstellen.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python
@dp.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Erstellen einer Streamingtabelle aus Kafka
Im folgenden Beispiel wird die Syntax read_kafka verwendet, um eine Streaming-Tabelle aus Kafka zu erstellen.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
Python
@dp.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
Beispiele: Festes Schema
Beispielcode mit from_json einem festen Schema finden Sie unter from_json "Funktion".
Häufig gestellte Fragen
In diesem Abschnitt werden häufig gestellte Fragen zu Schemainference und Evolutionsunterstützung in der from_json Funktion beantwortet.
Was ist der Unterschied zwischen from_json und parse_json?
Die parse_json Funktion gibt einen VARIANT Wert aus der JSON-Zeichenfolge zurück.
VARIANT bietet eine flexible und effiziente Möglichkeit zum Speichern halbstrukturierter Daten. Dadurch werden Schemaableitungen und -evolution umgangen, indem strikte Typen vollständig abgeschafft werden. Wenn Sie jedoch ein Schema zur Schreibzeit erzwingen möchten (z. B. weil Sie über ein relativ strenges Schema verfügen), from_json ist dies möglicherweise eine bessere Option.
In der folgenden Tabelle werden die Unterschiede zwischen from_json und parse_json beschrieben:
| Funktion | Anwendungsfälle | Verfügbarkeit |
|---|---|---|
from_json |
Die Schemaentwicklung mit from_json beibehält das Schema. Dies ist hilfreich, wenn:
|
Verfügbar mit Schemavermutung und Evolution nur in Lakeflow Spark Deklarative Pipelines |
parse_json |
VARIANT eignet sich besonders gut zum Halten von Daten, die nicht schematisiert werden müssen. Beispiel:
|
Verfügbar mit und ohne Lakeflow Spark Declarative Pipelines |
Kann ich from_json Schema-Inferenz- und Evolutionssyntax außerhalb von Lakeflow Spark Declarative Pipelines verwenden?
Nein, Sie können die Schema-Inferenz- und Evolution-Syntax nicht außerhalb von from_json Lakeflow Spark Declarative Pipelines verwenden.
Wie kann ich auf das von from_json abgeleitete Schema zugreifen?
Zeigen Sie das Schema der Zielstreamingtabelle an.
Kann ich ein Schema übergeben from_json und auch die Entwicklung durchführen?
Nein, Sie können kein Schema übergeben from_json und auch die Weiterentwicklung durchführen. Sie können jedoch Schemahinweise bereitstellen, um einige oder alle Felder zu überschreiben, die von from_json abgeleitet werden.
Was geschieht mit dem Schema, wenn die Tabelle vollständig aktualisiert wird?
Die der Tabelle zugeordneten Schemaspeicherorte werden gelöscht, und das Schema wird von Grund auf neu abgeleitet.