Lesen und Schreiben von Protokollpuffern

Azure Databricks bietet native Unterstützung für die Serialisierung und Deserialisierung zwischen Apache Spark-Strukturen und Protokollpuffern (protobuf). Die Unterstützung von Protokollpuffern ist als Apache Spark-DataFrame-Transformator implementiert und kann mit strukturiertem Streaming oder für Batchvorgänge verwendet werden.

Deserialisieren und Serialisieren von Protokollpuffern

In Databricks Runtime 12.2 LTS und höher können Sie Funktionen vom Typ from_protobuf und to_protobuf verwenden, um Daten zu serialisieren und zu deserialisieren. Die Serialisierung von Protokollpuffern wird häufig in Streamingworkloads verwendet.

Die grundlegende Syntax für Protokollpufferfunktionen ähnelt der Syntax für Lese- und Schreibfunktionen. Diese Funktionen müssen vor der Verwendung importiert werden.

from_protobuf wandelt eine binäre Spalte in eine Strukturspalte um. to_protobuf wandelt eine Strukturspalte in eine binäre Spalte um. Sie müssen entweder eine Schemaregistrierung mit dem Argument options angeben oder eine durch das Argument descFilePath identifizierte Deskriptordatei.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Die folgenden Beispiele veranschaulichen die Verarbeitung binärer Protokollpuffer-Datensätze mit from_protobuf() und die Konvertierung der Spark SQL-Struktur in einen binären Protokollpuffer mit to_protobuf().

Verwenden von Protokollpuffern mit der Schemaregistrierung von Confluent

Azure Databricks unterstützt die Verwendung der Schemaregistrierung von Confluent zum Definieren von Protokollpuffern.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Authentifizieren bei einer externen Schemaregistrierung von Confluent

Aktualisieren Sie für die Authentifizierung bei einer externen Schemaregistrierung von Confluent Ihre Schemaregistrierungsoptionen so, dass sie Authentifizierungsanmeldeinformationen und API-Schlüssel enthalten.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Verwenden von Truststore- und Keystoredateien in Unity-Katalogvolumes

In Databricks Runtime 14.3 LTS und höher können Sie Truststore- und Keystoredateien in Unity-Katalogvolumes verwenden, um sich bei einer Confluent-Schemaregistrierung zu authentifizieren. Aktualisieren Sie die Schemaregistrierungsoptionen gemäß dem folgenden Beispiel:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Verwenden von Protokollpuffern mit einer Deskriptordatei

Sie können auch auf eine Protokollpuffer-Deskriptordatei verweisen, die für Ihren Computecluster verfügbar ist. Stellen Sie sicher, dass Sie je nach Speicherort über die richtigen Berechtigungen zum Lesen der Datei verfügen.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Unterstützte Optionen in Protobuf-Funktionen

Die folgenden Optionen werden in Protobuf-Funktionen unterstützt.

  • mode: Bestimmt, wie Fehler beim Deserialisieren von Protobuf-Datensätzen behandelt werden. Die Fehler können durch verschiedene Typen falsch formatierter Datensätze verursacht werden, einschließlich eines Konflikts zwischen dem tatsächlichen Schema des Datensatzes und dem erwarteten Schema, das in from_protobuf() bereitgestellt wird.
    • Werte:
      • FAILFAST (Standard): Ein Fehler wird ausgelöst, wenn ein nicht wohlgeformter Datensatz gefunden wird, und die Aufgabe fehlschlägt.
      • PERMISSIVE: Für nicht wohlgeformte Datensätze wird ein NULL-Wert zurückgegeben. Verwenden Sie diese Option sorgfältig, da sie dazu führen kann, dass viele Datensätze gelöscht werden. Sie ist hingegen nützlich, wenn nur ein Bruchteil der Datensätze in der Quelle falsch ist.
  • rekursive.fields.max.depth: Fügt Unterstützung für rekursive Felder hinzu. Spark SQL-Schemas unterstützen keine rekursiven Felder. Wenn für diese Option nichts angegeben ist, sind rekursive Felder nicht zulässig. Um rekursive Felder in Protobufs zu unterstützen, müssen sie auf eine angegebene Tiefe erweitert werden.
    • Werte:

      • -1 (Standard): Rekursive Felder sind nicht zulässig.

      • 0: Rekursive Felder werden gelöscht.

      • 1: Eine einzelne Rekursionsebene ist zulässig.

      • [2-10]: Gibt einen Schwellenwert für mehrere Rekursionen an (bis zu 10 Ebenen).

        Wird der Wert auf größer Null festgelegt, sind rekursive Felder zulässig, und die geschachtelten Felder werden auf die konfigurierte Tiefe erweitert. Werte größer als 10 sind nicht zulässig, das versehentliche Erstellen sehr großer Schemas zu vermeiden. Wenn eine Protobuf-Nachricht eine Tiefe aufweist, die die konfigurierte Grenze überschreitet, wird die zurückgegebene Spark-Struktur nach dem Rekursionslimit abgeschnitten.

    • Beispiel: Stellen Sie sich ein Protobuf mit dem folgenden rekursiven Feld vor:

      message Person { string name = 1; Person friend = 2; }
      

      Im Folgenden wird das Endschema mit unterschiedlichen Werten für diese Einstellung aufgeführt:

      • Option auf 1 festgelegt: STRUCT<name: STRING>
      • Option auf 2 festgelegt: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • Option auf 3 festgelegt: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: Mithilfe dieser Option werden Any-Fields in Protobuf in JSON konvertiert. Sie sollten diese Funktion mit Bedacht aktivieren. JSON-Konvertierung und -Verarbeitung sind ineffizient. Darüber hinaus verliert das JSON-Zeichenfolgenfeld die Sicherheit des Protobuf-Schemas, wodurch die nachgelagerte Verarbeitung fehleranfällig ist.
    • Werte:

      • False (Standard): Zur Laufzeit können solche Platzhalterfelder beliebige Protobuf-Nachrichten als Binärdaten enthalten. Standardmäßig werden solche Felder wie eine normale Protobuf-Nachricht behandelt. Sie verfügt über zwei Felder mit dem Schema(STRUCT<type_url: STRING, value: BINARY>). Standardmäßig wird das binäre value-Feld nicht interpretiert. Die Binärdaten sind jedoch in der Praxis für manche Anwendungen unpraktisch.
      • True: Durch Festlegen dieses Werts auf „True“ werden Any-Felder zur Laufzeit in JSON-Zeichenfolgen konvertiert. Bei dieser Option wird die Binärdatei geparst, und die Protobuf-Nachricht wird in eine JSON-Zeichenfolge deserialisiert.
    • Beispiel: Stellen Sie sich zwei Protobuf-Typen, die folgendermaßen definiert sind:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Wenn diese Option aktiviert ist, wäre das Schema für from_protobuf("col", messageName ="ProtoWithAny"): STRUCT<event_name: STRING, details: STRING>.

      Wenn das details-Feld zur Laufzeit die Protobuf-Nachricht Person enthält, sieht der zurückgegebene Wert folgendermaßen aus: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Anforderungen:

      • Die Definitionen für alle möglichen Protobuf-Typen, die in Any-Feldern verwendet werden, sollten in der Protobuf-Beschreibungsdatei verfügbar sein, die an from_protobuf() übergeben wird.
      • Wenn Any-Protobuf nicht gefunden wird, führt das für diesen Datensatz zu einem Fehler.
      • Dieses Feature wird derzeit in der Schemaregistrierung nicht unterstützt.
  • emit.default.values: Ermöglicht das Rendern von Feldern mit Nullwerten beim Deserialisieren von Protobuf in eine Spark-Struktur. Diese Option sollte nur selten zum Einsatz kommen. Es ist in der Regel nicht ratsam, von solchen feineren Unterschieden in der Semantik abhängig zu sein.
    • Werte

      • False (Standard): Wenn ein Feld im serialisierten Protobuf leer ist, ist das resultierende Feld in der Spark-Struktur standardmäßig Null. Es ist einfacher, diese Option nicht zu aktivieren und null als Standardwert zu behandeln.
      • True: Wenn diese Option aktiviert ist, werden solche Felder mit entsprechenden Standardwerten gefüllt.
    • Beispiel: Betrachten Sie den folgenden Protobuf, ähnlich wie Person(age=0, middle_name="") konstruiert wurde:

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Wenn diese Option auf „False“ festgelegt ist, besteht die Spark-Struktur nach dem Aufrufen von from_protobuf() nur aus Nullwerten: {"name": null, "age": null, "middle_name": "", "salary": null}. Obwohl für zwei Felder (age und middle_name) Werte festgelegt wurden, enthält Protobuf sie nicht im Wire-Format, da sie Standardwerte sind.
      • Wenn diese Option auf „True“ festgelegt ist, würde die Spark-Struktur nach dem Aufrufen von from_protobuf() folgendermaßen aussehen: {"name": "", "age": 0, "middle_name": "", "salary": null}. Das Feld salary bleibt behält den Wert Null, da es explizit als optional deklariert und nicht im Eingabedatensatz festgelegt wird.
  • enumerations.as.ints: Wenn diese Option aktiviert ist, werden Enumerationsfelder in Protobuf in Spark als ganzzahlige Felder gerendert.
    • Werte

      • False (Standardwert)
      • True: Wenn diese Option aktiviert ist, werden Enumerationsfelder in Protobuf in Spark als ganzzahlige Felder gerendert.
    • Beispiel: Sehen Sie sich den folgenden Protobuf an:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Angenommen eine Protobuf-Nachricht würde Person(job = ENGINEER) lauten:

      • Wäre diese Option deaktiviert, wäre die entsprechende Spark-Struktur {"job": "ENGINEER"}.
      • Wäre diese Option aktiviert, wäre die entsprechende Spark-Struktur {"job": 1}.

      Beachten Sie, dass das Schema für diese Felder in jedem Fall unterschiedlich ist (ganze Zahl statt Standardzeichenfolge). Eine solche Änderung kann sich auf das Schema der nachgelagerten Tabellen auswirken.

Optionen für die Schemaregistrierung

Die folgenden Optionen für die Schemaregistrierung sind relevant, wenn Sie die Schemaregistrierung mit Protobuf-Funktionen verwenden.

  • schema.registry.subject
    • Erforderlich
    • Gibt den Betreff für das Schema in der Schemaregistrierung an, z. B. „client-event“.
  • schema.registry.address
    • Erforderlich
    • Die URL für die Schemaregistrierung, z. B. https://schema-registry.example.com:8081.
  • schema.registry.protobuf.name
    • Optional
    • Standardwert: <NONE>.
    • Ein Schemaregistrierungseintrag für einen Betreff kann mehrere Protobuf-Definitionen enthalten, genau wie eine einzelne proto-Datei. Wenn für diese Option nichts angegeben ist, wird der erste Protobuf für das Schema verwendet. Geben Sie den Namen der Protobuf-Nachricht an, wenn sie nicht die erste in dem Eintrag ist. Stellen Sie sich beispielsweise einen Eintrag mit zwei Protobuf-Definitionen in folgender Reihenfolge vor: „Person“ und „Ort“. Wenn der Datenstrom „Ort“ und nicht „Person“ entspricht, legen Sie diese Option auf „Ort“ fest (oder den vollständigen Namen einschließlich des Pakets „com.example.protos.Location“).
  • schema.registry.schema.evolution.mode
    • Standard: „restart“.
    • Unterstützte Modi:
      • „restart“
      • „none“
    • Diese Option legt den Schemaentwicklungsmodus für from_protobuf() fest. Zu Beginn einer Abfrage zeichnet Spark die aktuellste Schema-ID für den entsprechenden Betreff auf. Dadurch wird das Schema für from_protobuf() bestimmt. Nach dem Starten der Abfrage wird möglicherweise ein neues Schema in der Schemaregistrierung veröffentlicht. Wenn in einem eingehenden Datensatz eine aktuellere Schema-ID erkannt wird, weist er das Schema auf eine Änderung hin. Diese Option bestimmt, wie eine solche Änderung des Schemas behandelt wird:
      • restart (Standard): Löst eine UnknownFieldException aus, wenn eine aktuellere Schema-ID erkannt wird. Dadurch wird die Abfrage beendet. Databricks empfiehlt das Konfigurieren von Workflows für den Neustart bei Abfragefehlern beim Aufnehmen von Schemaänderungen.
      • none: Schema-ID-Änderungen werden ignoriert. Die Datensätze mit aktuellerer Schema-ID werden mit demselben Schema geparst, das am Anfang der Abfrage beobachtet wurde. Aktuellere Protobuf-Definitionen werden als abwärtskompatibel erwartet, und neue Felder werden ignoriert.
  • confluent.schema.registry.<schema-registy-client-option>
    • Optional
    • Die Schemaregistrierung stellt mithilfe des Confluent-Schemaregistrierungsclients eine Verbindung zur Confluent-Schemaregistrierung her. Alle vom Client unterstützten Konfigurationsoptionen können mit dem Präfix „confluent.schema.registry“ angegeben werden. Die folgenden beiden Einstellungen stellen beispielsweise „USER_INFO“-Anmeldeinformationen für die Authentifizierung bereit:
      • „confluent.schema.registry.basic.auth.credentials.source“: ‘USER_INFO’
      • „confluent.schema.registry.basic.auth.user.info“: „<KEY> : <SECRET>