读取和写入协议缓冲区
Azure Databricks 原生支持 Apache Spark 结构和协议缓冲区 (protobuf) 之间的序列化和反序列化。 Protobuf 支持作为 Apache Spark 数据帧转换器实现,可以与结构化流式处理配合使用或用于批处理操作。
如何反序列化和序列化协议缓冲区
在 Databricks Runtime 12.2 LTS 及更高版本中,可以使用 from_protobuf
和 to_protobuf
函数来序列化和反序列化数据。 Protobuf 序列化通常用于流式处理工作负载。
protobuf 函数的基本语法与读取和写入函数类似。 在使用之前,必须导入这些函数。
from_protobuf
将二进制列强制转换为结构,并将 to_protobuf
结构列强制转换为二进制。 必须提供使用 options
参数指定的架构注册表或由 descFilePath
参数标识的描述符文件。
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
以下示例演示如何使用 from_protobuf()
处理二进制 protobuf 记录,以及使用 to_protobuf()
将 Spark SQL 结构转换为二进制 protobuf。
将 protobuf 与 Confluent 架构注册表配合使用
Azure Databricks 支持使用 Confluent 架构注册表来定义 Protobuf。
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
向外部 Confluent 架构注册表进行身份验证
若要向外部 Confluent 架构注册表进行身份验证,请更新架构注册表选项以包含身份验证凭据和 API 密钥。
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Unity 目录卷中使用信任存储和密钥存储文件
在 Databricks Runtime 14.3 LTS 及更高版本中,可以使用 Unity Catalog 卷中的信任存储和密钥存储文件向 Confluent 架构注册表进行身份验证。 根据以下示例更新架构注册表选项:
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
将 Protobuf 与描述符文件配合使用
还可以引用可用于计算群集的 protobuf 描述符文件。 请确保你具有读取文件的适当权限,具体取决于其位置。
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Protobuf 函数中支持的选项
Protobuf 函数中支持以下选项。
- mode:确定反序列化 Protobuf 记录时的错误处理方式。 这些错误可能由各种类型的格式错误的记录引起,包括记录的实际架构与
from_protobuf()
中提供的预期架构不匹配。- 值:
FAILFAST
(默认值):遇到格式错误的记录且任务失败时,会引发错误。PERMISSIVE
:为格式错误的记录返回 NULL。 请谨慎使用此选项,因为它可能导致丢弃许多记录。 当源中的一小部分记录不正确时,这非常有用。
- 值:
- recursive.fields.max.depth:添加对递归字段的支持。 Spark SQL 架构不支持递归字段。 未指定此选项时,不允许递归字段。 为了在 Protobuf 中支持递归字段,需要将该字段扩展到指定的深度。
值:
-1 (默认值):不允许递归字段。
0:丢弃递归字段。
1:允许单个级别的递归。
[2-10]:指定多级递归的阈值,最多为 10 级。
将值设置为大于 0 可以通过将嵌套字段扩展到配置的深度来允许递归字段。 不允许设置大于 10 的值,以避免无意中创建非常大的架构。 如果 Protobuf 消息的深度超出了配置的限制,则返回的 Spark 结构将在超出递归限制后被截断。
示例:假设有一个具有以下递归字段的 Protobuf:
message Person { string name = 1; Person friend = 2; }
下面列出了此设置具有不同值的结束架构:
- 选项设置为 1:
STRUCT<name: STRING>
- 选项设置为 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- 选项设置为 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- 选项设置为 1:
- convert.any.fields.to.json:此选项可用于将 Protobuf Any 字段转换为 JSON。 应谨慎启用此功能。 JSON 转换和处理效率低下。 此外,JSON 字符串字段会丢失 Protobuf 架构安全性,使下游处理容易出错。
值:
- False(默认值):在运行时,此类通配符字段可以以二进制数据的形式包含任意 Protobuf 消息。 默认情况下,此类字段的处理方式类似于普通 Protobuf 消息。 它有两个架构为
(STRUCT<type_url: STRING, value: BINARY>)
的字段。 默认情况下,不会以任何方式解释二进制value
字段。 但在实际操作中,二进制数据在一些应用程序中可能不方便使用。 - True:通过将此值设置为 True,可在运行时将
Any
字段转换为 JSON 字符串。 使用此选项,将分析二进制文件,并将 Protobuf 消息反序列化为 JSON 字符串。
- False(默认值):在运行时,此类通配符字段可以以二进制数据的形式包含任意 Protobuf 消息。 默认情况下,此类字段的处理方式类似于普通 Protobuf 消息。 它有两个架构为
示例:假设有两个定义如下的 Protobuf 类型:
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }
启用此选项后,
from_protobuf("col", messageName ="ProtoWithAny")
的架构为:STRUCT<event_name: STRING, details: STRING>
。在运行时,如果
details
字段包含Person
Protobuf 消息,则返回的值如下所示:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
。要求:
Any
字段中使用的所有可能的 Protobuf 类型的定义应在传递给from_protobuf()
的 Protobuf 描述符文件中提供。- 如果找不到
Any
Protobuf,则会导致该记录出错。 - Schema-registry 目前不支持此功能。
- emit.default.values:在将 Protobuf 反序列化为 Spark 结构时,启用零值呈现字段。 应谨慎使用此选项。 通常不建议依赖语义中的这种细微差异。
值
- False(默认值):当序列化 Protobuf 中的字段为空时,Spark 结构中生成的字段默认为 null。 不启用此选项并将
null
视为默认值会更简单。 - True:启用此选项后,会用相应的默认值填充这些字段。
- False(默认值):当序列化 Protobuf 中的字段为空时,Spark 结构中生成的字段默认为 null。 不启用此选项并将
示例:假设以下 Protobuf 的 Protobuf 构造类似于
Person(age=0, middle_name="")
:syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
- 如果此选项设置为 False,则调用
from_protobuf()
后的 Spark 结构将全部为 null:{"name": null, "age": null, "middle_name": "", "salary": null}
。 即使设置了两个字段(age
和middle_name
)的值,Protobuf 也不以有线格式包含它们,因为它们是默认值。 - 如果此选项设置为 True,则调用
from_protobuf()
后的 Spark 结构将为:{"name": "", "age": 0, "middle_name": "", "salary": null}
。salary
字段保留为 null,因为它被显式声明为optional
,并且未在输入记录中设置。
- 如果此选项设置为 False,则调用
- enums.as.ints:启用后,Protobuf 中的枚举字段在 Spark 中呈现为整数字段。
值
- False(默认值)
- True:启用后,Protobuf 中的枚举字段在 Spark 中呈现为整数字段。
示例:考虑以下 Protobuf:
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }
给定一条类似
Person(job = ENGINEER)
的 Protobuf 消息:- 禁用此选项后,相应的 Spark 结构将为
{"job": "ENGINEER"}
。 - 启用此选项后,相应的 Spark 结构将为
{"job": 1}
。
请注意,这些字段的架构在每种情况下都是不同的(整数而不是默认字符串)。 此类更改可能会影响下游表的架构。
- 禁用此选项后,相应的 Spark 结构将为
架构注册表选项
将架构注册表与 Protobuf 函数一起使用时,以下架构注册表选项是相关的。
- schema.registry.subject
- 必须
- 指定架构注册表中架构的主题,例如“client-event”
- schema.registry.address
- 必须
- 架构注册表的 URL,例如
https://schema-registry.example.com:8081
- schema.registry.protobuf.name
- 可选
- 默认:
<NONE>
。 - 主题的 schema-registry 条目可以包含多个 Protobuf 定义,就像单个
proto
文件一样。 如果未指定此选项,则第一个 Protobuf 用于架构。 当 Protobuf 消息不是条目中的第一个消息时,请指定该消息的名称。 例如,假设一个条目有两个 Protobuf 定义:依次是“Person”和“Location”。 如果流对应于“Location”而不是“Person”,请将此选项设置为“Location”(或其全名,包括包“com.example.protos.Location”)。
- schema.registry.schema.evolution.mode
- 默认值:“restart”。
- 支持的模式:
- “restart”
- “none”
- 此选项为
from_protobuf()
设置架构演变模式。 在查询开始时,Spark 会记录给定主题的最新 schema-id。 这可确定from_protobuf()
的架构。 查询启动后,可能会将新架构发布到架构注册表。 当在传入记录中发现较新的 schema-id 时,它表明架构发生了更改。 此选项确定如何处理此类架构更改:- restart(默认值):在发现较新的 schema-id 时触发
UnknownFieldException
。 这会终止查询。 Databricks 建议将作业配置为在查询失败时重启,以获取架构更改。 - none:忽略 schema-id 更改。 具有较新 schema-id 的记录使用在查询开始时观察到的同一架构进行分析。 较新的 Protobuf 定义应向后兼容,并忽略新字段。
- restart(默认值):在发现较新的 schema-id 时触发
- confluent.schema.registry.
<schema-registy-client-option>
- 可选
- Schema-registry 使用 Confluent 架构注册表客户端连接到 Confluent schema-registry。 可以使用前缀“confluent.schema.registry”指定客户端支持的任何配置选项。 例如,以下两个设置提供“USER_INFO”身份验证凭据:
- “confluent.schema.registry.basic.auth.credentials.source”:‘USER_INFO’
- “confluent.schema.registry.basic.auth.user.info”:“
<KEY>
:<SECRET>
”