Поделиться через


Прием данных в виде полуструктурированного типа данных

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

В Databricks Runtime 15.3 и более поздних версиях можно использовать VARIANT тип для приема полуструктурированных данных. В этой статье описано поведение и приведены примеры шаблонов приема данных из облачного хранилища объектов с помощью автозагрузчика и COPY INTO, потоковой передачи записей из Kafka и команд SQL для создания таблиц с вариантными данными или вставки новых записей с помощью типа варианта. В следующей таблице приведены поддерживаемые форматы файлов и поддержка версии Databricks Runtime:

Формат файла Поддерживаемая версия среды выполнения Databricks
JSON (JavaScript Object Notation) 15.3 и выше
XML 16.4 и выше
CSV 16.4 и выше

Смотрите данные о варианте запроса.

Создание таблицы с вариантным столбцом

VARIANT — это стандартный тип SQL в Databricks Runtime 15.3 и более поздних версий и поддерживается таблицами, поддерживаемыми Delta Lake. Управляемые таблицы в Azure Databricks по умолчанию используют Delta Lake, поэтому можно создать пустую таблицу с одним столбцом VARIANT с помощью следующего синтаксиса:

CREATE TABLE table_name (variant_column VARIANT)

Кроме того, можно использовать PARSE_JSON функцию в строке JSON или FROM_XML функции в XML-строке, чтобы использовать инструкцию CTAS для создания таблицы с вариантным столбцом. В следующем примере создается таблица с двумя столбцами:

  • Столбец id, извлеченный из строки JSON в виде типа STRING.
  • Столбец variant_column содержит всю строку JSON, закодированную как тип VARIANT.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Примечание.

Databricks рекомендует извлекать и хранить поля в качестве не вариантных столбцов, которые планируется использовать для ускорения запросов и оптимизации макета хранилища.

VARIANT столбцы нельзя использовать для кластеризации ключей, секций или ключей порядка Z. Тип данных VARIANT нельзя использовать для сравнения, группировки, упорядочивания и задания операций. Для получения полного списка ограничений см. Ограничения.

Вставка данных с помощью parse_json

Если целевая таблица уже содержит столбец, закодированный как VARIANT, можно использовать parse_json для вставки строковых записей JSON как VARIANT, как показано в следующем примере:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Питон

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Вставка данных с помощью from_xml

Если целевая таблица уже содержит столбец, закодированный как VARIANT, можно использовать from_xml для вставки xml-строковых записей в качестве VARIANT. Рассмотрим пример.

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_XML(xml_string, 'variant')
  FROM source_data

Питон

from pyspark.sql.functions import col, from_xml

(spark.read
  .table("source_data")
  .select(from_xml(col("xml_string"), "variant"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Вставка данных с помощью from_csv

Если целевая таблица уже содержит столбец, закодированный как VARIANT, можно использовать from_csv для вставки xml-строковых записей в качестве VARIANT. Рассмотрим пример.

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_CSV(csv_string, 'v variant').v
  FROM source_data

Питон

from pyspark.sql.functions import col, from_csv

(spark.read
  .table("source_data")
  .select(from_csv(col("csv_string"), "v variant").v)
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Прием данных из облачного хранилища объектов в качестве варианта

Автозагрузчик можно использовать для загрузки всех данных из поддерживаемых источников файлов в виде одного VARIANT столбца в целевой таблице. Так как VARIANT гибко адаптируется к изменениям схемы и типа и соблюдает чувствительность к регистру, а также учитывает NULL значения, присутствующие в источнике данных, этот шаблон является надежным для большинства сценариев загрузки данных с учетом следующих предостережений:

  • Неправильно сформированные записи не могут быть закодированы с помощью VARIANT типа.
  • VARIANT Тип может хранить только записи размером до 16 мб.

Примечание.

Variant обрабатывает слишком большие записи, аналогичные поврежденным записям. В режиме обработки по умолчанию PERMISSIVE слишком большие записи захватываются в corruptRecordColumn.

Так как вся запись сохраняется как один VARIANT столбец, во время приема данных не происходит эволюция схемы и функционал rescuedDataColumn не поддерживается. В следующем примере предполагается, что целевая таблица уже существует с одним столбцом VARIANT.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Можно также указать VARIANT при определении схемы или передаче schemaHints. Данные в поле источника, на который ссылается ссылка, должны содержать допустимую запись. В следующих примерах показан этот синтаксис:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Используйте COPY INTO с вариантом

Databricks рекомендует использовать Auto Loader вместо COPY INTO, если доступно.

COPY INTO поддерживает импорт всего содержимого поддерживаемого источника данных в один столбец. В следующем примере создается новая таблица с одним столбцом VARIANT, а затем используется COPY INTO для приема записей из источника JSON-файла.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Потоковая передача данных Kafka в качестве варианта

Многие потоки Kafka кодируют свои данные в формате JSON. Прием потоков Kafka с помощью VARIANT делает эти рабочие нагрузки надежными для изменений схемы.

В следующем примере показано чтение источника потоковой передачи Kafka, приведение key в качестве STRING и value как VARIANTи запись в целевую таблицу.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)