プロトコル バッファーの読み取りと書き込みを行う

Azure Databricks では、Apache Spark 構造体とプロトコル バッファー (protobuf) 間のシリアル化と逆シリアル化がネイティブにサポートされています。 protobuf のサポートは Apache Spark DataFrame トランスフォーマーとして実装されており、構造化ストリーミングで、またはバッチ操作に使用できます。

プロトコル バッファーを逆シリアル化およびシリアル化する方法

Databricks Runtime 12.2 LTS 以降では、from_protobuf および to_protobuf 関数を使用してデータをシリアル化および逆シリアル化できます。 protobuf シリアル化は、ストリーミング ワークロードでよく使用されます。

protobuf 関数の基本的な構文は、読み取りと書き込みの関数に似ています。 これらの関数は、使用前にインポートする必要があります。

from_protobuf は、バイナリ列を構造体にキャストし、to_protobuf は構造体列をバイナリにキャストします。 options 引数で指定されるスキーマ レジストリ、または descFilePath 引数で特定される記述子ファイルのいずれかを指定する必要があります。

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

次の例は、from_protobuf() を使用してバイナリ protobuf レコードを処理し、to_protobuf() を使用 して Spark SQL 構造体をバイナリ protobuf に変換する方法を示しています。

Confluent スキーマ レジストリで protobuf を使用する

Azure Databricks では、Confluen スキーマ レジストリを使用して Protobuf を定義できます。

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

外部 Confluent スキーマ レジストリに対して認証する

外部 Confluent スキーマ レジストリに対して認証するには、スキーマ レジストリ オプションを更新して、認証資格情報と API キーを含めます。

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Unity Catalog ボリュームでトラストストアおよびキーストア ファイルを使用する

Databricks Runtime 14.3 LTS 以降では、Unity Catalog ボリュームのトラストストアおよびキーストア ファイルを使用して、Confluent スキーマ レジストリに対する認証を行うことができます。 次の例に従って、スキーマ レジストリ オプションを更新します。

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

記述子ファイルで Protobuf を使用する

コンピューティング クラスターで使用できる protobuf 記述子ファイルを参照することもできます。 その場所に応じた、ファイルを読み取る適切なアクセス許可があることを確認してください。

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Protobuf 関数でサポートされているオプション

Protobuf 関数では、次のオプションがサポートされます。

  • mode: Protobuf レコードの逆シリアル化中のエラーの処理方法を決定します。 エラーは、レコードの実際のスキーマと、from_protobuf() で提供される予想スキーマとの不一致など、形式に誤りがあるさまざまな種類のレコードによって発生する可能性があります。
    • :
      • FAILFAST (既定値): 形式に誤りがあるレコードが検出され、タスクが失敗すると、エラーがスローされます。
      • PERMISSIVE: 形式に誤りがあるレコードに対して NULL 値が返されます。 多くのレコードが削除される可能性があるため、このオプションは慎重に使用してください。 これは、誤りがあるレコードが、ソース内のごく一部である場合に便利です。
  • recursive.fields.max.depth: 再帰フィールドに対するサポートを追加します。 Spark SQL スキーマでは、再帰フィールドはサポートされていません。 このオプションを指定しないと、再帰フィールドは許可されません。 Protobufs で再帰フィールドをサポートするには、指定された深さまで展開する必要があります。
    • [値]:

      • -1 (既定値): 再帰フィールドは許可されません。

      • 0: 再帰フィールドは削除されます。

      • 1: 単一レベルの再帰が許可されます。

      • [2-10]: 複数の再帰のしきい値 (最大 10 レベル) を指定します。

        値を 0 より大きく設定すると、入れ子になったフィールドを構成された深さまで展開することで、再帰フィールドを使用できるようになります。 10 より大きい値は、誤って非常に大きなスキーマが作成されるのを回避するためには使用できません。 Protobuf メッセージの深さが構成された制限を超えている場合、返される Spark 構造体は再帰制限を超えると切り詰められます。

    • : 次の再帰フィールドを持つ Protobuf について考えてみましょう。

      message Person { string name = 1; Person friend = 2; }
      

      この設定でさまざまな値の終了スキーマを次に示します。

      • オプションを 1 に設定: STRUCT<name: STRING>
      • オプションを 2 に設定: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • オプションを 3 に設定: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: このオプションを使用すると、Protobuf の Any フィールドを JSON に変換できます。 この機能は慎重に有効にする必要があります。 JSON の変換と処理は非効率的です。 さらに、JSON 文字列フィールドでは Protobuf スキーマの安全性が失われるので、ダウンストリーム処理でエラーが発生しやすくなります。
    • [値]:

      • False (既定値): 実行時に、このようなワイルドカード フィールドに任意の Protobuf メッセージをバイナリ データとして含めることができます。 既定では、このようなフィールドは通常の Protobuf メッセージのように処理されます。 これには、スキーマ (STRUCT<type_url: STRING, value: BINARY>) を含む 2 つのフィールドがあります。 既定では、バイナリの value フィールドは何も解釈されません。 ただし、バイナリ データは、実際に一部のアプリケーションで動作するのに便利ではない場合があります。
      • True: この値を True に設定すると、実行時に Any フィールドを JSON 文字列に変換できます。 このオプションを使用すると、バイナリが解析され、Protobuf メッセージが JSON 文字列に逆シリアル化されます。
    • : 次のように定義されている 2 つの Protobuf 型を考えてみましょう。

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      このオプションを有効にすると、from_protobuf("col", messageName ="ProtoWithAny") のスキーマは STRUCT<event_name: STRING, details: STRING> になります。

      実行時に、details フィールドに Person Protobuf メッセージが含まれている場合、戻り値は ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}') のようになります。

    • 要件:

      • Any フィールドで使用されるすべての Protobuf 型の定義が、from_protobuf() に渡される Protobuf 記述子ファイルで使用できる必要があります。
      • Any の Protobuf が見つからない場合、そのレコードに対してエラーが発生します。
      • 現在、この機能はスキーマ レジストリではサポートされていません。
  • emit.default.values: Protobuf を Spark 構造体に逆シリアル化するときに、0 の値を持つフィールドをレンダリングできます。 このオプションは慎重に使用してください。 通常、セマンティクスのこのような細かい違いに依存することはお勧めしません。
      • False (既定値): シリアル化された Protobuf 内のフィールドが空の場合、Spark 構造体の結果のフィールドは既定では null 値になります。 このオプションを有効にせず、null を既定値として扱う方が簡単です。
      • True: このオプションを有効にすると、該当するフィールドに対応する既定値が入力されます。
    • : Person(age=0, middle_name="") のように Protobuf が構築された、次の Protobuf を考えてみましょう。

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • このオプションを False に設定すると、from_protobuf() を呼び出した後の Spark 構造体は、次のようにすべて null 値になります: {"name": null, "age": null, "middle_name": "", "salary": null}。 2 つのフィールド (agemiddle_name) に値が設定されていても、それらは既定値であるため、ワイヤ形式の Protobuf には含まれません。
      • このオプションを True に設定すると、from_protobuf() を呼び出した後の Spark 構造体は、次のようになります: {"name": "", "age": 0, "middle_name": "", "salary": null}salary フィールドは、明示的に optional を宣言しており、入力レコードに設定されていないため、null 値のままです。
  • enums.as.ints: 有効にすると、Protobuf の列挙型フィールドは Spark の整数フィールドとしてレンダリングされます。
      • False (既定値)
      • True: 有効にすると、Protobuf の列挙型フィールドは Spark の整数フィールドとしてレンダリングされます。
    • : 次のような Protobuf を考えてみましょう。

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Person(job = ENGINEER) のような Protobuf メッセージが指定されています。

      • このオプションを無効にすると、対応する Spark 構造体は {"job": "ENGINEER"} になります。
      • このオプションを有効にすると、対応する Spark 構造体は {"job": 1} になります。

      これらのフィールドのスキーマが、それぞれのケースで異なっている (既定の文字列ではなく整数になっている) ことに注目してください。 このような変更は、ダウンストリーム テーブルのスキーマに影響する可能性があります。

スキーマ レジストリのオプション

Protobuf 関数でスキーマ レジストリを使用するときは、次のスキーマ レジストリのオプションが関連します。

  • schema.registry.subject
    • 必須
    • “client-event” など、スキーマ レジストリ内のスキーマのサブジェクトを指定します
  • schema.registry.address
    • 必須
    • https://schema-registry.example.com:8081 などのスキーマ レジストリの URL
  • schema.registry.protobuf.name
    • 省略可能
    • 既定値: <NONE>
    • サブジェクトのスキーマ レジストリ エントリには、1 つの proto ファイルと同様に、複数の Protobuf 定義を含めることができます。 このオプションを指定しない場合、最初の Protobuf がスキーマに使用されます。 エントリの最初のメッセージでない場合は、Protobuf メッセージの名前を指定します。 たとえば、2 つの Protobuf 定義 (“Person” と “Location” の順に) を持つエントリを考えてみましょう。 ストリームが “Person” ではなく “Location” に対応する場合は、このオプションを “Location” (またはパッケージ “com.example.protos.Location” を含む完全名) に設定します。
  • schema.registry.schema.evolution.mode
    • 既定値: “restart”。
    • サポートされているモード:
      • “restart”
      • “none”
    • このオプションは、from_protobuf() のスキーマ進化モードを設定します。 クエリの開始時に、Spark は指定のサブジェクトの最新のスキーマ ID を記録します。 これにより、from_protobuf() のスキーマが決まります。 クエリの開始後に、新しいスキーマがスキーマ レジストリに発行される場合があります。 受信レコードで新しいスキーマ ID が認識された場合、それはスキーマの変更を示します。 このオプションで、スキーマに対するこのような変更の処理方法が決定されます。
      • restart (既定値): 新しいスキーマ ID が認識されたときに UnknownFieldException をトリガーします。 これにより、クエリが終了します。 Databricks では、スキーマの変更を取得するために、クエリの失敗時に再開するようにワークフローを構成することをお勧めします。
      • none: スキーマ ID の変更は無視されます。 新しいスキーマ ID のレコードは、クエリの開始時に観察されたのと同じスキーマで解析されます。 新しい Protobuf 定義は下位互換性があるものと予想され、新しいフィールドは無視されます。
  • confluent.schema.registry.<schema-registy-client-option>
    • 省略可能
    • スキーマ レジストリは、Confluent のスキーマ レジストリ クライアントを使用して Confluent のスキーマ レジストリに接続します。 クライアントでサポートされている構成オプションは、プレフィックス “confluent.schema.registry” で指定できます。 たとえば、次の 2 つの設定では、“USER_INFO” 認証資格情報が提供されます。
      • “confluent.schema.registry.basic.auth.credentials.source”: ‘USER_INFO’
      • “confluent.schema.registry.basic.auth.user.info”: “<KEY> : <SECRET>