Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
В Databricks Runtime 15.3 и более поздних версиях можно использовать VARIANT тип для приема полуструктурированных данных. В этой статье описано поведение и приведены примеры шаблонов приема данных из облачного хранилища объектов с помощью автозагрузчика и COPY INTO, потоковой передачи записей из Kafka и команд SQL для создания таблиц с вариантными данными или вставки новых записей с помощью типа варианта. В следующей таблице приведены поддерживаемые форматы файлов и поддержка версии Databricks Runtime:
| Формат файла | Поддерживаемая версия среды выполнения Databricks |
|---|---|
| JSON (JavaScript Object Notation) | 15.3 и выше |
| XML | 16.4 и выше |
| CSV | 16.4 и выше |
Смотрите данные о варианте запроса.
Создание таблицы с вариантным столбцом
VARIANT — это стандартный тип SQL в Databricks Runtime 15.3 и более поздних версий и поддерживается таблицами, поддерживаемыми Delta Lake. Управляемые таблицы в Azure Databricks по умолчанию используют Delta Lake, поэтому можно создать пустую таблицу с одним столбцом VARIANT с помощью следующего синтаксиса:
CREATE TABLE table_name (variant_column VARIANT)
Кроме того, можно использовать PARSE_JSON функцию в строке JSON или FROM_XML функции в XML-строке, чтобы использовать инструкцию CTAS для создания таблицы с вариантным столбцом. В следующем примере создается таблица с двумя столбцами:
- Столбец
id, извлеченный из строки JSON в виде типаSTRING. - Столбец
variant_columnсодержит всю строку JSON, закодированную как типVARIANT.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Примечание.
Databricks рекомендует извлекать и хранить поля в качестве не вариантных столбцов, которые планируется использовать для ускорения запросов и оптимизации макета хранилища.
VARIANT столбцы нельзя использовать для кластеризации ключей, секций или ключей порядка Z. Тип данных VARIANT нельзя использовать для сравнения, группировки, упорядочивания и задания операций. Для получения полного списка ограничений см. Ограничения.
Вставка данных с помощью parse_json
Если целевая таблица уже содержит столбец, закодированный как VARIANT, можно использовать parse_json для вставки строковых записей JSON как VARIANT, как показано в следующем примере:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Питон
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Вставка данных с помощью from_xml
Если целевая таблица уже содержит столбец, закодированный как VARIANT, можно использовать from_xml для вставки xml-строковых записей в качестве VARIANT. Рассмотрим пример.
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
Питон
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant"))
.write
.mode("append")
.saveAsTable("table_name")
)
Вставка данных с помощью from_csv
Если целевая таблица уже содержит столбец, закодированный как VARIANT, можно использовать from_csv для вставки xml-строковых записей в качестве VARIANT. Рассмотрим пример.
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
Питон
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v)
.write
.mode("append")
.saveAsTable("table_name")
)
Прием данных из облачного хранилища объектов в качестве варианта
Автозагрузчик можно использовать для загрузки всех данных из поддерживаемых источников файлов в виде одного VARIANT столбца в целевой таблице. Так как VARIANT гибко адаптируется к изменениям схемы и типа и соблюдает чувствительность к регистру, а также учитывает NULL значения, присутствующие в источнике данных, этот шаблон является надежным для большинства сценариев загрузки данных с учетом следующих предостережений:
- Неправильно сформированные записи не могут быть закодированы с помощью
VARIANTтипа. -
VARIANTТип может хранить только записи размером до 16 мб.
Примечание.
Variant обрабатывает слишком большие записи, аналогичные поврежденным записям. В режиме обработки по умолчанию PERMISSIVE слишком большие записи захватываются в corruptRecordColumn.
Так как вся запись сохраняется как один VARIANT столбец, во время приема данных не происходит эволюция схемы и функционал rescuedDataColumn не поддерживается. В следующем примере предполагается, что целевая таблица уже существует с одним столбцом VARIANT.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Можно также указать VARIANT при определении схемы или передаче schemaHints. Данные в поле источника, на который ссылается ссылка, должны содержать допустимую запись. В следующих примерах показан этот синтаксис:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Используйте COPY INTO с вариантом
Databricks рекомендует использовать Auto Loader вместо COPY INTO, если доступно.
COPY INTO поддерживает импорт всего содержимого поддерживаемого источника данных в один столбец. В следующем примере создается новая таблица с одним столбцом VARIANT, а затем используется COPY INTO для приема записей из источника JSON-файла.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Потоковая передача данных Kafka в качестве варианта
Многие потоки Kafka кодируют свои данные в формате JSON. Прием потоков Kafka с помощью VARIANT делает эти рабочие нагрузки надежными для изменений схемы.
В следующем примере показано чтение источника потоковой передачи Kafka, приведение key в качестве STRING и value как VARIANTи запись в целевую таблицу.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)