Dela via


Inferera och utveckla schemat genom att använda from_json i pipelines

Viktigt!

Den här funktionen finns i offentlig förhandsversion.

Den här artikeln beskriver hur du härleder och utvecklar schemat för JSON-blobar med from_json SQL-funktionen i Lakeflow Spark Deklarativa pipelines.

Översikt

from_json SQL-funktionen parsar en JSON-strängkolumn och returnerar ett struct-värde. När du använder utanför en pipeline måste du uttryckligen ange schemat för det returnerade värdet med argumentet schema . När det används med Lakeflow Spark kan du aktivera deklarativa pipelines, schemainferens och utveckling, som automatiskt hanterar schemat för det returnerade värdet. Den här funktionen förenklar både den inledande installationen (särskilt när schemat är okänt) och pågående åtgärder när schemat ändras ofta. Det möjliggör sömlös bearbetning av godtyckliga JSON-blobar från strömmande datakällor som Auto Loader, Kafka eller Kinesis.

När det används i en pipeline kan schemainferens och utveckling för from_json SQL-funktionen:

  • Identifiera nya fält i inkommande JSON-poster (inklusive kapslade JSON-objekt)
  • Härled fälttyperna och mappa dem till lämpliga Spark-datatyper
  • Utveckla schemat automatiskt för att hantera nya fält
  • Hantera data som inte överensstämmer med det aktuella schemat automatiskt

Syntax: Härled och utveckla schemat automatiskt

Om du vill aktivera schemainferens med from_json i en pipeline anger du schemat till NULL och anger alternativet schemaLocationKey . På så sätt kan den härleda och hålla reda på schemat.

SQL

from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))

python

from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})

En fråga kan ha flera from_json uttryck, men varje uttryck måste ha en unik schemaLocationKey. Det schemaLocationKey måste också vara unik för varje pipeline.

SQL

SELECT
  value,
  from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
  from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

python

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .load("/databricks-datasets/nyctaxi/sample/json/")
    .select(
      col("value"),
      from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
      from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)

Syntax: Fixerat schema

Om du vill tillämpa ett visst schema i stället kan du använda följande from_json syntax för att parsa JSON-strängen med hjälp av det schemat:

from_json(jsonStr, schema, [, options])

Den här syntaxen kan användas i valfri Azure Databricks-miljö, inklusive Lakeflow Spark Deklarativa Pipelines. Mer information finns här.

Schemainferens

from_json härleder schemat från den första batchen med JSON-datakolumner och indexerar det internt efter dess schemaLocationKey (krävs).

Om JSON-strängen är ett enskilt objekt (till exempel {"id": 123, "name": "John"}), from_json härleds ett schema av typen STRUCT och lägger till ett rescuedDataColumn i listan med fält.

STRUCT<id LONG, name STRING, _rescued_data STRING>

Men om JSON-strängen har en matris på den översta nivån (till exempel ["id": 123, "name": "John"]) omsluter matrisen from_json i en STRUCT. Den här metoden möjliggör undsättning av data som inte är kompatibla med det härledda schemat. Du kan välja att spränga matrisvärdena i separata rader nedströms.

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Åsidosätta schemainferens med hjälp av schematips

Du kan också ange schemaHints för att påverka hur from_json härleder typen av en kolumn. Detta är användbart när du vet att en kolumn är av en viss datatyp, eller om du vill välja en mer allmän datatyp (till exempel en dubbel i stället för ett heltal). Du kan ange ett godtyckligt antal tips för kolumndatatyper med sql-schemaspecifikationssyntax. Semantiken för schematips är samma som för schematipsen för automatisk inläsning. Till exempel:

SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)

När JSON-strängen innehåller en matris på översta nivån omsluts den i en STRUCT. I dessa fall tillämpas schematips på ARRAY-schemat istället för den omslutna STRUCT-typen. Du kan till exempel överväga en JSON-sträng med en matris på den översta nivån, till exempel:

[{"id": 123, "name": "John"}]

Det avledda ARRAY-schemat är insvept i en STRUCT:

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Om du vill ändra datatypen för idanger du schematipset som element.id STRING. Om du vill lägga till en ny kolumn av typen DOUBLE anger du element.new_col DOUBLE. På grund av dessa tips blir schemat för JSON-matrisen på den översta nivån:

struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>

Utveckla schemat med hjälp av schemaEvolutionMode

from_json identifierar tillägg av nya kolumner när de bearbetar dina data. När from_json ett nytt fält identifieras uppdateras det härledda schemat med det senaste schemat genom att sammanfoga nya kolumner till slutet av schemat. Datatyperna för befintliga kolumner förblir oförändrade. Efter schemauppdateringen startas pipelinen om automatiskt med det uppdaterade schemat.

from_json stöder följande lägen för schemautveckling, som du anger med hjälp av den valfria schemaEvolutionMode inställningen. Dessa lägen är konsekventa med Auto Loader.

schemaEvolutionMode Beteende vid läsning av en ny kolumn
addNewColumns (standardinställning) Streamen har avbrutits. Nya kolumner läggs till i schemat. Befintliga kolumner utvecklar inte datatyper.
rescue Schemat har aldrig utvecklats och strömmen misslyckas inte på grund av schemaändringar. Alla nya kolumner registreras i den räddade datakolumnen.
failOnNewColumns Streamen har avbrutits. Stream startar inte om såvida inte schemaHints de uppdateras eller de felaktiga data tas bort.
none Utvecklar inte schemat, nya kolumner ignoreras och data räddas inte om inte rescuedDataColumn alternativet har angetts. Stream misslyckas inte på grund av schemaändringar.

Till exempel:

SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)

Räddad datakolumn

En räddad datakolumn läggs automatiskt till i schemat som _rescued_data. Du kan byta namn på kolumnen genom att ange alternativet rescuedDataColumn . Till exempel:

from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})

När du väljer att använda den räddade datakolumnen räddas alla kolumner som inte matchar det härledda schemat i stället för att tas bort. Detta kan inträffa på grund av ett matchningsfel av datatyp, en kolumn som saknas i schemat eller en skillnad i kolumnnamnets hölje.

Hantera korrupta poster

Om du vill lagra poster som är felaktiga och inte kan parsas lägger du till en _corrupt_record kolumn genom att ange schematips, som i följande exempel:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL,
      map('schemaLocationKey', 'nycTaxi',
          'schemaHints', '_corrupt_record STRING',
          'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Om du vill byta namn på den skadade postkolumnen anger du alternativet columnNameOfCorruptRecord .

JSON-parsern stöder tre lägen för hantering av skadade poster:

Mode Description
PERMISSIVE För skadade poster placeras den felaktiga strängen i ett fält konfigurerat av columnNameOfCorruptRecord och felaktiga fält ställs in på null. Om du vill behålla skadade poster kan du ange ett strängtypfält med namnet columnNameOfCorruptRecord i ett användardefinierat schema. Om ett schema inte har fältet tas skadade poster bort under parsningen. När du härleder ett schema lägger parsern implicit till ett columnNameOfCorruptRecord fält i utdataschemat.
DROPMALFORMED Ignorerar skadade poster.
När du använder DROPMALFORMED i läget för rescuedDataColumn, medför inte datatypsinkompatibiliteter att poster tas bort. Endast skadade poster tas bort, till exempel ofullständig eller felaktig JSON.
FAILFAST Utlöser ett undantag när parsern möter korrupta poster.
När du använder FAILFAST läget med rescuedDataColumngenererar inte datatypsmatchningar något fel. Endast korrupta poster utlöser fel, till exempel ofullständig eller felaktig JSON.

Referera till ett fält i from_json-utdata

from_json härleder schemat under pipelinekörningen. Om en nedströmsfråga refererar till ett from_json fält innan from_json funktionen har körts minst en gång, löses inte fältet och frågan ignoreras. I följande exempel hoppas analysen för silvertabellfrågan över tills from_json funktionen i bronsfrågan har körts och härledt schemat.

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

CREATE STREAMING TABLE silver AS
  SELECT jsonCol.VendorID, jsonCol.total_amount
  FROM bronze

Om funktionen from_json och fälten som den härleder refereras till i samma fråga kan analysen misslyckas som i följande exempel:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

Du kan åtgärda detta genom att flytta referensen till fältet from_json till en nedströmsfråga (som exemplet brons/silver ovan.) Du kan också ange schemaHints som innehåller de refererade from_json fälten. Till exempel:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

Exempel: Härled och utveckla schemat automatiskt

Det här avsnittet innehåller exempelkod för att aktivera automatisk schemainferens och utveckling med hjälp av from_json i Lakeflow Spark Deklarativa pipelines.

Skapa en strömmande tabell från molnobjektlagring

I följande exempel används read_files syntax för att skapa en strömmande tabell från molnobjektlagring.

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

python

@dp.table(comment="from_json autoloader example")
def bronze():
  return (
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "text")
         .load("/databricks-datasets/nyctaxi/sample/json/")
         .select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)

Skapa en strömmande tabell från Kafka

I följande exempel används read_kafka syntax för att skapa en strömningstabell från Kafka.

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    value,
    from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
  FROM READ_KAFKA(
    bootstrapSevers => '<server:ip>',
    subscribe => 'events',
    "startingOffsets", "latest"
)

python

@dp.table(comment="from_json kafka example")
def bronze():
  return (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "<server:ip>")
         .option("subscribe", "<topic>")
         .option("startingOffsets", "latest")
         .load()
         .select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)

Exempel: Fast schema

Exempel på kod som använder from_json med ett fast schema finns i from_json funktion.

FAQs

Det här avsnittet besvarar vanliga frågor om schemainferens och utvecklingsstöd i from_json funktionen.

Vad är skillnaden mellan from_json och parse_json?

Funktionen parse_json returnerar ett VARIANT värde från JSON-strängen.

VARIANT är ett flexibelt och effektivt sätt att lagra halvstrukturerade data. Detta kringgår schemainferens och utveckling genom att avskaffa strikta typer helt och hållet. Men om du vill framtvinga ett schema vid skrivtillfället (till exempel för att du har ett relativt strikt schema) from_json kan det vara ett bättre alternativ.

I följande tabell beskrivs skillnaderna mellan from_json och parse_json:

Funktion Användningsfall Availability
from_json Schemautveckling med from_json underhåller schemat. Detta är användbart när:
  • Du vill framtvinga ditt dataschema (till exempel granska varje schemaändring innan du sparar det).
  • Du vill optimera lagringen och kräva låg frågesvarstid och kostnad.
  • Du vill misslyckas med data med felmatchade typer.
  • Du vill extrahera partiella resultat från skadade JSON-poster och lagra den felaktiga posten i _corrupt_record kolumnen. Variantinmatning returnerar däremot ett fel för ogiltig JSON.
Tillgänglig med schemainferens och evolution endast i Lakeflow Spark deklarativa pipelines
parse_json VARIANT passar särskilt bra för att lagra data som inte behöver schematiseras. Till exempel:
  • Du vill behålla data halvstrukturerade eftersom de är flexibla.
  • Schemat ändras för snabbt för att omvandla det till ett schema utan frekventa strömfel och omstarter.
  • Du vill inte misslyckas med data med felmatchade typer. (VARIANT-inmatning lyckas alltid för giltiga JSON-poster – även om det finns typmismatcher.)
  • Användarna vill inte hantera den räddade datakolumnen som innehåller fält som inte överensstämmer med schemat.
Tillgänglig med och utan Lakeflow Spark deklarativa rörledningar

Kan jag använda from_json schemainferens och evolutionssyntax utanför Lakeflow Spark deklarativa pipelines?

Nej, du kan inte använda from_json schemainferens och evolutionssyntax utanför Lakeflow Spark Deklarativa Pipelines.

Hur kommer jag åt schemat som härleds av from_json?

Visa schemat för målströmningstabellen.

Kan jag skicka from_json ett schema och även utföra utveckling?

Nej, du kan inte skicka from_json ett schema och även utföra utveckling. Du kan dock ange schematips för att åsidosätta vissa eller alla fält som härleds av from_json.

Vad händer med schemat om tabellen är helt uppdaterad?

Schemaplatserna som är associerade med tabellen rensas och schemat härleds från grunden igen.