Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Inferera och utveckla schemat genom att använda
Viktigt!
Den här funktionen finns i offentlig förhandsversion.
Den här artikeln beskriver hur du härleder och utvecklar schemat för JSON-blobar med from_json SQL-funktionen i Lakeflow Spark Deklarativa pipelines.
Översikt
from_json SQL-funktionen parsar en JSON-strängkolumn och returnerar ett struct-värde. När du använder utanför en pipeline måste du uttryckligen ange schemat för det returnerade värdet med argumentet schema . När det används med Lakeflow Spark kan du aktivera deklarativa pipelines, schemainferens och utveckling, som automatiskt hanterar schemat för det returnerade värdet. Den här funktionen förenklar både den inledande installationen (särskilt när schemat är okänt) och pågående åtgärder när schemat ändras ofta. Det möjliggör sömlös bearbetning av godtyckliga JSON-blobar från strömmande datakällor som Auto Loader, Kafka eller Kinesis.
När det används i en pipeline kan schemainferens och utveckling för from_json SQL-funktionen:
- Identifiera nya fält i inkommande JSON-poster (inklusive kapslade JSON-objekt)
- Härled fälttyperna och mappa dem till lämpliga Spark-datatyper
- Utveckla schemat automatiskt för att hantera nya fält
- Hantera data som inte överensstämmer med det aktuella schemat automatiskt
Syntax: Härled och utveckla schemat automatiskt
Om du vill aktivera schemainferens med from_json i en pipeline anger du schemat till NULL och anger alternativet schemaLocationKey . På så sätt kan den härleda och hålla reda på schemat.
SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
python
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
En fråga kan ha flera from_json uttryck, men varje uttryck måste ha en unik schemaLocationKey. Det schemaLocationKey måste också vara unik för varje pipeline.
SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
Syntax: Fixerat schema
Om du vill tillämpa ett visst schema i stället kan du använda följande from_json syntax för att parsa JSON-strängen med hjälp av det schemat:
from_json(jsonStr, schema, [, options])
Den här syntaxen kan användas i valfri Azure Databricks-miljö, inklusive Lakeflow Spark Deklarativa Pipelines. Mer information finns här.
Schemainferens
from_json härleder schemat från den första batchen med JSON-datakolumner och indexerar det internt efter dess schemaLocationKey (krävs).
Om JSON-strängen är ett enskilt objekt (till exempel {"id": 123, "name": "John"}), from_json härleds ett schema av typen STRUCT och lägger till ett rescuedDataColumn i listan med fält.
STRUCT<id LONG, name STRING, _rescued_data STRING>
Men om JSON-strängen har en matris på den översta nivån (till exempel ["id": 123, "name": "John"]) omsluter matrisen from_json i en STRUCT. Den här metoden möjliggör undsättning av data som inte är kompatibla med det härledda schemat. Du kan välja att spränga matrisvärdena i separata rader nedströms.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Åsidosätta schemainferens med hjälp av schematips
Du kan också ange schemaHints för att påverka hur from_json härleder typen av en kolumn. Detta är användbart när du vet att en kolumn är av en viss datatyp, eller om du vill välja en mer allmän datatyp (till exempel en dubbel i stället för ett heltal). Du kan ange ett godtyckligt antal tips för kolumndatatyper med sql-schemaspecifikationssyntax. Semantiken för schematips är samma som för schematipsen för automatisk inläsning. Till exempel:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
När JSON-strängen innehåller en matris på översta nivån omsluts den i en STRUCT. I dessa fall tillämpas schematips på ARRAY-schemat istället för den omslutna STRUCT-typen. Du kan till exempel överväga en JSON-sträng med en matris på den översta nivån, till exempel:
[{"id": 123, "name": "John"}]
Det avledda ARRAY-schemat är insvept i en STRUCT:
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Om du vill ändra datatypen för idanger du schematipset som element.id STRING. Om du vill lägga till en ny kolumn av typen DOUBLE anger du element.new_col DOUBLE. På grund av dessa tips blir schemat för JSON-matrisen på den översta nivån:
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
Utveckla schemat med hjälp av schemaEvolutionMode
from_json identifierar tillägg av nya kolumner när de bearbetar dina data. När from_json ett nytt fält identifieras uppdateras det härledda schemat med det senaste schemat genom att sammanfoga nya kolumner till slutet av schemat. Datatyperna för befintliga kolumner förblir oförändrade. Efter schemauppdateringen startas pipelinen om automatiskt med det uppdaterade schemat.
from_json stöder följande lägen för schemautveckling, som du anger med hjälp av den valfria schemaEvolutionMode inställningen. Dessa lägen är konsekventa med Auto Loader.
schemaEvolutionMode |
Beteende vid läsning av en ny kolumn |
|---|---|
addNewColumns (standardinställning) |
Streamen har avbrutits. Nya kolumner läggs till i schemat. Befintliga kolumner utvecklar inte datatyper. |
rescue |
Schemat har aldrig utvecklats och strömmen misslyckas inte på grund av schemaändringar. Alla nya kolumner registreras i den räddade datakolumnen. |
failOnNewColumns |
Streamen har avbrutits. Stream startar inte om såvida inte schemaHints de uppdateras eller de felaktiga data tas bort. |
none |
Utvecklar inte schemat, nya kolumner ignoreras och data räddas inte om inte rescuedDataColumn alternativet har angetts. Stream misslyckas inte på grund av schemaändringar. |
Till exempel:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
Räddad datakolumn
En räddad datakolumn läggs automatiskt till i schemat som _rescued_data. Du kan byta namn på kolumnen genom att ange alternativet rescuedDataColumn . Till exempel:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
När du väljer att använda den räddade datakolumnen räddas alla kolumner som inte matchar det härledda schemat i stället för att tas bort. Detta kan inträffa på grund av ett matchningsfel av datatyp, en kolumn som saknas i schemat eller en skillnad i kolumnnamnets hölje.
Hantera korrupta poster
Om du vill lagra poster som är felaktiga och inte kan parsas lägger du till en _corrupt_record kolumn genom att ange schematips, som i följande exempel:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Om du vill byta namn på den skadade postkolumnen anger du alternativet columnNameOfCorruptRecord .
JSON-parsern stöder tre lägen för hantering av skadade poster:
| Mode | Description |
|---|---|
PERMISSIVE |
För skadade poster placeras den felaktiga strängen i ett fält konfigurerat av columnNameOfCorruptRecord och felaktiga fält ställs in på null. Om du vill behålla skadade poster kan du ange ett strängtypfält med namnet columnNameOfCorruptRecord i ett användardefinierat schema. Om ett schema inte har fältet tas skadade poster bort under parsningen. När du härleder ett schema lägger parsern implicit till ett columnNameOfCorruptRecord fält i utdataschemat. |
DROPMALFORMED |
Ignorerar skadade poster. När du använder DROPMALFORMED i läget för rescuedDataColumn, medför inte datatypsinkompatibiliteter att poster tas bort. Endast skadade poster tas bort, till exempel ofullständig eller felaktig JSON. |
FAILFAST |
Utlöser ett undantag när parsern möter korrupta poster. När du använder FAILFAST läget med rescuedDataColumngenererar inte datatypsmatchningar något fel. Endast korrupta poster utlöser fel, till exempel ofullständig eller felaktig JSON. |
Referera till ett fält i from_json-utdata
from_json härleder schemat under pipelinekörningen. Om en nedströmsfråga refererar till ett from_json fält innan from_json funktionen har körts minst en gång, löses inte fältet och frågan ignoreras. I följande exempel hoppas analysen för silvertabellfrågan över tills from_json funktionen i bronsfrågan har körts och härledt schemat.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
Om funktionen from_json och fälten som den härleder refereras till i samma fråga kan analysen misslyckas som i följande exempel:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Du kan åtgärda detta genom att flytta referensen till fältet from_json till en nedströmsfråga (som exemplet brons/silver ovan.) Du kan också ange schemaHints som innehåller de refererade from_json fälten. Till exempel:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Exempel: Härled och utveckla schemat automatiskt
Det här avsnittet innehåller exempelkod för att aktivera automatisk schemainferens och utveckling med hjälp av from_json i Lakeflow Spark Deklarativa pipelines.
Skapa en strömmande tabell från molnobjektlagring
I följande exempel används read_files syntax för att skapa en strömmande tabell från molnobjektlagring.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
python
@dp.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Skapa en strömmande tabell från Kafka
I följande exempel används read_kafka syntax för att skapa en strömningstabell från Kafka.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
python
@dp.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
Exempel: Fast schema
Exempel på kod som använder from_json med ett fast schema finns i from_json funktion.
FAQs
Det här avsnittet besvarar vanliga frågor om schemainferens och utvecklingsstöd i from_json funktionen.
Vad är skillnaden mellan from_json och parse_json?
Funktionen parse_json returnerar ett VARIANT värde från JSON-strängen.
VARIANT är ett flexibelt och effektivt sätt att lagra halvstrukturerade data. Detta kringgår schemainferens och utveckling genom att avskaffa strikta typer helt och hållet. Men om du vill framtvinga ett schema vid skrivtillfället (till exempel för att du har ett relativt strikt schema) from_json kan det vara ett bättre alternativ.
I följande tabell beskrivs skillnaderna mellan from_json och parse_json:
| Funktion | Användningsfall | Availability |
|---|---|---|
from_json |
Schemautveckling med from_json underhåller schemat. Detta är användbart när:
|
Tillgänglig med schemainferens och evolution endast i Lakeflow Spark deklarativa pipelines |
parse_json |
VARIANT passar särskilt bra för att lagra data som inte behöver schematiseras. Till exempel:
|
Tillgänglig med och utan Lakeflow Spark deklarativa rörledningar |
Kan jag använda from_json schemainferens och evolutionssyntax utanför Lakeflow Spark deklarativa pipelines?
Nej, du kan inte använda from_json schemainferens och evolutionssyntax utanför Lakeflow Spark Deklarativa Pipelines.
Hur kommer jag åt schemat som härleds av from_json?
Visa schemat för målströmningstabellen.
Kan jag skicka from_json ett schema och även utföra utveckling?
Nej, du kan inte skicka from_json ett schema och även utföra utveckling. Du kan dock ange schematips för att åsidosätta vissa eller alla fält som härleds av from_json.
Vad händer med schemat om tabellen är helt uppdaterad?
Schemaplatserna som är associerade med tabellen rensas och schemat härleds från grunden igen.