Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
W środowisku Databricks Runtime 15.3 lub nowszym można użyć typu VARIANT do pobierania częściowo ustrukturyzowanych danych. W tym artykule opisano zachowanie i przedstawiono przykładowe wzorce pozyskiwania danych z magazynu obiektów w chmurze przy użyciu automatycznego modułu ładującego i COPY INTO, rekordów przesyłanych strumieniowo z platformy Kafka i poleceń SQL do tworzenia nowych tabel z danymi wariantu lub wstawiania nowych rekordów przy użyciu typu wariantu. Poniższa tabela zawiera podsumowanie obsługiwanych formatów plików i obsługi wersji środowiska Databricks Runtime:
| Format pliku | Obsługiwana wersja środowiska Databricks Runtime |
|---|---|
| JSON | 15.3 i nowsze |
| XML | 16.4 i nowsze |
| CSV | 16.4 i nowsze |
Zobacz Dane wariantu zapytania.
Tworzenie tabeli z kolumną wariantu
VARIANT jest standardowym typem SQL w środowisku Databricks Runtime 15.3 lub nowszym i obsługiwanym przez tabele wspierane przez usługę Delta Lake. Tabele zarządzane w usłudze Azure Databricks domyślnie używają usługi Delta Lake, więc można utworzyć pustą tabelę z jedną kolumną VARIANT przy użyciu następującej składni:
CREATE TABLE table_name (variant_column VARIANT)
Alternatywnie możesz użyć PARSE_JSON funkcji na łańcuchu JSON lub FROM_XML funkcji na łańcuchu XML, aby użyć instrukcji CTAS do utworzenia tabeli z kolumną typu variant. Poniższy przykład tworzy tabelę z dwiema kolumnami:
- Kolumna
idwyodrębniona z ciągu JSON jako typSTRING. - Kolumna
variant_columnzawiera cały ciąg JSON zakodowany jako typVARIANT.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Uwaga
Databricks zaleca wyodrębnianie i przechowywanie pól jako kolumn, które nie są wariantami, których planujesz używać do przyspieszania zapytań i optymalizowania układu przechowywania.
VARIANT kolumny nie mogą być używane w przypadku kluczy klastrowania, partycji lub kluczy kolejności Z. Nie można używać VARIANT typu danych na potrzeby porównań, grupowania, porządkowania i ustawiania operacji. Aby uzyskać pełną listę ograniczeń, zobacz Ograniczenia.
Wstawianie danych przy użyciu parse_json
Jeśli tabela docelowa zawiera już kolumnę zakodowaną jako VARIANT, możesz użyć parse_json, aby wstawić rekordy ciągów JSON jako VARIANT, jak w poniższym przykładzie:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Wstawianie danych przy użyciu from_xml
Jeśli tabela docelowa zawiera już kolumnę zakodowaną jako VARIANT, możesz użyć from_xml, aby wstawić jako VARIANT rekordy ciągów XML. Przykład:
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
Python
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant"))
.write
.mode("append")
.saveAsTable("table_name")
)
Wstawianie danych przy użyciu from_csv
Jeśli tabela docelowa zawiera już kolumnę zakodowaną jako VARIANT, możesz użyć from_csv, aby wstawić jako VARIANT rekordy ciągów XML. Przykład:
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
Python
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v)
.write
.mode("append")
.saveAsTable("table_name")
)
Importowanie danych z przechowywania obiektów w chmurze w formie wariantu
Auto Loader może służyć do ładowania wszystkich danych z obsługiwanych źródeł plików jako pojedynczej kolumny VARIANT w tabeli docelowej. Ponieważ VARIANT jest elastyczny względem zmian schematu i typu oraz uwzględnia wielkość liter i wartości NULL obecne w źródle danych, ten wzorzec sprawdza się w większości scenariuszy przyjmowania danych z następującymi zastrzeżeniami:
- Źle sformułowane rekordy nie mogą być zakodowane przy użyciu
VARIANTtypu. -
VARIANTtyp może przechowywać tylko rekordy o rozmiarze do 16 mb.
Uwaga
Wariant traktuje zbyt duże rekordy tak samo jak uszkodzone rekordy. W domyślnym trybie przetwarzania PERMISSIVE w obiekcie corruptRecordColumn przechwytywane są zbyt duże rekordy.
Ponieważ cały zapis jest rejestrowany jako pojedyncza VARIANT kolumna, podczas wczytywania danych nie występują żadne zmiany schematu i rescuedDataColumn nie jest obsługiwana. W poniższym przykładzie przyjęto założenie, że tabela docelowa już istnieje z jedną kolumną VARIANT.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Można również określić VARIANT podczas definiowania schematu lub przekazywania schemaHints. Dane w polu źródłowym, do których odwołuje się odwołanie, muszą zawierać prawidłowy rekord. W poniższych przykładach pokazano tę składnię:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Użyj COPY INTO z wariantem
Usługa Databricks zaleca używanie automatycznego modułu ładującego COPY INTO w przypadku dostępności.
COPY INTO obsługuje importowanie całej zawartości z obsługiwanego źródła danych jako jednej kolumny. Poniższy przykład tworzy nową tabelę z pojedynczą kolumną VARIANT, a następnie używa COPY INTO do pozyskiwania rekordów ze źródła plików JSON.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Przesyłanie strumieniowe danych z Kafki w postaci wariantu
Wiele strumieni platformy Kafka koduje swoje ładunki przy użyciu kodu JSON. Podczas pozyskiwania strumieni Kafka z użyciem VARIANT, obciążenia te stają się odporne na zmiany w schemacie.
W poniższym przykładzie pokazano odczytywanie źródła przesyłania strumieniowego Kafka, rzutowanie key jako STRING i value jako VARIANT, oraz zapisywanie do tabeli docelowej.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)