Bagikan melalui


Buffer protokol baca dan tulis

Azure Databricks memberikan dukungan asli untuk serialisasi dan deserialisasi antara struktur Apache Spark dan buffer protokol (protobuf). Dukungan Protobuf diimplementasikan sebagai transformator Apache Spark DataFrame dan dapat digunakan dengan Streaming Terstruktur atau untuk operasi batch.

Cara mendeserialisasi dan menserialisasi buffer protokol

Dalam Databricks Runtime 12.2 LTS ke atas, Anda dapat menggunakan from_protobuf fungsi dan to_protobuf untuk menserialisasikan dan mendeserialisasi data. Serialisasi Protobuf umumnya digunakan dalam beban kerja streaming.

Sintaks dasar untuk fungsi protobuf mirip untuk fungsi baca dan tulis. Anda harus mengimpor fungsi ini sebelum menggunakan.

from_protobuf melemparkan kolom biner ke struct, dan to_protobuf mentransmisikan kolom struct ke biner. Anda harus menyediakan registri skema yang ditentukan dengan options argumen atau file deskriptor yang diidentifikasi oleh descFilePath argumen.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Contoh berikut mengilustrasikan pemrosesan rekaman protobuf biner dengan from_protobuf() dan mengonversi struct Spark SQL ke protobuf biner dengan to_protobuf().

Menggunakan protobuf dengan Confluent Schema Registry

Azure Databricks mendukung penggunaan Confluent Schema Registry untuk menentukan Protobuf.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Mengautentikasi ke Registri Skema Confluent eksternal

Untuk mengautentikasi ke Registri Skema Confluent eksternal, perbarui opsi registri skema Anda untuk menyertakan kredensial autentikasi dan kunci API.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Menggunakan file truststore dan keystore dalam volume Katalog Unity

Di Databricks Runtime 14.3 LTS ke atas, Anda dapat menggunakan file truststore dan keystore dalam volume Unity Catalog untuk mengautentikasi ke Confluent Schema Registry. Perbarui opsi registri skema Anda sesuai dengan contoh berikut:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Menggunakan Protobuf dengan file deskriptor

Anda juga dapat mereferensikan file deskriptor protobuf yang tersedia untuk kluster komputasi Anda. Pastikan Anda memiliki izin yang tepat untuk membaca file, bergantung pada lokasinya.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Opsi yang didukung dalam fungsi Protobuf

Opsi berikut didukung dalam fungsi Protobuf.

  • mode: Menentukan bagaimana kesalahan saat mendeserialisasi rekaman Protobuf ditangani. Kesalahan mungkin disebabkan oleh berbagai jenis rekaman cacat termasuk ketidakcocokan antara skema aktual rekaman dan skema yang diharapkan yang disediakan dalam from_protobuf().
    • Nilai:
      • FAILFAST(default): Kesalahan dilemparkan ketika rekaman cacat ditemui dan tugas gagal.
      • PERMISSIVE: NULL dikembalikan untuk rekaman cacat. Gunakan opsi ini dengan hati-hati karena dapat mengakibatkan penurunan banyak rekaman. Ini berguna ketika sebagian kecil rekaman di sumber salah.
  • recursive.fields.max.depth: Menambahkan dukungan untuk bidang rekursif. Skema Spark SQL tidak mendukung bidang rekursif. Ketika opsi ini tidak ditentukan, bidang rekursif tidak diizinkan. Untuk mendukung bidang rekursif di Protobufs, bidang tersebut perlu diperluas ke kedalaman tertentu.
    • Nilai:

      • -1 (default): Bidang rekursif tidak diperbolehkan.

      • 0: Bidang rekursif dihilangkan.

      • 1: Memungkinkan satu tingkat rekursi.

      • [2-10]: Tentukan ambang batas untuk beberapa rekursi, hingga 10 tingkat.

        Mengatur nilai ke lebih dari 0 memungkinkan bidang rekursif dengan memperluas bidang berlapis ke kedalaman yang dikonfigurasi. Nilai yang lebih besar dari 10 tidak diizinkan untuk menghindari pembuatan skema yang sangat besar secara tidak sengaja. Jika pesan Protobuf memiliki kedalaman di luar batas yang dikonfigurasi, struct Spark yang dikembalikan dipotong setelah batas rekursi.

    • Contoh: Pertimbangkan Protobuf dengan bidang rekursif berikut:

      message Person { string name = 1; Person friend = 2; }
      

      Berikut ini mencantumkan skema akhir dengan nilai yang berbeda untuk pengaturan ini:

      • Opsi diatur ke 1: STRUCT<name: STRING>
      • Opsi diatur ke 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • Opsi diatur ke 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: Opsi ini memungkinkan konversi bidang Protobuf Any ke JSON. Fitur ini harus diaktifkan dengan hati-hati. Konversi dan pemrosesan JSON tidak efisien. Selain itu, bidang string JSON kehilangan keamanan skema Protobuf yang membuat pemrosesan hilir rentan terhadap kesalahan.
    • Nilai:

      • False (default): Saat runtime, bidang wildcard tersebut dapat berisi pesan Protobuf arbitrer sebagai data biner. Secara default, bidang tersebut ditangani seperti pesan Protobuf normal. Ini memiliki dua bidang dengan skema (STRUCT<type_url: STRING, value: BINARY>). Secara default, bidang biner value tidak ditafsirkan dengan cara apa pun. Tetapi data biner mungkin tidak nyaman dalam praktiknya untuk bekerja di beberapa aplikasi.
      • True: Mengatur nilai ini ke True memungkinkan konversi Any bidang ke string JSON pada runtime. Dengan opsi ini, biner diurai dan pesan Protobuf dideserialisasi ke dalam string JSON.
    • Contoh: Pertimbangkan dua jenis Protobuf yang didefinisikan sebagai berikut:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Dengan opsi ini diaktifkan, skema untuk from_protobuf("col", messageName ="ProtoWithAny") adalah: STRUCT<event_name: STRING, details: STRING>.

      Pada run time, jika details bidang berisi Person pesan Protobuf, nilai yang dikembalikan terlihat seperti ini: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Persyaratan:

      • Definisi untuk semua kemungkinan jenis Protobuf yang digunakan dalam Any bidang harus tersedia dalam file deskriptor Protobuf yang diteruskan ke from_protobuf().
      • Jika Any Protobuf tidak ditemukan, protobuf akan mengakibatkan kesalahan untuk rekaman tersebut.
      • Fitur ini saat ini tidak didukung dengan skema-registri.
  • emit.default.values: Mengaktifkan bidang penyajian dengan nilai nol saat mendeserialisasi Protobuf ke struct Spark. Opsi ini harus digunakan dengan hemat. Biasanya tidak disarankan untuk bergantung pada perbedaan semantik yang lebih halus.
    • Nilai

      • False (default): Saat bidang kosong di Protobuf yang diserialisasikan, bidang yang dihasilkan dalam struktur Spark adalah secara default null. Lebih mudah untuk tidak mengaktifkan opsi ini dan memperlakukan null sebagai nilai default.
      • True: Saat opsi ini diaktifkan, bidang tersebut diisi dengan nilai default yang sesuai.
    • Contoh: Pertimbangkan Protobuf berikut dengan Protobuf yang dibangun seperti Person(age=0, middle_name=""):

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Dengan opsi ini diatur ke False, struct Spark setelah panggilan from_protobuf() akan menjadi semua null: {"name": null, "age": null, "middle_name": "", "salary": null}. Meskipun dua bidang (age dan middle_name) memiliki nilai yang ditetapkan, Protobuf tidak menyertakannya dalam format kawat karena merupakan nilai default.
      • Dengan opsi ini diatur ke True, struct Spark setelah panggilan from_protobuf() adalah: {"name": "", "age": 0, "middle_name": "", "salary": null}. Bidang salary tetap null karena dideklarasikan secara eksplisit optional dan tidak diatur dalam catatan input.
  • enums.as.ints: Saat diaktifkan, bidang enum di Protobuf dirender sebagai bidang bilangan bulat di Spark.
    • Nilai

      • False (default)
      • True: Saat diaktifkan, bidang enum di Protobuf dirender sebagai bidang bilangan bulat di Spark.
    • Contoh: Pertimbangkan Protobuf berikut:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Mengingat pesan Protobuf seperti Person(job = ENGINEER):

      • Dengan opsi ini dinonaktifkan, struktur Spark yang sesuai adalah {"job": "ENGINEER"}.
      • Dengan opsi ini diaktifkan, struktur Spark yang sesuai adalah {"job": 1}.

      Perhatikan bahwa skema untuk bidang ini berbeda dalam setiap kasus (bilangan bulat daripada string default). Perubahan seperti itu dapat memengaruhi skema tabel hilir.

Opsi Registri Skema

Opsi registri skema berikut relevan saat menggunakan registri skema dengan fungsi Protobuf.

  • schema.registry.subject
    • Wajib
    • Menentukan subjek untuk skema di Schema Registry, seperti "client-event"
  • schema.registry.address
    • Wajib
    • URL untuk registri skema, seperti https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • Opsional
    • Default: <NONE>.
    • Entri skema-registri untuk subjek dapat berisi beberapa definisi Protobuf, sama seperti satu proto file. Ketika opsi ini tidak ditentukan, Protobuf pertama digunakan untuk skema. Tentukan nama pesan Protobuf ketika bukan yang pertama dalam entri. Misalnya, pertimbangkan entri dengan dua definisi Protobuf: "Orang" dan "Lokasi" dalam urutan tersebut. Jika aliran sesuai dengan "Lokasi" daripada "Orang", atur opsi ini ke "Lokasi" (atau nama lengkapnya termasuk paket "com.example.protos.Location").
  • schema.registry.schema.evolution.mode
    • Default: "hidupkan ulang".
    • Mode yang didukung:
      • "hidupkan ulang"
      • "tidak ada"
    • Opsi ini mengatur mode skema-evolusi untuk from_protobuf(). Di awal kueri, Spark merekam skema-id terbaru untuk subjek yang diberikan. Ini menentukan skema untuk from_protobuf(). Skema baru mungkin diterbitkan ke registri skema setelah kueri dimulai. Ketika skema-id yang lebih baru terlihat dalam rekaman masuk, itu menunjukkan perubahan pada skema. Opsi ini menentukan bagaimana perubahan pada skema tersebut ditangani:
      • hidupkan ulang (default): Memicu UnknownFieldException saat skema-id yang lebih baru diperhatikan. Ini mengakhiri kueri. Databricks merekomendasikan untuk mengonfigurasi alur kerja untuk memulai ulang kegagalan kueri untuk mengambil perubahan skema.
      • none: Perubahan skema-id diabaikan. Rekaman dengan skema-id yang lebih baru diurai dengan skema yang sama yang diamati di awal kueri. Definisi Protobuf yang lebih baru diharapkan kompatibel mundur, dan bidang baru diabaikan.
  • confluent.schema.registry.<schema-registy-client-option>
    • Opsional
    • Skema-registri terhubung ke registri skema Confluent menggunakan klien Confluent Schema Registry. Opsi konfigurasi apa pun yang didukung oleh klien dapat ditentukan dengan awalan "confluent.schema.registry". Misalnya, dua pengaturan berikut memberikan kredensial autentikasi "USER_INFO":
      • "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO'
      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"