Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Odvozujte a rozvíjejte schéma pomocí
Důležité
Tato funkce je ve verzi Public Preview.
Tento článek popisuje, jak odvodit a vyvíjet schéma JSON blobů pomocí funkce SQL from_json v deklarativních pipeline Lakeflow Spark.
Přehled
Funkce from_json SQL parsuje sloupec řetězce JSON a vrátí hodnotu struktury. Při použití mimo kanál je nutné explicitně zadat schéma vrácené hodnoty pomocí argumentu schema . Při použití s deklarativními kanály Lakeflow Spark můžete povolit inferenci a evoluci schématu, které automaticky spravují schéma vracené hodnoty. Tato funkce zjednodušuje počáteční nastavení (zejména v případě, že schéma není známo) a probíhající operace, když se schéma často mění. Umožňuje hladké zpracování libovolných objektů blob JSON ze streamovaných zdrojů dat, jako jsou Auto Loader, Kafka nebo Kinesis.
Konkrétně platí, že při použití v kanálu může odvozování a vývoj schématu pro from_json funkci SQL být následující.
- Detekce nových polí v příchozích záznamech JSON (včetně vnořených objektů JSON)
- Proveďte odvození typů polí a jejich mapování na odpovídající datové typy Sparku
- Automaticky vyvíjet schéma tak, aby vyhovovalo novým polím
- Automatické zpracování dat, která neodpovídají aktuálnímu schématu
Syntaxe: Automatické odvození a vývoj schématu
Pokud chcete povolit odvozování schématu s from_json v kanálu, nastavte schéma na hodnotu NULL a zadejte možnost schemaLocationKey. To umožňuje odvodit schéma a sledovat ho.
SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
Python
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
Dotaz může mít více from_json výrazů, ale každý výraz musí mít jedinečný schemaLocationKey. Musí schemaLocationKey být také jedinečný pro každý kanál.
SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
Syntaxe: Pevné schéma
Pokud chcete místo toho vynutit určité schéma, můžete pomocí následující from_json syntaxe parsovat řetězec JSON pomocí tohoto schématu:
from_json(jsonStr, schema, [, options])
Tuto syntaxi je možné použít v jakémkoli prostředí Azure Databricks, včetně deklarativních kanálů Sparku Lakeflow. Další informace najdete zde.
Odvození schématu
from_json odvodí schéma z první dávky datových sloupců JSON a interně ho indexuje podle jeho schemaLocationKey (vyžadováno).
Pokud je řetězec JSON jedním objektem (například {"id": 123, "name": "John"}), from_json odvodí schéma typu STRUCT a přidá ho rescuedDataColumn do seznamu polí.
STRUCT<id LONG, name STRING, _rescued_data STRING>
Pokud má ale řetězec JSON pole nejvyšší úrovně (například ["id": 123, "name": "John"]), from_json zabalí pole ARRAY do struktury. Tento přístup umožňuje záchranu dat, která nejsou kompatibilní s odvozeným schématem. Máte možnost rozložit hodnoty pole do samostatných řádků dále.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Přepsání odvození schématu pomocí náznaků schématu
Volitelně můžete poskytnout schemaHints, aby ovlivnil způsob, jakým from_json odvozuje typ sloupce. To je užitečné, když víte, že sloupec má konkrétní datový typ, nebo pokud chcete zvolit obecnější datový typ (například double místo integeru). Pomocí syntaxe specifikace schématu SQL můžete zadat libovolný počet tipů pro datové typy sloupců. Sémantika nápověd schématu pro Auto Loader je stejná jako u nápověd schématu. Například:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
Pokud řetězec JSON obsahuje pole nejvyšší úrovně, je zabaleno do struktury. V těchto případech se na schéma ARRAY místo na zabalenou strukturu použijí indikace schématu. Představte si například řetězec JSON s polem nejvyšší úrovně, například:
[{"id": 123, "name": "John"}]
Odvozené schéma ARRAY je zabaleno do struktury:
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Chcete-li změnit datový typ id, zadejte nápovědu schématu jako element.id STRING. Pokud chcete přidat nový sloupec typu DOUBLE, zadejte element.new_col DOUBLE. Kvůli těmto tipům se schéma pole JSON nejvyšší úrovně stane:
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
Vývoj schématu pomocí schemaEvolutionMode
from_json rozpozná přidání nových sloupců při zpracování dat. Když from_json zjistí nové pole, aktualizuje odvozené schéma pomocí nejnovějšího schématu sloučením nových sloupců na konec schématu. Datové typy existujících sloupců zůstávají beze změny. Po aktualizaci schématu se kanál automaticky restartuje s aktualizovaným schématem.
from_json podporuje následující režimy pro vývoj schématu, které nastavíte pomocí volitelného schemaEvolutionMode nastavení. Tyto režimy jsou v souladu s Auto Loader.
schemaEvolutionMode |
Chování při čtení nového sloupce |
|---|---|
addNewColumns (výchozí) |
Stream selže. Do schématu se přidají nové sloupce. Existující sloupce nemění datové typy. |
rescue |
Schéma se nikdy nemění a datový proud neselže kvůli změnám schématu. Všechny nové sloupce jsou zaznamenány ve sloupci zachráněných dat. |
failOnNewColumns |
Stream selže. Stream se nerestartuje, pokud se schemaHints neaktualizují nebo se neodstraní problematická data. |
none |
Nevyvíjí schéma, nové sloupce se ignorují a data se nezachovají, pokud není nastavená rescuedDataColumn možnost. Stream neselže kvůli změnám schématu. |
Například:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
Sloupec zachráněných dat
Do schématu se automaticky přidá záchranný datový sloupec jako _rescued_data. Sloupec můžete přejmenovat nastavením rescuedDataColumn možnosti. Například:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
Když se rozhodnete použít záchranný datový sloupec, všechny sloupce, které neodpovídají odvozovanému schématu, se místo vyřazení zachovají. K tomu může dojít kvůli neshodě datového typu, chybějícímu sloupci ve schématu nebo rozdílu v názvech sloupců.
Řešení poškozených záznamů
Pokud chcete uložit záznamy, které jsou poškozené a nelze je analyzovat, přidejte _corrupt_record sloupec nastavením nápovědy schématu, například v následujícím příkladu:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Pokud chcete přejmenovat sloupec poškozených záznamů, nastavte columnNameOfCorruptRecord možnost.
Analyzátor JSON podporuje tři režimy zpracování poškozených záznamů:
| Mode | Description |
|---|---|
PERMISSIVE |
U poškozených záznamů umístí poškozený řetězec do pole nakonfigurovaného pomocí columnNameOfCorruptRecord a nastaví chybně formátovaná pole na null. Chcete-li zachovat poškozené záznamy, můžete nastavit pole typu řetězce pojmenované columnNameOfCorruptRecord v uživatelsky definovaném schématu. Pokud schéma pole neobsahuje, během analýzy se zahodí poškozené záznamy. Při odvození schématu analyzátor implicitně přidá columnNameOfCorruptRecord pole ve výstupním schématu. |
DROPMALFORMED |
Ignoruje poškozené záznamy. Při použití režimu DROPMALFORMED s rescuedDataColumn nedochází k tomu, že by nesoulady v datových typech způsobovaly vyřazení záznamů. Zahodí se jenom poškozené záznamy, například neúplné nebo poškozené JSON. |
FAILFAST |
Vyvolá výjimku, když parser narazí na poškozené záznamy. Pokud používáte FAILFAST režim s datovým typem rescuedDataColumn, neshody datových typů nevyvolají chybu. Pouze poškozené záznamy můžou vyvolat chyby, jako je neúplný nebo poškozený json. |
Odkaz na pole ve výstupu from_json
from_json odvodí schéma během provádění pipeline. Pokud podřízený dotaz odkazuje na from_json pole před úspěšným spuštěním funkce from_json alespoň jednou, pole se nevyhodnotí a dotaz se přeskočí. V následujícím příkladu se analýza pro dotaz na tabulku silver přeskočí, dokud se funkce v dotazu na bronzovou tabulku nespustí a neodvodí schéma.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
Pokud se from_json funkce a pole, která odvodí, odkazují ve stejném dotazu, může analýza selhat jako v následujícím příkladu:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Tento problém můžete vyřešit přesunutím odkazu na from_json pole do podřízeného dotazu (například jako výše uvedený příklad bronz/stříbro). Alternativně můžete specifikovat schemaHints, která obsahují referenční from_json pole. Například:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Příklady: Automatické odvození a vývoj schématu
Tato část obsahuje ukázkový kód pro povolení automatického odvozování schématu a vývoje pomocí from_json deklarativních kanálů Sparku v Lakeflow.
Vytvoření streamované tabulky z cloudového úložiště objektů
Následující příklad používá read_files syntaxi k vytvoření streamované tabulky z cloudového úložiště objektů.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python
@dp.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Vytvoření streamované tabulky ze systému Kafka
Následující příklad používá read_kafka syntaxi k vytvoření streamované tabulky ze systému Kafka.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
Python
@dp.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
Příklady: Pevné schéma
Příklad kódu, který používá from_json s pevným schématem, viz from_json funkce.
FAQs
Tato část odpovídá na nejčastější dotazy týkající se odvození schématu a podpory vývoje funkce from_json .
Jaký je rozdíl mezi from_json a parse_json?
Funkce parse_json vrátí VARIANT hodnotu z řetězce JSON.
VARIANT nabízí flexibilní a efektivní způsob ukládání částečně strukturovaných dat. Tím se obchází odvození schématu a vývoj tím, že úplně vyhodí striktní typy. Pokud ale chcete vynutit schéma v době zápisu (například vzhledem k tomu, že máte relativně striktní schéma), from_json může být lepší volbou.
Následující tabulka popisuje rozdíly mezi from_json a parse_json:
| Funkce | Případy použití | Availability |
|---|---|---|
from_json |
Vývoj schématu s from_json zachovává schéma. To je užitečné v těchto případech:
|
K dispozici s odvozováním schématu a vývojem pouze v deklarativních kanálech Lakeflow Spark |
parse_json |
VARIANT je obzvláště vhodná pro uchovávání dat, která nemusí být schématizována. Například:
|
K dispozici s deklarativními kanály Lakeflow Spark a bez nich. |
Mohu použít from_json odvozování a vývoj schématu mimo deklarativní kanály Lakeflow Spark?
Ne, nemůžete použít from_json syntaxi pro odvozování schématu a vývoj mimo deklarativní kanály Lakeflow Spark.
Jak získám přístup ke schématu odvozeným from_json?
Zobrazení schématu cílové tabulky streamování
Mohu předat from_json schéma a také provádět jeho evoluci?
Ne, nemůžete předat from_json schéma a také provádět evoluci. Můžete však poskytnout návrhy schématu k přebití některých nebo všech polí odvozených pomocí from_json.
Co se stane se schématem, pokud je tabulka plně aktualizována?
Umístění schématu přidružená k tabulce jsou vymazána a schéma se znovu odvozuje od základu.