Läs- och skrivprotokollbuffertar

Azure Databricks har inbyggt stöd för serialisering och deserialisering mellan Apache Spark-structs och protokollbuffertar (protobuf). Protobuf-stöd implementeras som en Apache Spark DataFrame-transformerare och kan användas med strukturerad direktuppspelning eller för batchåtgärder.

Så här deserialiserar och serialiserar du protokollbuffertar

I Databricks Runtime 12.2 LTS och senare kan du använda from_protobuf och to_protobuf funktioner för att serialisera och deserialisera data. Protobuf-serialisering används ofta i strömningsarbetsbelastningar.

Den grundläggande syntaxen för protobuf-funktioner liknar läs- och skrivfunktioner. Du måste importera dessa funktioner innan du kan använda dem.

from_protobuf omvandlar en binär kolumn till en struct och to_protobuf omvandlar en structkolumn till binär. Du måste ange antingen ett schemaregister som angetts med options argumentet eller en beskrivande fil som identifierats av descFilePath argumentet.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Följande exempel illustrerar bearbetning av binära protobuf-poster med from_protobuf() och konvertering av Spark SQL-struct till binär protobuf med to_protobuf().

Använda protobuf med Confluent Schema Registry

Azure Databricks stöder användning av Confluent Schema Registry för att definiera Protobuf.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Autentisera till ett externt Confluent-schemaregister

Om du vill autentisera till ett externt Confluent Schema Registry uppdaterar du dina schemaregisteralternativ så att de innehåller autentiseringsuppgifter och API-nycklar.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Använda truststore- och keystore-filer i Unity Catalog-volymer

I Databricks Runtime 14.3 LTS och senare kan du använda säkerhetsarkiv- och nyckelarkivfiler i Unity Catalog-volymer för att autentisera till ett Confluent-schemaregister. Uppdatera schemaregisteralternativen enligt följande exempel:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Använda Protobuf med en beskrivningsfil

Du kan också referera till en protobuf-beskrivningsfil som är tillgänglig för ditt beräkningskluster. Kontrollera att du har rätt behörighet att läsa filen, beroende på var den finns.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Alternativ som stöds i Protobuf-funktioner

Följande alternativ stöds i Protobuf-funktioner.

  • läge: Avgör hur fel vid deserialisering av Protobuf-poster hanteras. Felen kan orsakas av olika typer av felaktiga poster, inklusive ett matchningsfel mellan postens faktiska schema och det förväntade schemat som anges i from_protobuf().
    • Värden:
      • FAILFAST(standard): Ett fel utlöses när en felaktig post påträffas och aktiviteten misslyckas.
      • PERMISSIVE: En NULL returneras för felaktiga poster. Använd det här alternativet noggrant eftersom det kan leda till att många poster tappas. Detta är användbart när en liten del av posterna i källan är felaktiga.
  • recursive.fields.max.depth: Lägger till stöd för rekursiva fält. Spark SQL-scheman stöder inte rekursiva fält. När det här alternativet inte har angetts tillåts inte rekursiva fält. För att stödja rekursiva fält i Protobufs måste de expandera till ett angivet djup.
    • Värden:

      • -1 (standard): Rekursiva fält tillåts inte.

      • 0: Rekursiva fält tas bort.

      • 1: Tillåter en enda rekursionsnivå.

      • [2–10]: Ange ett tröskelvärde för multipel rekursion, upp till 10 nivåer.

        Om du anger ett värde till större än 0 kan rekursiva fält expanderas de kapslade fälten till det konfigurerade djupet. Värden som är större än 10 tillåts inte för att undvika att oavsiktligt skapa mycket stora scheman. Om ett Protobuf-meddelande har djup utöver den konfigurerade gränsen trunkeras Spark-structen som returneras efter rekursionsgränsen.

    • Exempel: Överväg en Protobuf med följande rekursiva fält:

      message Person { string name = 1; Person friend = 2; }
      

      Följande visar slutschemat med olika värden för den här inställningen:

      • Alternativet är inställt på 1: STRUCT<name: STRING>
      • Alternativet är inställt på 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • Alternativet är inställt på 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: Med det här alternativet kan du konvertera Protobuf Alla fält till JSON. Den här funktionen bör aktiveras noggrant. JSON-konvertering och -bearbetning är ineffektiva. Dessutom förlorar JSON-strängfältet Protobuf-schemasäkerheten, vilket gör nedströmsbearbetningen benägen för fel.
    • Värden:

      • Falskt (standard): Vid körning kan sådana jokerteckenfält innehålla godtyckliga Protobuf-meddelanden som binära data. Som standard hanteras sådana fält som ett vanligt Protobuf-meddelande. Den har två fält med schemat (STRUCT<type_url: STRING, value: BINARY>). Som standard tolkas inte det binära value fältet på något sätt. Men binära data kanske inte är praktiska i praktiken för att fungera i vissa program.
      • Sant: Om du ställer in det här värdet på Sant kan du konvertera Any fält till JSON-strängar vid körning. Med det här alternativet parsas binärfilen och Protobuf-meddelandet deserialiseras till en JSON-sträng.
    • Exempel: Överväg två Protobuf-typer som definierats på följande sätt:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Med det här alternativet aktiverat skulle schemat för from_protobuf("col", messageName ="ProtoWithAny") vara: STRUCT<event_name: STRING, details: STRING>.

      Vid körning, om details fältet innehåller Person Protobuf-meddelande, ser det returnerade värdet ut så här: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Krav:

      • Definitionerna för alla möjliga Protobuf-typer som används i Any fält bör vara tillgängliga i protobuf-beskrivningsfilen som skickas till from_protobuf().
      • Om Any Protobuf inte hittas resulterar det i ett fel för posten.
      • Den här funktionen stöds för närvarande inte med schema-registry.
  • emit.default.values: Aktiverar återgivningsfält med noll värden när protobuf deserialiseras till en Spark-struct. Det här alternativet bör användas sparsamt. Det är vanligtvis inte tillrådligt att vara beroende av sådana finare skillnader i semantik.
    • Värden

      • Falskt (standard): När ett fält är tomt i den serialiserade Protobuf är det resulterande fältet i Spark-structen som standard null. Det är enklare att inte aktivera det här alternativet och behandla null som standardvärde.
      • Sant: När det här alternativet är aktiverat fylls sådana fält med motsvarande standardvärden.
    • Exempel: Tänk på följande Protobuf med Protobuf konstruerad som Person(age=0, middle_name=""):

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Med det här alternativet inställt på False blir Spark-structen efter anropet from_protobuf() alla nulls: {"name": null, "age": null, "middle_name": "", "salary": null}. Även om två fält (age och middle_name) hade värden angivna, inkluderar Protobuf dem inte i trådformat eftersom de är standardvärden.
      • Med det här alternativet inställt på Sant blir Spark-structen efter anropet from_protobuf() : {"name": "", "age": 0, "middle_name": "", "salary": null}. Fältet salary förblir null eftersom det uttryckligen deklareras optional och inte anges i indataposten.
  • enums.as.ints: När det är aktiverat återges uppräkningsfält i Protobuf som heltalsfält i Spark.
    • Värden

      • False (standard)
      • Sant: När det är aktiverat återges uppräkningsfält i Protobuf som heltalsfält i Spark.
    • Exempel: Överväg följande Protobuf:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Ges ett Protobuf-meddelande som Person(job = ENGINEER):

      • Med det här alternativet inaktiverat skulle motsvarande Spark-struct vara {"job": "ENGINEER"}.
      • Med det här alternativet aktiverat skulle motsvarande Spark-struct vara {"job": 1}.

      Observera att schemat för dessa fält skiljer sig åt i varje enskilt fall (heltal i stället för standardsträng). En sådan ändring kan påverka schemat för underordnade tabeller.

Schemaregisteralternativ

Följande schemaregisteralternativ är relevanta när du använder schemaregister med Protobuf-funktioner.

  • schema.registry.subject
    • Obligatoriskt
    • Anger ämne för schema i schemaregistret, till exempel "klienthändelse"
  • schema.registry.address
    • Obligatoriskt
    • URL för schemaregister, till exempel https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • Valfritt
    • Standard: <NONE>.
    • En schemaregisterpost för ett ämne kan innehålla flera Protobuf-definitioner, precis som en enda proto fil. När det här alternativet inte har angetts används den första Protobuf för schemat. Ange namnet på Protobuf-meddelandet när det inte är det första i posten. Överväg till exempel en post med två Protobuf-definitioner: "Person" och "Plats" i den ordningen. Om strömmen motsvarar "Plats" i stället för "Person" anger du det här alternativet till "Plats" (eller dess fullständiga namn, inklusive paketet "com.example.protos.Location").
  • schema.registry.schema.evolution.mode
    • Standard: "restart".
    • Lägen som stöds:
      • "starta om"
      • "ingen"
    • Det här alternativet anger schema-evolution-läge för from_protobuf(). I början av en fråga registrerar Spark det senaste schema-ID:t för det angivna ämnet. Detta avgör schemat för from_protobuf(). Ett nytt schema kan publiceras i schemaregistret när frågan har startats. När ett nyare schema-ID visas i en inkommande post indikerar det en ändring av schemat. Det här alternativet avgör hur en sådan ändring av schemat hanteras:
      • omstart (standard): Utlöser ett UnknownFieldException när ett nyare schema-ID visas. Frågan avslutas. Databricks rekommenderar att du konfigurerar arbetsflöden för att starta om vid frågefel för att hämta schemaändringar.
      • none: Schema-id-ändringar ignoreras. Posterna med nyare schema-ID parsas med samma schema som observerades i början av frågan. Nyare Protobuf-definitioner förväntas vara bakåtkompatibla och nya fält ignoreras.
  • confluent.schema.registry.<schema-registy-client-option>
    • Valfritt
    • Schema-registry ansluter till Confluent schema-registry med hjälp av Confluent Schema Registry-klienten. Alla konfigurationsalternativ som stöds av klienten kan anges med prefixet "confluent.schema.registry". Följande två inställningar anger till exempel autentiseringsuppgifter för "USER_INFO":
      • "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO"
      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"