Olvasási és írási protokollpufferek

Az Azure Databricks natív támogatást nyújt az Apache Spark-szerkezetek és a protokollpufferek (protobuf) közötti szerializáláshoz és deszerializáláshoz. A Protobuf-támogatás Apache Spark DataFrame-transzformátorként van implementálva, és strukturált streameléssel vagy kötegelt műveletekhez használható.

Protokollpufferek deszerializálása és szerializálása

A Databricks Runtime 12.2 LTS és újabb verziókban az adatok szerializálására és deszerializálására és deszerializálására használható from_protobuf és to_protobuf használható függvények. A Protobuf szerializálást gyakran használják streamelési számítási feladatokban.

A protobuf függvények alapszintaxisa hasonló az olvasási és írási függvényekhez. Használat előtt importálnia kell ezeket a függvényeket.

from_protobuf bináris oszlopot ad át egy szerkezetbe, és to_protobuf egy struktúraoszlopot binárisra vet. Meg kell adnia egy sémaregisztrációs adatbázist, amely az options argumentummal van megadva, vagy egy leírófájlt, amelyet az descFilePath argumentum azonosít.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Az alábbi példák a bináris protobuf rekordok from_protobuf() feldolgozását szemléltetik a Spark SQL-struktúra bináris protobuftá alakításával to_protobuf().

A protobuf használata a Confluent-sémaregisztrációs adatbázissal

Az Azure Databricks támogatja a Confluent sémaregisztrációs adatbázisának használatát a Protobuf definiálásához.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Hitelesítés külső Confluent-sémaregisztrációs adatbázisba

Ha külső Confluent-sémaregisztrációs adatbázisba szeretne hitelesítést végezni, frissítse a sémaregisztrációs beállításokat úgy, hogy azok tartalmazzák a hitelesítési hitelesítő adatokat és az API-kulcsokat.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Truststore- és keystore-fájlok használata Unity Catalog-kötetekben

A Databricks Runtime 14.3 LTS-ben és újabb verziókban a Unity Catalog-kötetekben található truststore- és keystore-fájlokat használhatja a Confluent-sémaregisztrációs adatbázisba való hitelesítéshez. Frissítse a sémaregisztrációs adatbázis beállításait az alábbi példának megfelelően:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

A Protobuf használata leíró fájllal

Hivatkozhat egy protobuf leírófájlra is, amely elérhető a számítási fürt számára. Győződjön meg arról, hogy megfelelő engedélyekkel rendelkezik a fájl olvasásához a helyétől függően.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

A Protobuf függvények támogatott beállításai

A Protobuf függvények az alábbi lehetőségeket támogatják.

  • mód: Meghatározza a Protobuf-rekordok deszerializálása során felmerülő hibák kezelését. A hibákat különböző típusú helytelen formátumú rekordok okozhatják, beleértve a rekord tényleges sémája és a várt séma from_protobuf()közötti eltérést.
    • Értékek:
      • FAILFAST(alapértelmezett): Hibásan formázott rekord esetén a rendszer hibát jelez, és a feladat meghiúsul.
      • PERMISSIVE: A rendszer null értéket ad vissza a hibásan formázott rekordokhoz. Ezt a lehetőséget körültekintően használja, mivel sok rekord elvetése is lehet. Ez akkor hasznos, ha a forrás rekordjainak egy kis része helytelen.
  • recursive.fields.max.depth: Támogatja a rekurzív mezőket. A Spark SQL-sémák nem támogatják a rekurzív mezőket. Ha ez a beállítás nincs megadva, a rekurzív mezők nem engedélyezettek. A Protobufs rekurzív mezőinek támogatásához a mezőket egy megadott mélységre kell bővíteni.
    • Értékek:

      • -1 (alapértelmezett): A rekurzív mezők nem engedélyezettek.

      • 0: A rekurzív mezők elvetve lesznek.

      • 1: Lehetővé teszi a rekurzió egyetlen szintjét.

      • [2–10]: Adjon meg egy küszöbértéket több rekurzióhoz, legfeljebb 10 szintig.

        Ha egy értéket 0-nál nagyobb értékre állít be, a rekurzív mezőket a beágyazott mezők konfigurált mélységre való kibontásával teszi lehetővé. A 10-nél nagyobb értékek nem engedélyezettek a nagyon nagy sémák véletlen létrehozása érdekében. Ha egy Protobuf-üzenet mélysége meghaladja a konfigurált korlátot, a visszaadott Spark-szerkezet csonkolva lesz a rekurziós korlát után.

    • Példa: Fontolja meg a Következő rekurzív mezővel rendelkező Protobuf-t:

      message Person { string name = 1; Person friend = 2; }
      

      Az alábbi lista a beállításhoz tartozó különböző értékekkel rendelkező záró sémát sorolja fel:

      • A beállítás értéke 1: STRUCT<name: STRING>
      • A beállítás értéke 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • A beállítás értéke 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: Ez a beállítás lehetővé teszi a Protobuf Any mezők JSON-ra konvertálását. Ezt a funkciót gondosan engedélyezni kell. A JSON-átalakítás és -feldolgozás nem hatékony. Emellett a JSON-sztring mező elveszíti a Protobuf sémabiztonságot, így a lefelé irányuló feldolgozás hajlamos a hibákra.
    • Értékek:

      • Hamis (alapértelmezett): Futásidőben az ilyen helyettesítő karakterek tetszőleges Protobuf-üzeneteket tartalmazhatnak bináris adatként. Az ilyen mezők alapértelmezés szerint normál Protobuf-üzenetként vannak kezelve. Két, sémával (STRUCT<type_url: STRING, value: BINARY>)rendelkező mezővel rendelkezik. Alapértelmezés szerint a bináris value mező semmilyen módon nem értelmezhető. Előfordulhat azonban, hogy a bináris adatok a gyakorlatban nem megfelelőek egyes alkalmazásokban való munkavégzéshez.
      • Igaz: Az érték Igaz értékre állítása lehetővé teszi a mezők JSON-sztringekké való konvertálását Any futásidőben. Ezzel a beállítással a bináris elem elemzése és a Protobuf-üzenet deszerializálása JSON-sztringgé történik.
    • Példa: Fontolja meg két Protobuf-típust a következőképpen:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Ha ez a beállítás engedélyezve van, a séma from_protobuf("col", messageName ="ProtoWithAny") a következő lesz: STRUCT<event_name: STRING, details: STRING>.

      Futtatáskor, ha details a mező Protobuf-üzenetet tartalmaz Person , a visszaadott érték a következőképpen néz ki: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Követelmények:

      • A mezőkben Any használt összes lehetséges Protobuf-típus definícióinak elérhetőnek kell lenniük a protobuf leíró fájlban, amelyet a program átad from_protobuf().
      • Ha Any a Protobuf nem található, az hibát fog eredményezni a rekordban.
      • A sémaregisztrációs adatbázis jelenleg nem támogatja ezt a funkciót.
  • emit.default.values: A Protobuf Spark-szerkezetbe való deszerializálásakor engedélyezi a nulla értékű renderelési mezők megjelenítését. Ezt a lehetőséget takarékosan kell használni. Általában nem ajánlott a szemantika ilyen finomabb különbségeitől függeni.
    • Értékek

      • Hamis (alapértelmezett): Ha egy mező üres a szerializált Protobufban, a Spark-szerkezet eredményül kapott mezője alapértelmezés szerint null. Egyszerűbb, ha nem engedélyezi ezt a beállítást, és alapértelmezett értékként kezeli null .
      • Igaz: Ha ez a beállítás engedélyezve van, az ilyen mezőket a megfelelő alapértelmezett értékek töltik ki.
    • Példa: Fontolja meg a következő Protobuf-t a Protobuf felépítése alapján:Person(age=0, middle_name="")

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Ha ez a beállítás False (Hamis) értékre van állítva, a Hívás from_protobuf() utáni Spark-szerkezet minden null értékű lesz: {"name": null, "age": null, "middle_name": "", "salary": null}. Annak ellenére, hogy két mező (age és middle_name) rendelkezik értékekkel, a Protobuf nem tartalmazza őket vezetékes formátumban, mivel ezek alapértelmezett értékek.
      • Ha ez a beállítás Igaz értékre van állítva, a Hívás from_protobuf() utáni Spark-struktúra a következő lesz: {"name": "", "age": 0, "middle_name": "", "salary": null}. A salary mező null értékű marad, mivel explicit módon deklarálva optional van, és nincs beállítva a bemeneti rekordban.
  • enums.as.ints: Ha engedélyezve van, a Protobuf enum mezői egész számként jelennek meg a Sparkban.
    • Értékek

      • False (alapértelmezett)
      • Igaz: Ha engedélyezve van, a Protobuf enum mezői egész számként jelennek meg a Sparkban.
    • Példa: Fontolja meg a következő Protobufot:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Protobuf-üzenet, például Person(job = ENGINEER):

      • Ha ez a beállítás le van tiltva, a megfelelő Spark-struktúra az lenne {"job": "ENGINEER"}.
      • Ha ez a beállítás engedélyezve van, a megfelelő Spark-struktúra lesz {"job": 1}.

      Figyelje meg, hogy ezeknek a mezőknek a sémája minden esetben eltérő (az alapértelmezett sztring helyett egész szám). Ez a változás hatással lehet az alsóbb rétegbeli táblák sémájára.

Sémaregisztrációs beállításjegyzék beállításai

A sémaregisztrációs adatbázis Protobuf-függvényekkel való használata során az alábbi sémaregisztrációs beállítások relevánsak.

  • schema.registry.subject
    • Kötelező
    • A séma tárgyát adja meg a sémaregisztrációs adatbázisban, például az "ügyféleseményt"
  • schema.registry.address
    • Kötelező
    • A sémaregisztrációs adatbázis URL-címe, például https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • Választható
    • Alapértelmezett: <NONE>.
    • Egy tárgy sémaregisztrációs bejegyzése több Protobuf-definíciót is tartalmazhat, csakúgy, mint egyetlen proto fájlt. Ha ez a beállítás nincs megadva, a rendszer az első Protobufot használja a sémához. Adja meg a Protobuf-üzenet nevét, ha nem ez az első a bejegyzésben. Vegyünk például egy bejegyzést két Protobuf-definícióval: "Személy" és "Hely" ebben a sorrendben. Ha a stream nem "Személy", hanem "Hely" értéknek felel meg, állítsa ezt a beállítást "Hely" értékre (vagy annak teljes nevére, beleértve a "com.example.protos.Location" csomagot is).
  • schema.registry.schema.evolution.mode
    • Alapértelmezett: "újraindítás".
    • Támogatott módok:
      • "újraindítás"
      • "nincs"
    • Ez a beállítás a sémafejlődési módot állítja be a következőhöz from_protobuf(): . A lekérdezés elején a Spark rögzíti az adott tárgy legújabb sémaazonosítóját. Ez határozza meg a sémát a következőhöz from_protobuf(): . Előfordulhat, hogy a lekérdezés elindítása után egy új séma lesz közzétéve a sémaregisztrációs adatbázisban. Ha egy újabb sémaazonosítót észlel egy bejövő rekordban, az a séma módosítását jelzi. Ez a beállítás határozza meg a séma ilyen módosításának kezelését:
      • újraindítás (alapértelmezett): Egy újabb sémaazonosító észlelésekor aktiválja UnknownFieldException az eseményt. Ezzel leállítja a lekérdezést. A Databricks azt javasolja, hogy konfigurálja a munkafolyamatokat, hogy a sémamódosítások felvételéhez lekérdezési hiba esetén újrainduljanak.
      • nincs: A sémaazonosító módosításait a rendszer figyelmen kívül hagyja. Az újabb sémaazonosítóval rendelkező rekordok a lekérdezés elején megfigyelt sémával vannak elemezve. Az újabb Protobuf-definíciók várhatóan visszamenőlegesen kompatibilisek lesznek, és az új mezők figyelmen kívül lesznek hagyva.
  • confluent.schema.registry.<schema-registy-client-option>
    • Választható
    • A sémaregisztrációs adatbázis a Confluent sémaregisztrációs ügyféllel csatlakozik a Confluent sémaregisztrációs adatbázisához. Az ügyfél által támogatott konfigurációs beállítások a "confluent.schema.registry" előtaggal adhatók meg. A következő két beállítás például az "U Standard kiadás R_INFO" hitelesítési hitelesítő adatokat adja meg:
      • "confluent.schema.registry.basic.auth.credentials.source": 'U Standard kiadás R_INFO'
      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"