Protocolbuffers lezen en schrijven
Azure Databricks biedt systeemeigen ondersteuning voor serialisatie en deserialisatie tussen Apache Spark-structs en protocolbuffers (protobuf). Protobuf-ondersteuning wordt geïmplementeerd als een Apache Spark DataFrame-transformator en kan worden gebruikt met Structured Streaming of voor batchbewerkingen.
Protocolbuffers deserialiseren en serialiseren
In Databricks Runtime 12.2 LTS en hoger kunt from_protobuf
to_protobuf
u gegevens serialiseren en deserialiseren. Protobuf-serialisatie wordt vaak gebruikt in streamingworkloads.
De basissyntaxis voor protobuf-functies is vergelijkbaar voor lees- en schrijffuncties. U moet deze functies importeren voordat u deze gebruikt.
from_protobuf
cast een binaire kolom naar een struct en to_protobuf
cast een struct-kolom naar binair. U moet een schemaregister opgeven dat is opgegeven met het options
argument of een descriptorbestand dat is geïdentificeerd door het descFilePath
argument.
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
De volgende voorbeelden illustreren het verwerken van binaire protobuf-records met from_protobuf()
en het converteren van Spark SQL-struct naar binaire protobuf met to_protobuf()
.
Protobuf gebruiken met Confluent Schema Registry
Azure Databricks biedt ondersteuning voor het gebruik van het Confluent-schemaregister om Protobuf te definiëren.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
Verifiëren bij een extern Confluent-schemaregister
Als u wilt verifiëren bij een extern Confluent-schemaregister, moet u de opties voor het schemaregister bijwerken om verificatiereferenties en API-sleutels op te nemen.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
TrustStore- en keystore-bestanden gebruiken in Unity Catalog-volumes
In Databricks Runtime 14.3 LTS en hoger kunt u truststore- en sleutelopslagbestanden in Unity Catalog-volumes gebruiken om te verifiëren bij een Confluent-schemaregister. Werk de schemaregisteropties bij volgens het volgende voorbeeld:
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Protobuf gebruiken met een descriptorbestand
U kunt ook verwijzen naar een protobuf descriptorbestand dat beschikbaar is voor uw rekencluster. Zorg ervoor dat u over de juiste machtigingen beschikt om het bestand te lezen, afhankelijk van de locatie.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Ondersteunde opties in Protobuf-functies
De volgende opties worden ondersteund in Protobuf-functies.
- modus: bepaalt hoe fouten tijdens het deseriialiseren van Protobuf-records worden verwerkt. De fouten kunnen worden veroorzaakt door verschillende typen onjuiste records, waaronder een onjuiste overeenkomst tussen het werkelijke schema van de record en het verwachte schema in
from_protobuf()
.- Waarden:
FAILFAST
(standaard): er wordt een fout gegenereerd wanneer er een onjuiste record wordt aangetroffen en de taak mislukt.PERMISSIVE
: Er wordt een NULL geretourneerd voor ongeldige records. Gebruik deze optie zorgvuldig, omdat dit kan leiden tot het verwijderen van veel records. Dit is handig wanneer een klein deel van de records in de bron onjuist is.
- Waarden:
- recursive.fields.max.depth: voegt ondersteuning toe voor recursieve velden. Spark SQL-schema's bieden geen ondersteuning voor recursieve velden. Wanneer deze optie niet is opgegeven, zijn recursieve velden niet toegestaan. Om recursieve velden in Protobufs te ondersteunen, moeten ze worden uitgebreid naar een opgegeven diepte.
Waarden:
-1 (standaard): Recursieve velden zijn niet toegestaan.
0: Recursieve velden worden verwijderd.
1: Hiermee staat u één niveau van recursie toe.
[2-10]: Geef een drempelwaarde op voor meerdere recursieniveaus, maximaal 10 niveaus.
Als u een waarde instelt op meer dan 0, kunnen recursieve velden worden gebruikt door de geneste velden uit te breiden naar de geconfigureerde diepte. Waarden die groter zijn dan 10 zijn niet toegestaan om onbedoeld zeer grote schema's te maken. Als een Protobuf-bericht diepte heeft buiten de geconfigureerde limiet, wordt de Geretourneerde Spark-struct afgekapt na de recursielimiet.
Voorbeeld: Bekijk een Protobuf met het volgende recursieve veld:
message Person { string name = 1; Person friend = 2; }
Hieronder ziet u het eindschema met verschillende waarden voor deze instelling:
- Optie ingesteld op 1:
STRUCT<name: STRING>
- Optie ingesteld op 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- Optie ingesteld op 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- Optie ingesteld op 1:
- convert.any.fields.to.json: Met deze optie kunt u Protobuf Any-velden converteren naar JSON. Deze functie moet zorgvuldig worden ingeschakeld. JSON-conversie en -verwerking zijn inefficiënt. Daarnaast verliest het JSON-tekenreeksveld de veiligheid van het Protobuf-schema, waardoor downstreamverwerking gevoelig is voor fouten.
Waarden:
- False (standaard): Tijdens runtime kunnen dergelijke jokertekenvelden willekeurige Protobuf-berichten bevatten als binaire gegevens. Dergelijke velden worden standaard verwerkt als een normaal Protobuf-bericht. Het heeft twee velden met schema
(STRUCT<type_url: STRING, value: BINARY>)
. Standaard wordt het binairevalue
veld op geen enkele manier geïnterpreteerd. Maar de binaire gegevens zijn mogelijk niet handig in de praktijk om in sommige toepassingen te werken. - Waar: Als u deze waarde instelt op True, kunnen velden tijdens runtime worden geconverteerd
Any
naar JSON-tekenreeksen. Met deze optie wordt het binaire bestand geparseerd en wordt het Protobuf-bericht gedeserialiseerd in een JSON-tekenreeks.
- False (standaard): Tijdens runtime kunnen dergelijke jokertekenvelden willekeurige Protobuf-berichten bevatten als binaire gegevens. Dergelijke velden worden standaard verwerkt als een normaal Protobuf-bericht. Het heeft twee velden met schema
Voorbeeld: Overweeg twee Protobuf-typen die als volgt zijn gedefinieerd:
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }
Als deze optie is ingeschakeld, is het schema voor
from_protobuf("col", messageName ="ProtoWithAny")
:STRUCT<event_name: STRING, details: STRING>
.Als het veld Protobuf-bericht bevat
Person
,details
ziet de geretourneerde waarde er als volgt uit:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
.Vereisten:
- De definities voor alle mogelijke Protobuf-typen die in velden worden gebruikt
Any
, moeten beschikbaar zijn in het Protobuf-descriptorbestand dat wordt doorgegeven aanfrom_protobuf()
. - Als
Any
Protobuf niet wordt gevonden, resulteert dit in een fout voor die record. - Deze functie wordt momenteel niet ondersteund met schemaregister.
- De definities voor alle mogelijke Protobuf-typen die in velden worden gebruikt
- emit.default.values: hiermee schakelt u renderingvelden met nulwaarden in bij het deserialiseren van Protobuf naar een Spark-struct. Deze optie moet spaarzaam worden gebruikt. Het is meestal niet raadzaam om afhankelijk te zijn van dergelijke fijnere verschillen in semantiek.
Waarden
- False (standaard): Wanneer een veld leeg is in de geserialiseerde Protobuf, is het resulterende veld in de Spark-struct standaard null. Het is eenvoudiger om deze optie niet in te schakelen en te behandelen
null
als de standaardwaarde. - True: Wanneer deze optie is ingeschakeld, worden dergelijke velden gevuld met bijbehorende standaardwaarden.
- False (standaard): Wanneer een veld leeg is in de geserialiseerde Protobuf, is het resulterende veld in de Spark-struct standaard null. Het is eenvoudiger om deze optie niet in te schakelen en te behandelen
Voorbeeld: Bekijk de volgende Protobuf met de Protobuf die is samengesteld als
Person(age=0, middle_name="")
:syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
- Als deze optie is ingesteld op Onwaar, is de Spark-struct na het aanroepen
from_protobuf()
alle null-waarden:{"name": null, "age": null, "middle_name": "", "salary": null}
. Hoewel in twee velden (age
enmiddle_name
) waarden zijn ingesteld, bevat Protobuf ze niet in wire-format omdat het standaardwaarden zijn. - Als deze optie is ingesteld op True, is de Spark-struct na het aanroepen
from_protobuf()
:{"name": "", "age": 0, "middle_name": "", "salary": null}
. Hetsalary
veld blijft null omdat het expliciet wordt gedeclareerdoptional
en niet is ingesteld in de invoerrecord.
- Als deze optie is ingesteld op Onwaar, is de Spark-struct na het aanroepen
- enums.as.ints: als deze optie is ingeschakeld, worden opsommingsvelden in Protobuf weergegeven als gehele getallen in Spark.
Waarden
- False (standaard)
- Waar: wanneer deze optie is ingeschakeld, worden opsommingsvelden in Protobuf weergegeven als gehele getallen in Spark.
Voorbeeld: Bekijk de volgende Protobuf:
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }
Gegeven een Protobuf bericht zoals
Person(job = ENGINEER)
:- Als deze optie is uitgeschakeld, zou de bijbehorende Spark-struct zijn
{"job": "ENGINEER"}
. - Als deze optie is ingeschakeld, zou de bijbehorende Spark-struct zijn
{"job": 1}
.
U ziet dat het schema voor deze velden in elk geval anders is (geheel getal in plaats van standaardtekenreeks). Een dergelijke wijziging kan van invloed zijn op het schema van downstreamtabellen.
- Als deze optie is uitgeschakeld, zou de bijbehorende Spark-struct zijn
Schemaregisteropties
De volgende schemaregisteropties zijn relevant bij het gebruik van het schemaregister met Protobuf-functies.
- schema.registry.subject
- Vereist
- Hiermee geeft u het onderwerp voor schema in schemaregister, zoals 'client-event'
- schema.registry.address
- Vereist
- URL voor schemaregister, zoals
https://schema-registry.example.com:8081
- schema.registry.protobuf.name
- Optioneel
- Standaard:
<NONE>
. - Een schemaregistervermelding voor een onderwerp kan meerdere Protobuf-definities bevatten, net als één
proto
bestand. Wanneer deze optie niet is opgegeven, wordt de eerste Protobuf gebruikt voor het schema. Geef de naam van het Protobuf-bericht op wanneer dit niet de eerste is in de vermelding. Denk bijvoorbeeld aan een vermelding met twee Protobuf-definities: 'Persoon' en 'Locatie' in die volgorde. Als de stream overeenkomt met 'Locatie' in plaats van 'Persoon', stelt u deze optie in op 'Locatie' (of de volledige naam, inclusief pakket 'com.example.protos.Location').
- schema.registry.schema.evolution.mode
- Standaard: 'opnieuw opstarten'.
- Ondersteunde modi:
- "opnieuw opstarten"
- "geen"
- Met deze optie stelt u de schemaontwikkelingsmodus in voor
from_protobuf()
. Aan het begin van een query registreert Spark de meest recente schema-id voor het opgegeven onderwerp. Hiermee bepaalt u het schema voorfrom_protobuf()
. Er kan een nieuw schema worden gepubliceerd in het schemaregister nadat de query is gestart. Wanneer een nieuwere schema-id wordt opgemerkt in een binnenkomende record, wordt een wijziging in het schema aangegeven. Met deze optie wordt bepaald hoe een dergelijke wijziging in het schema wordt verwerkt:- opnieuw opstarten (standaard): hiermee wordt een
UnknownFieldException
nieuwere schema-id geactiveerd. Hiermee wordt de query beëindigd. Databricks raadt u aan taken te configureren om opnieuw op te starten bij een queryfout om schemawijzigingen op te halen. - geen: wijzigingen in schema-id's worden genegeerd. De records met een nieuwere schema-id worden geparseerd met hetzelfde schema dat aan het begin van de query is waargenomen. Nieuwere Protobuf-definities zijn naar verwachting compatibel met eerdere versies en nieuwe velden worden genegeerd.
- opnieuw opstarten (standaard): hiermee wordt een
- confluent.schema.registry.
<schema-registy-client-option>
- Optioneel
- Schema-register maakt verbinding met confluent schema-register met behulp van de Confluent Schema Registry-client. Alle configuratieopties die door de client worden ondersteund, kunnen worden opgegeven met het voorvoegsel "confluent.schema.registry". De volgende twee instellingen bieden bijvoorbeeld verificatiereferenties voor USER_INFO:
- "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO'
- "confluent.schema.registry.basic.auth.user.info": " :
<SECRET>
"<KEY>