重要
此功能目前以公共预览版提供。
在 Databricks Runtime 15.3 及更高版本中,可以使用VARIANT类型引入半结构化数据。 本文介绍了行为并提供使用自动加载器和COPY INTO从云对象存储引入数据的示例模式、Kafka 的流式处理记录以及用于使用变体数据新建表或使用变体类型插入新记录的 SQL 命令。 下表汇总了支持的文件格式和 Databricks Runtime 版本支持:
| 文件格式 | 支持的 Databricks Runtime 版本 |
|---|---|
| JSON | 15.3 及更高版本 |
| XML | 16.4 及更高版本 |
| CSV | 16.4 及更高版本 |
请参阅查询变体数据。
创建具有变体列的表
VARIANT 是 Databricks Runtime 15.3 及更高版本中的标准 SQL 类型,并且得到基于 Delta Lake 的表的支持。 Azure Databricks 上的托管表默认使用 Delta Lake,因此可以使用以下语法创建包含单个 VARIANT 列的空表。
CREATE TABLE table_name (variant_column VARIANT)
或者,可以使用 CTAS 语句创建具有变体列的表。 使用PARSE_JSON函数解析JSON字符串或FROM_XML函数解析XML字符串。 以下示例创建一个包含两列的表。
- 列
id作为类型STRING从 JSON 字符串中提取。 -
variant_column包含编码为VARIANT类型的整个 JSON 字符串。
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
注意
Databricks 建议提取经常查询的字段并将其存储为非变体列,以加速查询并优化存储布局。
VARIANT列不能用于聚类分析键、分区或 Z 顺序键。
VARIANT 数据类型不能用于比较、分组、排序和设置操作。 有关详细信息,请参阅 “限制”。
使用parse_json插入数据
如果目标表已包含编码为VARIANT的列,则可以使用parse_json将 JSON 字符串记录插入为VARIANT。 例如,从 json_string 列中解析 JSON 字符串,并将其插入 table_name 列中。
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
使用from_xml插入数据
如果目标表已包含编码为 VARIANT的列,则可以使用 from_xml 将 XML 字符串记录插入为 VARIANT。 例如,解析 xml_string 列中的 XML 字符串并将其插入 table_name 中。
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
Python
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant"))
.write
.mode("append")
.saveAsTable("table_name")
)
使用 from_csv 插入数据
如果目标表已包含编码为VARIANT的列,则可以使用 from_csv 将 CSV 字符串记录作为VARIANT插入。 例如,从 csv_string 列解析 CSV 记录并将其插入 table_name 中。
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
Python
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v)
.write
.mode("append")
.saveAsTable("table_name")
)
将数据从云对象存储引入为变体
自动加载程序可用于将受支持的文件源中的所有数据作为目标表中的单个 VARIANT 列加载。 由于VARIANT能够灵活应对架构和类型更改,并且维护数据源中存在的区分大小写和NULL值,因此此模式对于大多数引入方案都十分可靠,但需要注意以下事项:
- 格式不正确的记录不能使用
VARIANT类型进行编码。 -
VARIANT类型只能容纳最大 16 MB 的记录大小。
注意
Variant 对超大型记录的处理方式与对损坏记录的处理类似。 在默认 PERMISSIVE 处理模式下,过大的记录会被捕获到 corruptRecordColumn。
由于整个记录记录为单个 VARIANT 列,因此在引入期间不会发生架构演变,rescuedDataColumn 不被支持。 以下示例假定目标表已存在单个VARIANT列。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
还可以在定义架构或传递VARIANT时指定schemaHints。 引用的源字段中的数据必须包含有效的记录。 以下示例演示了此语法。
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
与变体一起使用COPY INTO
Databricks 建议在可用时使用自动加载程序,而不是 COPY INTO。
COPY INTO 支持将支持的数据源的全部内容作为单个列引入。 以下示例新建包含单个VARIANT列的表,然后使用COPY INTO从 JSON 文件源引入记录。
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FILES = ('file-name')
FORMAT_OPTIONS ('singleVariantColumn' = 'variant_column')
将 Kafka 数据作为变体流式传输
许多 Kafka 流使用 JSON 对有效负载进行编码。 使用VARIANT引入 Kafka 流会使这些工作负载对架构更改非常可靠。
以下示例演示了如何读取 Kafka 流式处理源,将key强制转换为STRING,将value强制转换为VARIANT,并写出到目标表。
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
后续步骤
- 查询变体数据。
- 配置 Variant 类型支持。
- 详细了解 Auto Loader。 请参阅什么是自动加载程序?。