Udostępnij przez


Wnioskowanie i rozwijanie schematu przy użyciu from_json w potokach

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

W tym artykule opisano, jak za pomocą funkcji SQL w potokach deklaratywnych Spark platformy Lakeflow można wnioskować i rozwijać schematy dla blobów JSON.

Przegląd

Funkcja from_json SQL analizuje kolumnę ciągu JSON i zwraca wartość struktury. W przypadku użycia poza potokiem należy jawnie podać schemat zwracanej wartości przy użyciu argumentu schema . W przypadku korzystania z deklaratywnych potoków Lakeflow Spark można włączyć wnioskowanie i ewolucję schematu, który automatycznie zarządza schematem wyniku. Ta funkcja upraszcza zarówno konfigurację początkową (zwłaszcza gdy schemat jest nieznany) i ciągłe operacje, gdy schemat często się zmienia. Umożliwia bezproblemowe przetwarzanie dowolnych obiektów blob JSON ze źródeł danych przesyłanych strumieniowo, takich jak Auto Loader, Kafka lub Kinesis.

W szczególności w przypadku użycia w potoku wnioskowanie schematu i ewolucja funkcji from_json SQL mogą:

  • Wykrywanie nowych pól w przychodzących rekordach JSON (w tym zagnieżdżonych obiektów JSON)
  • Rozpoznanie typów pól i ich przyporządkowanie odpowiednim typom danych platformy Spark
  • Automatyczne rozwijanie schematu w celu uwzględnienia nowych pól
  • Automatyczne obsługa danych, które nie są zgodne z bieżącym schematem

Składnia: Automatyczne wnioskowanie i rozwijanie schematu

Aby włączyć wnioskowanie schematu z from_json w potoku, ustaw dla schematu wartość NULL i określ opcję schemaLocationKey. Umożliwia to wnioskowanie i śledzenie schematu.

SQL

from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))

Python

from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})

Zapytanie może zawierać wiele from_json wyrażeń, ale każde wyrażenie musi mieć unikatową wartość schemaLocationKey. Element schemaLocationKey musi być również unikatowy dla każdego przepływu.

SQL

SELECT
  value,
  from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
  from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Python

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .load("/databricks-datasets/nyctaxi/sample/json/")
    .select(
      col("value"),
      from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
      from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)

Składnia: Stały schemat

Jeśli zamiast tego chcesz wymusić określony schemat, możesz użyć następującej from_json składni, aby przeanalizować ciąg JSON przy użyciu tego schematu:

from_json(jsonStr, schema, [, options])

Ta składnia może być używana w dowolnym środowisku usługi Azure Databricks, w tym w deklaratywnych potokach Lakeflow Spark. Więcej informacji jest dostępnych tutaj.

Wnioskowanie schematu

from_json Wywnioskuje schemat z pierwszej partii kolumn danych JSON i wewnętrznie indeksuje go według schemaLocationKey (wymagane).

Jeśli ciąg JSON jest pojedynczym obiektem (na przykład {"id": 123, "name": "John"}), from_json wywnioskuje schemat typu STRUKTURA i dodaje element rescuedDataColumn do listy pól.

STRUCT<id LONG, name STRING, _rescued_data STRING>

Jednak jeśli ciąg JSON ma tablicę najwyższego poziomu (na przykład ["id": 123, "name": "John"]), to from_json zawija tablicę w strukturze. Takie podejście umożliwia ratowanie danych niezgodnych ze schematem wywnioskowanym. Istnieje możliwość rozdzielenia wartości tablicy na osobne wiersze dalej.

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Nadpisywanie wnioskowania schematu przy użyciu wskazówek dotyczących schematu

Opcjonalnie możesz podać schemaHints , aby wpłynąć na sposób from_json wnioskowania typu kolumny. Jest to przydatne, gdy wiesz, że kolumna ma określony typ danych lub jeśli chcesz wybrać bardziej ogólny typ danych (na przykład podwójne zamiast liczby całkowitej). Możesz podać dowolną liczbę wskazówek dla typów danych kolumn przy użyciu składni specyfikacji schematu SQL. Semantyka wskazówek schematu jest taka sama jak dla wskazówek schematu Auto Loader. Przykład:

SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)

Gdy ciąg JSON zawiera tablicę najwyższego poziomu, jest owinięty w strukturę. W takich przypadkach wskazówki schematu są stosowane do schematu ARRAY zamiast opakowanej struktury. Rozważmy na przykład ciąg JSON z tablicą najwyższego poziomu, taką jak:

[{"id": 123, "name": "John"}]

Wywnioskowany schemat TABLICY jest opakowany w STRUKTURĘ:

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Aby zmienić typ danych id, określ wskazówkę schematu jako element.id STRING. Aby dodać nową kolumnę typu DOUBLE, określ wartość element.new_col DOUBLE. Ze względu na te wskazówki schemat tablicy JSON najwyższego poziomu staje się:

struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>

Rozwijanie schematu przy użyciu polecenia schemaEvolutionMode

from_json wykrywa dodanie nowych kolumn podczas przetwarzania danych. Po from_json wykryciu nowego pola aktualizuje wywnioskowany schemat przy użyciu najnowszego schematu, scalając nowe kolumny na końcu schematu. Typy danych istniejących kolumn pozostają niezmienione. Po aktualizacji schematu potok zostanie automatycznie uruchomiony ponownie przy użyciu zaktualizowanego schematu.

from_json program obsługuje następujące tryby ewolucji schematu, które można ustawić przy użyciu opcjonalnego schemaEvolutionMode ustawienia. Te tryby są zgodne z modułem automatycznego ładowania.

schemaEvolutionMode Zachowanie podczas odczytywania nowej kolumny
addNewColumns (ustawienie domyślne) Stream nie działa. Nowe kolumny są dodawane do schematu. Istniejące kolumny nie ewoluują typów danych.
rescue Schemat nigdy nie ewoluował, a strumień nie kończy się niepowodzeniem z powodu zmian schematu. Wszystkie nowe kolumny są rejestrowane w uratowanej kolumnie danych.
failOnNewColumns Stream nie działa. Usługa Stream nie zostanie uruchomiona ponownie, chyba że dane schemaHints zostaną zaktualizowane lub niepoprawne dane zostaną usunięte.
none Nie rozwija schematu, nowe kolumny są ignorowane, a dane nie są ratowane, chyba że opcja jest ustawiona rescuedDataColumn . Usługa Stream nie kończy się niepowodzeniem z powodu zmian schematu.

Przykład:

SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)

Uratowana kolumna danych

Uratowana kolumna danych jest automatycznie dodawana do schematu jako _rescued_data. Możesz zmienić nazwę kolumny, ustawiając rescuedDataColumn opcję . Przykład:

from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})

Jeśli zdecydujesz się użyć uratowanej kolumny danych, wszystkie kolumny, które nie pasują do wywnioskowanego schematu, zostaną uratowane zamiast porzucone. Może się to zdarzyć z powodu niezgodności typu danych, brakującej kolumny w schemacie lub różnicy wielkości liter nazwy kolumny.

Obsługa uszkodzonych rekordów

Aby przechowywać rekordy, które są źle sformułowane i nie można ich przeanalizować, dodaj kolumnę _corrupt_record , ustawiając wskazówki schematu, jak w poniższym przykładzie:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL,
      map('schemaLocationKey', 'nycTaxi',
          'schemaHints', '_corrupt_record STRING',
          'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Aby zmienić nazwę uszkodzonej kolumny rekordu, ustaw columnNameOfCorruptRecord opcję .

Analizator JSON obsługuje trzy tryby obsługi uszkodzonych rekordów:

Mode Description
PERMISSIVE W przypadku uszkodzonych rekordów wstaw źle sformułowany ciąg do pola skonfigurowanego przez columnNameOfCorruptRecord i ustawia źle sformułowane pola na null. Aby zachować uszkodzone rekordy, można ustawić pole typu ciągu o nazwie columnNameOfCorruptRecord w schemacie zdefiniowanym przez użytkownika. Jeśli schemat nie ma pola, uszkodzone rekordy są porzucane podczas analizowania. Podczas wnioskowania schematu analizator niejawnie dodaje columnNameOfCorruptRecord pole w schemacie wyjściowym.
DROPMALFORMED Ignoruje uszkodzone rekordy.
W przypadku korzystania z trybu DROPMALFORMED i rescuedDataColumn niezgodność typów danych nie powoduje porzucania rekordów. Porzucane są tylko uszkodzone rekordy, takie jak niekompletne lub źle sformułowane dane JSON.
FAILFAST Zgłasza wyjątek, gdy analizator napotyka uszkodzone rekordy.
W przypadku korzystania z trybu FAILFAST wraz z funkcją rescuedDataColumn niezgodność typów danych nie zgłasza błędu. Tylko uszkodzone rekordy zgłaszają błędy, takie jak niekompletne lub źle sformułowane dane JSON.

Odwołaj się do pola w danych wyjściowych from_json

from_json ustala schemat podczas wykonywania potoku danych. Jeśli zapytanie podrzędne odwołuje się do pola przed tym, jak funkcja from_json zostanie wykonana przynajmniej raz, pole nie zostanie rozpoznane, a zapytanie zostanie pominięte. W poniższym przykładzie analiza zapytania tabeli srebrnej zostanie pominięta, dopóki funkcja from_json w zapytaniu brązowym nie zostanie wykonana i nie wywnioskuje schematu.

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

CREATE STREAMING TABLE silver AS
  SELECT jsonCol.VendorID, jsonCol.total_amount
  FROM bronze

from_json Jeśli funkcja i pola, które wywnioskują, są określane w tym samym zapytaniu, analiza może zakończyć się niepowodzeniem, jak w poniższym przykładzie:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

Można to naprawić, przenosząc odwołanie do from_json pola do zapytania na dalszym etapie (jak w przykładzie z brązem/srebrem powyżej). Alternatywnie, można określić schemaHints, które zawierają pole from_json do którego się odnosi. Przykład:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

Przykłady: Automatyczne wnioskowanie i rozwijanie schematu

Ta sekcja zawiera przykładowy kod umożliwiający automatyczne wnioskowanie schematu i ewolucję przy użyciu from_json potoków deklaratywnych spark w usłudze Lakeflow.

Tworzenie tabeli strumieniowej z magazynu obiektów w chmurze

W poniższym przykładzie użyto read_files składni do utworzenia tabeli przesyłania strumieniowego z magazynu obiektów w chmurze.

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Python

@dp.table(comment="from_json autoloader example")
def bronze():
  return (
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "text")
         .load("/databricks-datasets/nyctaxi/sample/json/")
         .select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)

Tworzenie tabeli przesyłania strumieniowego na platformie Kafka

W poniższym przykładzie użyto składni read_kafka do utworzenia tabeli strumieniowania z systemu Kafka.

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    value,
    from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
  FROM READ_KAFKA(
    bootstrapSevers => '<server:ip>',
    subscribe => 'events',
    "startingOffsets", "latest"
)

Python

@dp.table(comment="from_json kafka example")
def bronze():
  return (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "<server:ip>")
         .option("subscribe", "<topic>")
         .option("startingOffsets", "latest")
         .load()
         .select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)

Przykłady: stały schemat

Aby zobaczyć przykładowy kod używający from_json ze stałym schematem, zobacz funkcję from_json.

FAQs

Ta sekcja zawiera odpowiedzi na często zadawane pytania dotyczące wnioskowania schematu i obsługi ewolucji w from_json funkcji.

Jaka jest różnica między from_json i parse_json?

Funkcja parse_json zwraca VARIANT wartość z ciągu JSON.

VARIANT zapewnia elastyczny i wydajny sposób przechowywania częściowo ustrukturyzowanych danych. Pozwala to obejść wnioskowanie schematu i ewolucję, całkowicie pomijając ścisłe typy. Jeśli jednak chcesz wymusić schemat w czasie zapisu (na przykład ze względu na stosunkowo rygorystyczny schemat), from_json może być lepszym rozwiązaniem.

W poniższej tabeli opisano różnice między elementami from_json i parse_json:

Funkcja Przypadki użycia Availability
from_json Ewolucja schematu z from_json utrzymuje schemat. Jest to przydatne, gdy:
  • Chcesz wymusić schemat danych (na przykład przejrzenie każdej zmiany schematu przed ich utrwaleniem).
  • Chcesz zoptymalizować magazyn i wymagać małych opóźnień zapytań i kosztów.
  • Chcesz wygenerować błąd dla danych z niezgodnymi typami.
  • Chcesz wyodrębnić częściowe wyniki z uszkodzonych rekordów JSON i zapisać nieprawidłowo sformułowany rekord w kolumnie _corrupt_record . Natomiast przetwarzanie VARIANT zwraca błąd nieprawidłowego JSON-u.
Dostępne tylko w deklaratywnych potokach Lakeflow Spark z wnioskowaniem i ewolucją schematu
parse_json OPCJA VARIANT jest szczególnie odpowiednia do przechowywania danych, które nie muszą być schematyzowane. Przykład:
  • Chcesz zachować częściowo ustrukturyzowane dane, ponieważ są elastyczne.
  • Schemat zmienia się zbyt szybko, aby przekształcać go w schemat bez częstych awarii strumienia i ponownego uruchamiania systemu.
  • Nie chcesz popełniać błędów z powodu danych o niezgodnych typach. (Pozyskiwanie wariantu zawsze powiedzie się dla prawidłowych rekordów JSON — nawet jeśli występują niezgodności typów).
  • Użytkownicy nie chcą zajmować się uratowaną kolumną danych zawierającą pola, które nie są zgodne ze schematem.
Dostępne z potokami deklaratywnymi platformy Spark i bez usługi Lakeflow

Czy mogę używać składni wnioskowania schematu i ewolucji poza potokami Lakeflow Spark Deklaratywnymi?

Nie, nie można używać składni wnioskowania schematu i ewolucji from_json poza deklaratywnymi potokami Lakeflow Spark.

Jak uzyskać dostęp do schematu wywnioskowany przez from_json?

Wyświetl schemat docelowej tabeli przesyłania strumieniowego.

Czy mogę przekazać from_json schemat, a także przeprowadzić ewolucję?

Nie, nie można przekazać from_json schematu, a także przeprowadzić ewolucję. Można jednak podać wskazówki schematu, aby zastąpić niektóre lub wszystkie pola wywnioskowane przez from_json.

Co się stanie ze schematem, jeśli tabela jest całkowicie odświeżona?

Lokalizacje schematu powiązane z tabelą są resetowane, a schemat jest ponownie ustalany od podstaw.