Olvasási és írási protokollpufferek
Az Azure Databricks natív támogatást nyújt az Apache Spark-szerkezetek és a protokollpufferek (protobuf) közötti szerializáláshoz és deszerializáláshoz. A Protobuf-támogatás Apache Spark DataFrame-transzformátorként van implementálva, és strukturált streameléssel vagy kötegelt műveletekhez használható.
Protokollpufferek deszerializálása és szerializálása
A Databricks Runtime 12.2 LTS és újabb verziókban az adatok szerializálására és deszerializálására és deszerializálására használható from_protobuf
és to_protobuf
használható függvények. A Protobuf szerializálást gyakran használják streamelési számítási feladatokban.
A protobuf függvények alapszintaxisa hasonló az olvasási és írási függvényekhez. Használat előtt importálnia kell ezeket a függvényeket.
from_protobuf
bináris oszlopot ad át egy szerkezetbe, és to_protobuf
egy struktúraoszlopot binárisra vet. Meg kell adnia egy sémaregisztrációs adatbázist, amely az options
argumentummal van megadva, vagy egy leírófájlt, amelyet az descFilePath
argumentum azonosít.
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
Az alábbi példák a bináris protobuf rekordok from_protobuf()
feldolgozását szemléltetik a Spark SQL-struktúra bináris protobuftá alakításával to_protobuf()
.
A protobuf használata a Confluent-sémaregisztrációs adatbázissal
Az Azure Databricks támogatja a Confluent sémaregisztrációs adatbázisának használatát a Protobuf definiálásához.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
Hitelesítés külső Confluent-sémaregisztrációs adatbázisba
Ha külső Confluent-sémaregisztrációs adatbázisba szeretne hitelesítést végezni, frissítse a sémaregisztrációs beállításokat úgy, hogy azok tartalmazzák a hitelesítési hitelesítő adatokat és az API-kulcsokat.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Truststore- és keystore-fájlok használata Unity Catalog-kötetekben
A Databricks Runtime 14.3 LTS-ben és újabb verziókban a Unity Catalog-kötetekben található truststore- és keystore-fájlokat használhatja a Confluent-sémaregisztrációs adatbázisba való hitelesítéshez. Frissítse a sémaregisztrációs adatbázis beállításait az alábbi példának megfelelően:
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
A Protobuf használata leíró fájllal
Hivatkozhat egy protobuf leírófájlra is, amely elérhető a számítási fürt számára. Győződjön meg arról, hogy megfelelő engedélyekkel rendelkezik a fájl olvasásához a helyétől függően.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
A Protobuf függvények támogatott beállításai
A Protobuf függvények az alábbi lehetőségeket támogatják.
- mód: Meghatározza a Protobuf-rekordok deszerializálása során felmerülő hibák kezelését. A hibákat különböző típusú helytelen formátumú rekordok okozhatják, beleértve a rekord tényleges sémája és a várt séma
from_protobuf()
közötti eltérést.- Értékek:
FAILFAST
(alapértelmezett): Hibásan formázott rekord esetén a rendszer hibát jelez, és a feladat meghiúsul.PERMISSIVE
: A rendszer null értéket ad vissza a hibásan formázott rekordokhoz. Ezt a lehetőséget körültekintően használja, mivel sok rekord elvetése is lehet. Ez akkor hasznos, ha a forrás rekordjainak egy kis része helytelen.
- Értékek:
- recursive.fields.max.depth: Támogatja a rekurzív mezőket. A Spark SQL-sémák nem támogatják a rekurzív mezőket. Ha ez a beállítás nincs megadva, a rekurzív mezők nem engedélyezettek. A Protobufs rekurzív mezőinek támogatásához a mezőket egy megadott mélységre kell bővíteni.
Értékek:
-1 (alapértelmezett): A rekurzív mezők nem engedélyezettek.
0: A rekurzív mezők elvetve lesznek.
1: Lehetővé teszi a rekurzió egyetlen szintjét.
[2–10]: Adjon meg egy küszöbértéket több rekurzióhoz, legfeljebb 10 szintig.
Ha egy értéket 0-nál nagyobb értékre állít be, a rekurzív mezőket a beágyazott mezők konfigurált mélységre való kibontásával teszi lehetővé. A 10-nél nagyobb értékek nem engedélyezettek a nagyon nagy sémák véletlen létrehozása érdekében. Ha egy Protobuf-üzenet mélysége meghaladja a konfigurált korlátot, a visszaadott Spark-szerkezet csonkolva lesz a rekurziós korlát után.
Példa: Fontolja meg a Következő rekurzív mezővel rendelkező Protobuf-t:
message Person { string name = 1; Person friend = 2; }
Az alábbi lista a beállításhoz tartozó különböző értékekkel rendelkező záró sémát sorolja fel:
- A beállítás értéke 1:
STRUCT<name: STRING>
- A beállítás értéke 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- A beállítás értéke 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- A beállítás értéke 1:
- convert.any.fields.to.json: Ez a beállítás lehetővé teszi a Protobuf Any mezők JSON-ra konvertálását. Ezt a funkciót gondosan engedélyezni kell. A JSON-átalakítás és -feldolgozás nem hatékony. Emellett a JSON-sztring mező elveszíti a Protobuf sémabiztonságot, így a lefelé irányuló feldolgozás hajlamos a hibákra.
Értékek:
- Hamis (alapértelmezett): Futásidőben az ilyen helyettesítő karakterek tetszőleges Protobuf-üzeneteket tartalmazhatnak bináris adatként. Az ilyen mezők alapértelmezés szerint normál Protobuf-üzenetként vannak kezelve. Két, sémával
(STRUCT<type_url: STRING, value: BINARY>)
rendelkező mezővel rendelkezik. Alapértelmezés szerint a binárisvalue
mező semmilyen módon nem értelmezhető. Előfordulhat azonban, hogy a bináris adatok a gyakorlatban nem megfelelőek egyes alkalmazásokban való munkavégzéshez. - Igaz: Az érték Igaz értékre állítása lehetővé teszi a mezők JSON-sztringekké való konvertálását
Any
futásidőben. Ezzel a beállítással a bináris elem elemzése és a Protobuf-üzenet deszerializálása JSON-sztringgé történik.
- Hamis (alapértelmezett): Futásidőben az ilyen helyettesítő karakterek tetszőleges Protobuf-üzeneteket tartalmazhatnak bináris adatként. Az ilyen mezők alapértelmezés szerint normál Protobuf-üzenetként vannak kezelve. Két, sémával
Példa: Fontolja meg két Protobuf-típust a következőképpen:
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }
Ha ez a beállítás engedélyezve van, a séma
from_protobuf("col", messageName ="ProtoWithAny")
a következő lesz:STRUCT<event_name: STRING, details: STRING>
.Futtatáskor, ha
details
a mező Protobuf-üzenetet tartalmazPerson
, a visszaadott érték a következőképpen néz ki:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
.Követelmények:
- A mezőkben
Any
használt összes lehetséges Protobuf-típus definícióinak elérhetőnek kell lenniük a protobuf leíró fájlban, amelyet a program átadfrom_protobuf()
. - Ha
Any
a Protobuf nem található, az hibát fog eredményezni a rekordban. - A sémaregisztrációs adatbázis jelenleg nem támogatja ezt a funkciót.
- A mezőkben
- emit.default.values: A Protobuf Spark-szerkezetbe való deszerializálásakor engedélyezi a nulla értékű renderelési mezők megjelenítését. Ezt a lehetőséget takarékosan kell használni. Általában nem ajánlott a szemantika ilyen finomabb különbségeitől függeni.
Értékek
- Hamis (alapértelmezett): Ha egy mező üres a szerializált Protobufban, a Spark-szerkezet eredményül kapott mezője alapértelmezés szerint null. Egyszerűbb, ha nem engedélyezi ezt a beállítást, és alapértelmezett értékként kezeli
null
. - Igaz: Ha ez a beállítás engedélyezve van, az ilyen mezőket a megfelelő alapértelmezett értékek töltik ki.
- Hamis (alapértelmezett): Ha egy mező üres a szerializált Protobufban, a Spark-szerkezet eredményül kapott mezője alapértelmezés szerint null. Egyszerűbb, ha nem engedélyezi ezt a beállítást, és alapértelmezett értékként kezeli
Példa: Fontolja meg a következő Protobuf-t a Protobuf felépítése alapján:
Person(age=0, middle_name="")
syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
- Ha ez a beállítás False (Hamis) értékre van állítva, a Hívás
from_protobuf()
utáni Spark-szerkezet minden null értékű lesz:{"name": null, "age": null, "middle_name": "", "salary": null}
. Annak ellenére, hogy két mező (age
ésmiddle_name
) rendelkezik értékekkel, a Protobuf nem tartalmazza őket vezetékes formátumban, mivel ezek alapértelmezett értékek. - Ha ez a beállítás Igaz értékre van állítva, a Hívás
from_protobuf()
utáni Spark-struktúra a következő lesz:{"name": "", "age": 0, "middle_name": "", "salary": null}
. Asalary
mező null értékű marad, mivel explicit módon deklarálvaoptional
van, és nincs beállítva a bemeneti rekordban.
- Ha ez a beállítás False (Hamis) értékre van állítva, a Hívás
- enums.as.ints: Ha engedélyezve van, a Protobuf enum mezői egész számként jelennek meg a Sparkban.
Értékek
- False (alapértelmezett)
- Igaz: Ha engedélyezve van, a Protobuf enum mezői egész számként jelennek meg a Sparkban.
Példa: Fontolja meg a következő Protobufot:
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }
Protobuf-üzenet, például
Person(job = ENGINEER)
:- Ha ez a beállítás le van tiltva, a megfelelő Spark-struktúra az lenne
{"job": "ENGINEER"}
. - Ha ez a beállítás engedélyezve van, a megfelelő Spark-struktúra lesz
{"job": 1}
.
Figyelje meg, hogy ezeknek a mezőknek a sémája minden esetben eltérő (az alapértelmezett sztring helyett egész szám). Ez a változás hatással lehet az alsóbb rétegbeli táblák sémájára.
- Ha ez a beállítás le van tiltva, a megfelelő Spark-struktúra az lenne
Sémaregisztrációs beállításjegyzék beállításai
A sémaregisztrációs adatbázis Protobuf-függvényekkel való használata során az alábbi sémaregisztrációs beállítások relevánsak.
- schema.registry.subject
- Kötelező
- A séma tárgyát adja meg a sémaregisztrációs adatbázisban, például az "ügyféleseményt"
- schema.registry.address
- Kötelező
- A sémaregisztrációs adatbázis URL-címe, például
https://schema-registry.example.com:8081
- schema.registry.protobuf.name
- Választható
- Alapértelmezett:
<NONE>
. - Egy tárgy sémaregisztrációs bejegyzése több Protobuf-definíciót is tartalmazhat, csakúgy, mint egyetlen
proto
fájlt. Ha ez a beállítás nincs megadva, a rendszer az első Protobufot használja a sémához. Adja meg a Protobuf-üzenet nevét, ha nem ez az első a bejegyzésben. Vegyünk például egy bejegyzést két Protobuf-definícióval: "Személy" és "Hely" ebben a sorrendben. Ha a stream nem "Személy", hanem "Hely" értéknek felel meg, állítsa ezt a beállítást "Hely" értékre (vagy annak teljes nevére, beleértve a "com.example.protos.Location" csomagot is).
- schema.registry.schema.evolution.mode
- Alapértelmezett: "újraindítás".
- Támogatott módok:
- "újraindítás"
- "nincs"
- Ez a beállítás a sémafejlődési módot állítja be a következőhöz
from_protobuf()
: . A lekérdezés elején a Spark rögzíti az adott tárgy legújabb sémaazonosítóját. Ez határozza meg a sémát a következőhözfrom_protobuf()
: . Előfordulhat, hogy a lekérdezés elindítása után egy új séma lesz közzétéve a sémaregisztrációs adatbázisban. Ha egy újabb sémaazonosítót észlel egy bejövő rekordban, az a séma módosítását jelzi. Ez a beállítás határozza meg a séma ilyen módosításának kezelését:- újraindítás (alapértelmezett): Egy újabb sémaazonosító észlelésekor aktiválja
UnknownFieldException
az eseményt. Ezzel leállítja a lekérdezést. A Databricks azt javasolja, hogy konfigurálja a munkafolyamatokat, hogy a sémamódosítások felvételéhez lekérdezési hiba esetén újrainduljanak. - nincs: A sémaazonosító módosításait a rendszer figyelmen kívül hagyja. Az újabb sémaazonosítóval rendelkező rekordok a lekérdezés elején megfigyelt sémával vannak elemezve. Az újabb Protobuf-definíciók várhatóan visszamenőlegesen kompatibilisek lesznek, és az új mezők figyelmen kívül lesznek hagyva.
- újraindítás (alapértelmezett): Egy újabb sémaazonosító észlelésekor aktiválja
- confluent.schema.registry.
<schema-registy-client-option>
- Választható
- A sémaregisztrációs adatbázis a Confluent sémaregisztrációs ügyféllel csatlakozik a Confluent sémaregisztrációs adatbázisához. Az ügyfél által támogatott konfigurációs beállítások a "confluent.schema.registry" előtaggal adhatók meg. A következő két beállítás például az "U Standard kiadás R_INFO" hitelesítési hitelesítő adatokat adja meg:
- "confluent.schema.registry.basic.auth.credentials.source": 'U Standard kiadás R_INFO'
- "confluent.schema.registry.basic.auth.user.info": "
<KEY>
:<SECRET>
"