讀取和寫入通訊協議緩衝區
Azure Databricks 提供原生支援 Apache Spark 結構與通訊協定緩衝區之間的串行化和還原串行化(protobuf)。 Protobuf 支援會實作為 Apache Spark DataFrame 轉換器,並可搭配結構化串流或批次作業使用。
如何還原串行化和串行化通訊協議緩衝區
在 Databricks Runtime 12.2 LTS 和更新版本中,您可以使用 from_protobuf
和 to_protobuf
函式來串行化和還原串行化數據。 Protobuf 串行化通常用於串流工作負載。
protobuf 函式的基本語法類似於讀取和寫入函式。 您必須先匯入這些函式,才能使用。
from_protobuf
將二進位數據行轉換成結構,並將 to_protobuf
結構數據行轉換成二進位。 您必須提供以 options
自變數或 自變數所 descFilePath
識別描述項檔案所指定的架構登錄。
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
下列範例說明使用 處理二進位 Protobuf 記錄, from_protobuf()
並使用 將 Spark SQL 結構轉換成二進位 Protobuf to_protobuf()
。
搭配 Confluent 架構登錄使用 protobuf
Azure Databricks 支援使用 Confluent 架構登錄 來定義 Protobuf。
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
向外部 Confluent 架構登錄進行驗證
若要向外部 Confluent 架構登錄進行驗證,請更新您的架構登錄選項,以包含驗證認證和 API 金鑰。
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
在 Unity 目錄磁碟區中使用信任存放區和金鑰存放區檔案
在 Databricks Runtime 14.3 LTS 和更新版本中,您可以使用 Unity 目錄磁碟區中的信任存放區和密鑰存放區檔案向 Confluent 架構登錄進行驗證。 根據下列範例更新架構登錄選項:
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
搭配描述元檔案使用 Protobuf
您也可以參考計算叢集可用的 protobuf 描述元檔案。 請確定您有適當的許可權可讀取檔案,視檔案的位置而定。
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Protobuf 函式中支持的選項
Protobuf 函式支援下列選項。
- mode:決定還原串行化 Protobuf 記錄時的錯誤處理方式。 錯誤可能是由各種類型的格式不正確的記錄所造成,包括記錄的實際架構與 中
from_protobuf()
提供的預期架構不符。- 值:
FAILFAST
(預設值):遇到格式不正確的記錄且工作失敗時,就會擲回錯誤。PERMISSIVE
:格式不正確的記錄會傳回 NULL。 請小心使用此選項,因為它可能會導致卸除許多記錄。 當來源中的一小部分記錄不正確時,這會很有用。
- 值:
- recursive.fields.max.depth:新增遞歸字段的支援。 Spark SQL 架構不支援遞歸欄位。 如果未指定此選項,則不允許遞歸欄位。 為了支援 Protobufs 中的遞歸字段,它們必須擴充至指定的深度。
值:
-1 (預設值):不允許遞歸欄位。
0:遞歸欄位已卸除。
1:允許單一層級的遞歸。
[2-10]:指定多個遞歸的臨界值,最多 10 個層級。
將值設定為大於0,可將巢狀欄位展開至設定的深度,以允許遞歸欄位。 不允許大於 10 的值,以避免不小心建立非常大的架構。 如果 Protobuf 訊息的深度超過設定的限制,則會在遞歸限制之後截斷傳回的 Spark 結構。
範例:請考慮具有下列遞歸字段的 Protobuf:
message Person { string name = 1; Person friend = 2; }
下列列出此設定具有不同值的結束架構:
- 選項設定為 1:
STRUCT<name: STRING>
- 選項設定為 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- 選項設定為 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- 選項設定為 1:
- convert.any.fields.to.json:此選項會啟用將 Protobuf Any 字段轉換成 JSON。 應謹慎啟用此功能。 JSON 轉換和處理效率不佳。 此外,JSON 字串字段會遺失 Protobuf 架構安全性,讓下游處理容易發生錯誤。
值:
- False (預設值):在運行時間,這類通配符欄位可以包含任意 Protobuf 訊息做為二進位數據。 根據預設,這類欄位會像一般 Protobuf 訊息一樣處理。 它有兩個具有架構
(STRUCT<type_url: STRING, value: BINARY>)
的欄位。 根據預設,二進位value
欄位元元不會以任何方式解譯。 但是,在實務上,二進位數據可能無法在某些應用程式中運作。 - True:將此值設定為 True 可讓欄位在運行時間將欄位轉換成
Any
JSON 字串。 使用此選項時,會剖析二進位檔,並將 Protobuf 訊息還原串行化為 JSON 字串。
- False (預設值):在運行時間,這類通配符欄位可以包含任意 Protobuf 訊息做為二進位數據。 根據預設,這類欄位會像一般 Protobuf 訊息一樣處理。 它有兩個具有架構
範例:請考慮兩個定義如下的 Protobuf 類型:
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }
啟用此選項後,的
from_protobuf("col", messageName ="ProtoWithAny")
架構會是:STRUCT<event_name: STRING, details: STRING>
。在運行時間,如果
details
字段包含Person
Protobuf 訊息,傳回的值看起來會像這樣:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
。需求:
- 傳遞至
from_protobuf()
之 Protobuf 描述元檔案中,應該提供欄位內Any
所有可能 Protobuf 型別的定義。 - 如果
Any
找不到 Protobuf,則會產生該記錄的錯誤。 - 架構登錄目前不支援此功能。
- 傳遞至
- emit.default.values:將 Protobuf 還原串行化為Spark結構時,啟用具有零值的轉譯欄位。 此選項應該謹慎使用。 通常不建議依賴語意中的這類更精細的差異。
數值
- False (預設值):當串行化 Protobuf 中的欄位是空的時,Spark 結構中產生的欄位預設為 null。 不啟用此選項並視為
null
預設值會比較簡單。 - True:啟用此選項時,這類字段會填入對應的預設值。
- False (預設值):當串行化 Protobuf 中的欄位是空的時,Spark 結構中產生的欄位預設為 null。 不啟用此選項並視為
範例:請考慮下列 Protobuf,並建構 Protobuf,如下所示
Person(age=0, middle_name="")
:syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
- 如果此選項設定為 False,呼叫
from_protobuf()
后的 Spark 結構會是所有 Null:{"name": null, "age": null, "middle_name": "", "salary": null}
。 雖然已設定兩個字段 (age
和middle_name
) 的值,但 Protobuf 不會將它們包含在電線格式中,因為它們是預設值。 - 此選項設定為 True 時,呼叫
from_protobuf()
後的 Spark 結構會是:{"name": "", "age": 0, "middle_name": "", "salary": null}
。 欄位salary
會維持 null,因為它已明確宣告optional
,而且不會在輸入記錄中設定。
- 如果此選項設定為 False,呼叫
- enums.as.ints:啟用時,Protobuf 中的列舉欄位會轉譯為Spark中的整數位段。
數值
- False (預設值)
- True:啟用時,Protobuf 中的列舉欄位會轉譯為 Spark 中的整數位段。
範例:請考慮下列 Protobuf:
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }
指定 Protobuf 訊息, 例如
Person(job = ENGINEER)
:- 停用這個選項時,對應的 Spark 結構會是
{"job": "ENGINEER"}
。 - 開啟這個選項後,對應的 Spark 結構會是
{"job": 1}
。
請注意,這些欄位的架構在每個案例中都不同(整數而非預設字串)。 這類變更可能會影響下游數據表的架構。
- 停用這個選項時,對應的 Spark 結構會是
架構登錄選項
搭配 Protobuf 函式使用架構登錄時,下列架構登錄選項是相關的。
- schema.registry.subject
- 必要
- 指定架構登錄中架構的主體,例如 “client-event”
- schema.registry.address
- 必要
- 架構登錄的 URL,例如
https://schema-registry.example.com:8081
- schema.registry.protobuf.name
- 選擇性
- 預設值:
<NONE>
。 - 主體的架構登錄專案可以包含多個 Protobuf 定義,就像單
proto
一檔案一樣。 未指定此選項時,第一個 Protobuf 會用於架構。 當 Protobuf 訊息不是專案中的第一個訊息時,請指定它的名稱。 例如,假設有兩個 Protobuf 定義的專案:“Person” 和 “Location” 的順序。 如果數據流對應至 “Location” 而不是 “Person”,請將此選項設定為 “Location” (或其完整名稱,包括套件 “com.example.protos.Location” )。
- schema.registry.schema.evolution.mode
- 默認值:「restart」。。
- 支援的模式:
- “restart”
- “none”
- 此選項會設定的
from_protobuf()
架構演進模式。 在查詢開始時,Spark 會記錄指定主體的最新架構標識碼。 這會決定的from_protobuf()
架構。 查詢啟動之後,新的架構可能會發佈至架構登錄。 當傳入記錄中注意到較新的schema-id時,表示架構的變更。 這個選項會決定如何處理架構的這類變更:- restart (預設值):當注意到較新的架構識別碼時,觸發
UnknownFieldException
。 這會終止查詢。 Databricks 建議設定作業以在查詢失敗時重新啟動,以挑選架構變更。 - none:忽略架構標識碼變更。 具有較新架構標識碼的記錄會剖析為查詢開頭觀察到的相同架構。 較新的 Protobuf 定義必須是回溯相容,而且會忽略新的欄位。
- restart (預設值):當注意到較新的架構識別碼時,觸發
- confluent.schema.registry。
<schema-registy-client-option>
- 選擇性
- Schema-registry 會使用 Confluent Schema Registry 用戶端連線到 Confluent schema-registry。 用戶端支援的任何組態選項都可以使用前置詞 「confluent.schema.registry」 來指定。 例如,下列兩個設定提供「USER_INFO」驗證認證:
- “confluent.schema.registry.basic.auth.credentials.source”: 'USER_INFO'
- “confluent.schema.registry.basic.auth.user.info”: “
<KEY>
:<SECRET>
”