分享方式:


讀取和寫入通訊協議緩衝區

Azure Databricks 提供原生支援 Apache Spark 結構與通訊協定緩衝區之間的串行化和還原串行化(protobuf)。 Protobuf 支援會實作為 Apache Spark DataFrame 轉換器,並可搭配結構化串流或批次作業使用。

如何還原串行化和串行化通訊協議緩衝區

在 Databricks Runtime 12.2 LTS 和更新版本中,您可以使用 from_protobufto_protobuf 函式來串行化和還原串行化數據。 Protobuf 串行化通常用於串流工作負載。

protobuf 函式的基本語法類似於讀取和寫入函式。 您必須先匯入這些函式,才能使用。

from_protobuf 將二進位數據行轉換成結構,並將 to_protobuf 結構數據行轉換成二進位。 您必須提供以 options 自變數或 自變數所 descFilePath 識別描述項檔案所指定的架構登錄。

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

下列範例說明使用 處理二進位 Protobuf 記錄, from_protobuf() 並使用 將 Spark SQL 結構轉換成二進位 Protobuf to_protobuf()

搭配 Confluent 架構登錄使用 protobuf

Azure Databricks 支援使用 Confluent 架構登錄 來定義 Protobuf。

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

向外部 Confluent 架構登錄進行驗證

若要向外部 Confluent 架構登錄進行驗證,請更新您的架構登錄選項,以包含驗證認證和 API 金鑰。

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

在 Unity 目錄磁碟區中使用信任存放區和金鑰存放區檔案

在 Databricks Runtime 14.3 LTS 和更新版本中,您可以使用 Unity 目錄磁碟區中的信任存放區和密鑰存放區檔案向 Confluent 架構登錄進行驗證。 根據下列範例更新架構登錄選項:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

搭配描述元檔案使用 Protobuf

您也可以參考計算叢集可用的 protobuf 描述元檔案。 請確定您有適當的許可權可讀取檔案,視檔案的位置而定。

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Protobuf 函式中支持的選項

Protobuf 函式支援下列選項。

  • mode:決定還原串行化 Protobuf 記錄時的錯誤處理方式。 錯誤可能是由各種類型的格式不正確的記錄所造成,包括記錄的實際架構與 中 from_protobuf()提供的預期架構不符。
      • FAILFAST(預設值):遇到格式不正確的記錄且工作失敗時,就會擲回錯誤。
      • PERMISSIVE:格式不正確的記錄會傳回 NULL。 請小心使用此選項,因為它可能會導致卸除許多記錄。 當來源中的一小部分記錄不正確時,這會很有用。
  • recursive.fields.max.depth:新增遞歸字段的支援。 Spark SQL 架構不支援遞歸欄位。 如果未指定此選項,則不允許遞歸欄位。 為了支援 Protobufs 中的遞歸字段,它們必須擴充至指定的深度。
    • 值:

      • -1 (預設值):不允許遞歸欄位。

      • 0:遞歸欄位已卸除。

      • 1:允許單一層級的遞歸。

      • [2-10]:指定多個遞歸的臨界值,最多 10 個層級。

        將值設定為大於0,可將巢狀欄位展開至設定的深度,以允許遞歸欄位。 不允許大於 10 的值,以避免不小心建立非常大的架構。 如果 Protobuf 訊息的深度超過設定的限制,則會在遞歸限制之後截斷傳回的 Spark 結構。

    • 範例:請考慮具有下列遞歸字段的 Protobuf:

      message Person { string name = 1; Person friend = 2; }
      

      下列列出此設定具有不同值的結束架構:

      • 選項設定為 1: STRUCT<name: STRING>
      • 選項設定為 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • 選項設定為 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json:此選項會啟用將 Protobuf Any 字段轉換成 JSON。 應謹慎啟用此功能。 JSON 轉換和處理效率不佳。 此外,JSON 字串字段會遺失 Protobuf 架構安全性,讓下游處理容易發生錯誤。
    • 值:

      • False (預設值):在運行時間,這類通配符欄位可以包含任意 Protobuf 訊息做為二進位數據。 根據預設,這類欄位會像一般 Protobuf 訊息一樣處理。 它有兩個具有架構 (STRUCT<type_url: STRING, value: BINARY>)的欄位。 根據預設,二進位 value 欄位元元不會以任何方式解譯。 但是,在實務上,二進位數據可能無法在某些應用程式中運作。
      • True:將此值設定為 True 可讓欄位在運行時間將欄位轉換成 Any JSON 字串。 使用此選項時,會剖析二進位檔,並將 Protobuf 訊息還原串行化為 JSON 字串。
    • 範例:請考慮兩個定義如下的 Protobuf 類型:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      啟用此選項後,的 from_protobuf("col", messageName ="ProtoWithAny") 架構會是: STRUCT<event_name: STRING, details: STRING>

      在運行時間,如果 details 字段包含 Person Protobuf 訊息,傳回的值看起來會像這樣: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')

    • 需求:

      • 傳遞至 from_protobuf()之 Protobuf 描述元檔案中,應該提供欄位內Any所有可能 Protobuf 型別的定義。
      • 如果 Any 找不到 Protobuf,則會產生該記錄的錯誤。
      • 架構登錄目前不支援此功能。
  • emit.default.values:將 Protobuf 還原串行化為Spark結構時,啟用具有零值的轉譯欄位。 此選項應該謹慎使用。 通常不建議依賴語意中的這類更精細的差異。
    • 數值

      • False (預設值):當串行化 Protobuf 中的欄位是空的時,Spark 結構中產生的欄位預設為 null。 不啟用此選項並視為 null 預設值會比較簡單。
      • True:啟用此選項時,這類字段會填入對應的預設值。
    • 範例:請考慮下列 Protobuf,並建構 Protobuf,如下所示 Person(age=0, middle_name="")

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • 如果此選項設定為 False,呼叫 from_protobuf() 后的 Spark 結構會是所有 Null: {"name": null, "age": null, "middle_name": "", "salary": null}。 雖然已設定兩個字段 (agemiddle_name) 的值,但 Protobuf 不會將它們包含在電線格式中,因為它們是預設值。
      • 此選項設定為 True 時,呼叫 from_protobuf() 後的 Spark 結構會是: {"name": "", "age": 0, "middle_name": "", "salary": null}。 欄位 salary 會維持 null,因為它已明確宣告 optional ,而且不會在輸入記錄中設定。
  • enums.as.ints:啟用時,Protobuf 中的列舉欄位會轉譯為Spark中的整數位段。
    • 數值

      • False (預設值)
      • True:啟用時,Protobuf 中的列舉欄位會轉譯為 Spark 中的整數位段。
    • 範例:請考慮下列 Protobuf:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      指定 Protobuf 訊息, 例如 Person(job = ENGINEER)

      • 停用這個選項時,對應的 Spark 結構會是 {"job": "ENGINEER"}
      • 開啟這個選項後,對應的 Spark 結構會是 {"job": 1}

      請注意,這些欄位的架構在每個案例中都不同(整數而非預設字串)。 這類變更可能會影響下游數據表的架構。

架構登錄選項

搭配 Protobuf 函式使用架構登錄時,下列架構登錄選項是相關的。

  • schema.registry.subject
    • 必要
    • 指定架構登錄中架構的主體,例如 “client-event”
  • schema.registry.address
    • 必要
    • 架構登錄的 URL,例如 https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • 選擇性
    • 預設值:<NONE>
    • 主體的架構登錄專案可以包含多個 Protobuf 定義,就像單 proto 一檔案一樣。 未指定此選項時,第一個 Protobuf 會用於架構。 當 Protobuf 訊息不是專案中的第一個訊息時,請指定它的名稱。 例如,假設有兩個 Protobuf 定義的專案:“Person” 和 “Location” 的順序。 如果數據流對應至 “Location” 而不是 “Person”,請將此選項設定為 “Location” (或其完整名稱,包括套件 “com.example.protos.Location” )。
  • schema.registry.schema.evolution.mode
    • 默認值:「restart」。。
    • 支援的模式:
      • “restart”
      • “none”
    • 此選項會設定的 from_protobuf()架構演進模式。 在查詢開始時,Spark 會記錄指定主體的最新架構標識碼。 這會決定的 from_protobuf()架構。 查詢啟動之後,新的架構可能會發佈至架構登錄。 當傳入記錄中注意到較新的schema-id時,表示架構的變更。 這個選項會決定如何處理架構的這類變更:
      • restart (預設值):當注意到較新的架構識別碼時,觸發 UnknownFieldException 。 這會終止查詢。 Databricks 建議設定作業以在查詢失敗時重新啟動,以挑選架構變更。
      • none:忽略架構標識碼變更。 具有較新架構標識碼的記錄會剖析為查詢開頭觀察到的相同架構。 較新的 Protobuf 定義必須是回溯相容,而且會忽略新的欄位。
  • confluent.schema.registry。<schema-registy-client-option>
    • 選擇性
    • Schema-registry 會使用 Confluent Schema Registry 用戶端連線到 Confluent schema-registry。 用戶端支援的任何組態選項都可以使用前置詞 「confluent.schema.registry」 來指定。 例如,下列兩個設定提供「USER_INFO」驗證認證:
      • “confluent.schema.registry.basic.auth.credentials.source”: 'USER_INFO'
      • “confluent.schema.registry.basic.auth.user.info”: “<KEY><SECRET>