Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Wnioskowanie i rozwijanie schematu przy użyciu
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
W tym artykule opisano, jak za pomocą funkcji SQL w potokach deklaratywnych Spark platformy Lakeflow można wnioskować i rozwijać schematy dla blobów JSON.
Przegląd
Funkcja from_json SQL analizuje kolumnę ciągu JSON i zwraca wartość struktury. W przypadku użycia poza potokiem należy jawnie podać schemat zwracanej wartości przy użyciu argumentu schema . W przypadku korzystania z deklaratywnych potoków Lakeflow Spark można włączyć wnioskowanie i ewolucję schematu, który automatycznie zarządza schematem wyniku. Ta funkcja upraszcza zarówno konfigurację początkową (zwłaszcza gdy schemat jest nieznany) i ciągłe operacje, gdy schemat często się zmienia. Umożliwia bezproblemowe przetwarzanie dowolnych obiektów blob JSON ze źródeł danych przesyłanych strumieniowo, takich jak Auto Loader, Kafka lub Kinesis.
W szczególności w przypadku użycia w potoku wnioskowanie schematu i ewolucja funkcji from_json SQL mogą:
- Wykrywanie nowych pól w przychodzących rekordach JSON (w tym zagnieżdżonych obiektów JSON)
- Rozpoznanie typów pól i ich przyporządkowanie odpowiednim typom danych platformy Spark
- Automatyczne rozwijanie schematu w celu uwzględnienia nowych pól
- Automatyczne obsługa danych, które nie są zgodne z bieżącym schematem
Składnia: Automatyczne wnioskowanie i rozwijanie schematu
Aby włączyć wnioskowanie schematu z from_json w potoku, ustaw dla schematu wartość NULL i określ opcję schemaLocationKey. Umożliwia to wnioskowanie i śledzenie schematu.
SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
Python
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
Zapytanie może zawierać wiele from_json wyrażeń, ale każde wyrażenie musi mieć unikatową wartość schemaLocationKey. Element schemaLocationKey musi być również unikatowy dla każdego przepływu.
SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
Składnia: Stały schemat
Jeśli zamiast tego chcesz wymusić określony schemat, możesz użyć następującej from_json składni, aby przeanalizować ciąg JSON przy użyciu tego schematu:
from_json(jsonStr, schema, [, options])
Ta składnia może być używana w dowolnym środowisku usługi Azure Databricks, w tym w deklaratywnych potokach Lakeflow Spark. Więcej informacji jest dostępnych tutaj.
Wnioskowanie schematu
from_json Wywnioskuje schemat z pierwszej partii kolumn danych JSON i wewnętrznie indeksuje go według schemaLocationKey (wymagane).
Jeśli ciąg JSON jest pojedynczym obiektem (na przykład {"id": 123, "name": "John"}), from_json wywnioskuje schemat typu STRUKTURA i dodaje element rescuedDataColumn do listy pól.
STRUCT<id LONG, name STRING, _rescued_data STRING>
Jednak jeśli ciąg JSON ma tablicę najwyższego poziomu (na przykład ["id": 123, "name": "John"]), to from_json zawija tablicę w strukturze. Takie podejście umożliwia ratowanie danych niezgodnych ze schematem wywnioskowanym. Istnieje możliwość rozdzielenia wartości tablicy na osobne wiersze dalej.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Nadpisywanie wnioskowania schematu przy użyciu wskazówek dotyczących schematu
Opcjonalnie możesz podać schemaHints , aby wpłynąć na sposób from_json wnioskowania typu kolumny. Jest to przydatne, gdy wiesz, że kolumna ma określony typ danych lub jeśli chcesz wybrać bardziej ogólny typ danych (na przykład podwójne zamiast liczby całkowitej). Możesz podać dowolną liczbę wskazówek dla typów danych kolumn przy użyciu składni specyfikacji schematu SQL. Semantyka wskazówek schematu jest taka sama jak dla wskazówek schematu Auto Loader. Przykład:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
Gdy ciąg JSON zawiera tablicę najwyższego poziomu, jest owinięty w strukturę. W takich przypadkach wskazówki schematu są stosowane do schematu ARRAY zamiast opakowanej struktury. Rozważmy na przykład ciąg JSON z tablicą najwyższego poziomu, taką jak:
[{"id": 123, "name": "John"}]
Wywnioskowany schemat TABLICY jest opakowany w STRUKTURĘ:
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Aby zmienić typ danych id, określ wskazówkę schematu jako element.id STRING. Aby dodać nową kolumnę typu DOUBLE, określ wartość element.new_col DOUBLE. Ze względu na te wskazówki schemat tablicy JSON najwyższego poziomu staje się:
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
Rozwijanie schematu przy użyciu polecenia schemaEvolutionMode
from_json wykrywa dodanie nowych kolumn podczas przetwarzania danych. Po from_json wykryciu nowego pola aktualizuje wywnioskowany schemat przy użyciu najnowszego schematu, scalając nowe kolumny na końcu schematu. Typy danych istniejących kolumn pozostają niezmienione. Po aktualizacji schematu potok zostanie automatycznie uruchomiony ponownie przy użyciu zaktualizowanego schematu.
from_json program obsługuje następujące tryby ewolucji schematu, które można ustawić przy użyciu opcjonalnego schemaEvolutionMode ustawienia. Te tryby są zgodne z modułem automatycznego ładowania.
schemaEvolutionMode |
Zachowanie podczas odczytywania nowej kolumny |
|---|---|
addNewColumns (ustawienie domyślne) |
Stream nie działa. Nowe kolumny są dodawane do schematu. Istniejące kolumny nie ewoluują typów danych. |
rescue |
Schemat nigdy nie ewoluował, a strumień nie kończy się niepowodzeniem z powodu zmian schematu. Wszystkie nowe kolumny są rejestrowane w uratowanej kolumnie danych. |
failOnNewColumns |
Stream nie działa. Usługa Stream nie zostanie uruchomiona ponownie, chyba że dane schemaHints zostaną zaktualizowane lub niepoprawne dane zostaną usunięte. |
none |
Nie rozwija schematu, nowe kolumny są ignorowane, a dane nie są ratowane, chyba że opcja jest ustawiona rescuedDataColumn . Usługa Stream nie kończy się niepowodzeniem z powodu zmian schematu. |
Przykład:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
Uratowana kolumna danych
Uratowana kolumna danych jest automatycznie dodawana do schematu jako _rescued_data. Możesz zmienić nazwę kolumny, ustawiając rescuedDataColumn opcję . Przykład:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
Jeśli zdecydujesz się użyć uratowanej kolumny danych, wszystkie kolumny, które nie pasują do wywnioskowanego schematu, zostaną uratowane zamiast porzucone. Może się to zdarzyć z powodu niezgodności typu danych, brakującej kolumny w schemacie lub różnicy wielkości liter nazwy kolumny.
Obsługa uszkodzonych rekordów
Aby przechowywać rekordy, które są źle sformułowane i nie można ich przeanalizować, dodaj kolumnę _corrupt_record , ustawiając wskazówki schematu, jak w poniższym przykładzie:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Aby zmienić nazwę uszkodzonej kolumny rekordu, ustaw columnNameOfCorruptRecord opcję .
Analizator JSON obsługuje trzy tryby obsługi uszkodzonych rekordów:
| Mode | Description |
|---|---|
PERMISSIVE |
W przypadku uszkodzonych rekordów wstaw źle sformułowany ciąg do pola skonfigurowanego przez columnNameOfCorruptRecord i ustawia źle sformułowane pola na null. Aby zachować uszkodzone rekordy, można ustawić pole typu ciągu o nazwie columnNameOfCorruptRecord w schemacie zdefiniowanym przez użytkownika. Jeśli schemat nie ma pola, uszkodzone rekordy są porzucane podczas analizowania. Podczas wnioskowania schematu analizator niejawnie dodaje columnNameOfCorruptRecord pole w schemacie wyjściowym. |
DROPMALFORMED |
Ignoruje uszkodzone rekordy. W przypadku korzystania z trybu DROPMALFORMED i rescuedDataColumn niezgodność typów danych nie powoduje porzucania rekordów. Porzucane są tylko uszkodzone rekordy, takie jak niekompletne lub źle sformułowane dane JSON. |
FAILFAST |
Zgłasza wyjątek, gdy analizator napotyka uszkodzone rekordy. W przypadku korzystania z trybu FAILFAST wraz z funkcją rescuedDataColumn niezgodność typów danych nie zgłasza błędu. Tylko uszkodzone rekordy zgłaszają błędy, takie jak niekompletne lub źle sformułowane dane JSON. |
Odwołaj się do pola w danych wyjściowych from_json
from_json ustala schemat podczas wykonywania potoku danych. Jeśli zapytanie podrzędne odwołuje się do pola przed tym, jak funkcja from_json zostanie wykonana przynajmniej raz, pole nie zostanie rozpoznane, a zapytanie zostanie pominięte. W poniższym przykładzie analiza zapytania tabeli srebrnej zostanie pominięta, dopóki funkcja from_json w zapytaniu brązowym nie zostanie wykonana i nie wywnioskuje schematu.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
from_json Jeśli funkcja i pola, które wywnioskują, są określane w tym samym zapytaniu, analiza może zakończyć się niepowodzeniem, jak w poniższym przykładzie:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Można to naprawić, przenosząc odwołanie do from_json pola do zapytania na dalszym etapie (jak w przykładzie z brązem/srebrem powyżej). Alternatywnie, można określić schemaHints, które zawierają pole from_json do którego się odnosi. Przykład:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Przykłady: Automatyczne wnioskowanie i rozwijanie schematu
Ta sekcja zawiera przykładowy kod umożliwiający automatyczne wnioskowanie schematu i ewolucję przy użyciu from_json potoków deklaratywnych spark w usłudze Lakeflow.
Tworzenie tabeli strumieniowej z magazynu obiektów w chmurze
W poniższym przykładzie użyto read_files składni do utworzenia tabeli przesyłania strumieniowego z magazynu obiektów w chmurze.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python
@dp.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Tworzenie tabeli przesyłania strumieniowego na platformie Kafka
W poniższym przykładzie użyto składni read_kafka do utworzenia tabeli strumieniowania z systemu Kafka.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
Python
@dp.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
Przykłady: stały schemat
Aby zobaczyć przykładowy kod używający from_json ze stałym schematem, zobacz funkcję from_json.
FAQs
Ta sekcja zawiera odpowiedzi na często zadawane pytania dotyczące wnioskowania schematu i obsługi ewolucji w from_json funkcji.
Jaka jest różnica między from_json i parse_json?
Funkcja parse_json zwraca VARIANT wartość z ciągu JSON.
VARIANT zapewnia elastyczny i wydajny sposób przechowywania częściowo ustrukturyzowanych danych. Pozwala to obejść wnioskowanie schematu i ewolucję, całkowicie pomijając ścisłe typy. Jeśli jednak chcesz wymusić schemat w czasie zapisu (na przykład ze względu na stosunkowo rygorystyczny schemat), from_json może być lepszym rozwiązaniem.
W poniższej tabeli opisano różnice między elementami from_json i parse_json:
| Funkcja | Przypadki użycia | Availability |
|---|---|---|
from_json |
Ewolucja schematu z from_json utrzymuje schemat. Jest to przydatne, gdy:
|
Dostępne tylko w deklaratywnych potokach Lakeflow Spark z wnioskowaniem i ewolucją schematu |
parse_json |
OPCJA VARIANT jest szczególnie odpowiednia do przechowywania danych, które nie muszą być schematyzowane. Przykład:
|
Dostępne z potokami deklaratywnymi platformy Spark i bez usługi Lakeflow |
Czy mogę używać składni wnioskowania schematu i ewolucji poza potokami Lakeflow Spark Deklaratywnymi?
Nie, nie można używać składni wnioskowania schematu i ewolucji from_json poza deklaratywnymi potokami Lakeflow Spark.
Jak uzyskać dostęp do schematu wywnioskowany przez from_json?
Wyświetl schemat docelowej tabeli przesyłania strumieniowego.
Czy mogę przekazać from_json schemat, a także przeprowadzić ewolucję?
Nie, nie można przekazać from_json schematu, a także przeprowadzić ewolucję. Można jednak podać wskazówki schematu, aby zastąpić niektóre lub wszystkie pola wywnioskowane przez from_json.
Co się stanie ze schematem, jeśli tabela jest całkowicie odświeżona?
Lokalizacje schematu powiązane z tabelą są resetowane, a schemat jest ponownie ustalany od podstaw.