Vyrovnávací paměti protokolu pro čtení a zápis
Azure Databricks poskytuje nativní podporu serializace a deserializace mezi strukturami Apache Sparku a vyrovnávacími paměťmi protokolu (protobuf). Podpora Protobuf je implementována jako transformátor datového rámce Apache Spark a lze ji použít se strukturovaným streamováním nebo pro dávkové operace.
Jak deserializovat a serializovat vyrovnávací paměti protokolu
Ve verzi Databricks Runtime 12.2 LTS a vyšší můžete data serializovat a deserializovat pomocí from_protobuf
funkcí to_protobuf
. Serializace Protobuf se běžně používá v úlohách streamování.
Základní syntaxe funkcí protobuf je podobná pro funkce pro čtení a zápis. Před použitím je nutné tyto funkce importovat.
from_protobuf
přetypuje binární sloupec na strukturu a to_protobuf
přetypuje sloupec struktury na binární. Je nutné zadat registr schématu určený argumentem options
nebo soubor popisovače identifikovaný argumentem descFilePath
.
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
Následující příklady ilustrují zpracování binárních záznamů protobuf pomocí from_protobuf()
a převod struktury Spark SQL na binární protobuf s to_protobuf()
.
Použití protobuf s registrem schématu Confluent
Azure Databricks podporuje definování Protobuf pomocí registru schémat Confluent.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
Ověření v externím registru schématu Confluent
Pokud se chcete ověřit v externím registru schématu Confluent, aktualizujte možnosti registru schématu tak, aby zahrnovaly přihlašovací údaje ověřování a klíče rozhraní API.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Použití úložiště důvěryhodnosti a souborů úložiště klíčů ve svazcích katalogu Unity
Ve službě Databricks Runtime 14.3 LTS a vyšší můžete k ověření v registru schémat Confluent použít úložiště důvěryhodnosti a soubory úložiště klíčů ve svazcích katalogu Unity. Aktualizujte možnosti registru schématu podle následujícího příkladu:
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Použití Protobuf se souborem popisovače
Můžete také odkazovat na soubor popisovače protobuf, který je dostupný pro váš výpočetní cluster. Ujistěte se, že máte správná oprávnění ke čtení souboru v závislosti na jeho umístění.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Podporované možnosti ve funkcích Protobuf
Funkce Protobuf podporují následující možnosti.
- mode: Určuje, jak se zpracovávají chyby při deserializaci záznamů Protobuf. Chyby mohou být způsobeny různými typy poškozených záznamů, včetně neshody mezi skutečným schématem záznamu a očekávaným schématem zadaným v
from_protobuf()
.- Hodnoty:
FAILFAST
(výchozí): Při výskytu poškozeného záznamu dojde k chybě a úloha selže.PERMISSIVE
: Pro poškozené záznamy se vrátí hodnota NULL. Tuto možnost používejte pečlivě, protože může vést k zahození mnoha záznamů. To je užitečné v případě, že malá část záznamů ve zdroji není správná.
- Hodnoty:
- recursive.fields.max.depth: Přidá podporu rekurzivních polí. Schémata Spark SQL nepodporují rekurzivní pole. Pokud tato možnost není zadána, rekurzivní pole nejsou povolena. Aby bylo možné podporovat rekurzivní pole v Protobufs, je potřeba je rozšířit na zadanou hloubku.
Hodnoty:
-1 (výchozí): Rekurzivní pole nejsou povolená.
0: Rekurzivní pole se zahodí.
1: Umožňuje jednu úroveň rekurze.
[2–10]: Zadejte prahovou hodnotu pro více rekurzí až 10 úrovní.
Nastavení hodnoty větší než 0 umožňuje rekurzivní pole rozbalením vnořených polí na nakonfigurovanou hloubku. Hodnoty větší než 10 nejsou povoleny, aby se zabránilo neúmyslnému vytváření velmi velkých schémat. Pokud zpráva Protobuf přesahuje nakonfigurovaný limit, vrátí se vrácená struktura Sparku po limitu rekurze zkrácena.
Příklad: Zvažte Protobuf s následujícím rekurzivním polem:
message Person { string name = 1; Person friend = 2; }
Následující seznam obsahuje koncové schéma s různými hodnotami pro toto nastavení:
- Možnost nastavená na 1:
STRUCT<name: STRING>
- Možnost nastavená na 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- Možnost nastavená na 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- Možnost nastavená na 1:
- convert.any.fields.to.json: Tato možnost umožňuje převádět pole Protobuf Any na JSON. Tuto funkci byste měli povolit pečlivě. Převod a zpracování JSON je neefektivní. Kromě toho pole řetězce JSON ztratí zabezpečení schématu Protobuf, aby podřízené zpracování náchylné k chybám.
Hodnoty:
- False (výchozí): Za běhu můžou taková pole se zástupnými cardy obsahovat libovolné zprávy Protobuf jako binární data. Ve výchozím nastavení se tato pole zpracovávají jako normální zpráva Protobuf. Má dvě pole se schématem
(STRUCT<type_url: STRING, value: BINARY>)
. Ve výchozím nastavenívalue
binární pole není interpretováno žádným způsobem. Binární data ale nemusí být v praxi vhodná pro práci v některých aplikacích. - True: Nastavení této hodnoty na Hodnotu True umožňuje převádět
Any
pole na řetězce JSON za běhu. Při této možnosti se binární soubor analyzuje a zpráva Protobuf se deserializuje do řetězce JSON.
- False (výchozí): Za běhu můžou taková pole se zástupnými cardy obsahovat libovolné zprávy Protobuf jako binární data. Ve výchozím nastavení se tato pole zpracovávají jako normální zpráva Protobuf. Má dvě pole se schématem
Příklad: Zvažte dva typy Protobuf definované takto:
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }
Pokud je tato možnost povolená, schéma
from_protobuf("col", messageName ="ProtoWithAny")
by bylo:STRUCT<event_name: STRING, details: STRING>
.Pokud pole v době
details
běhu obsahujePerson
zprávu Protobuf, vrácená hodnota vypadá takto:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
.Požadavky:
- Definice všech možných typů Protobuf, které se používají v
Any
polích, by měly být k dispozici v souboru popisovače Protobuf předané .from_protobuf()
- Pokud
Any
protobuf nebyl nalezen, dojde k chybě pro tento záznam. - Tato funkce se v současné době nepodporuje v registru schématu.
- Definice všech možných typů Protobuf, které se používají v
- emit.default.values: Povolí vykreslování polí s nulovými hodnotami při deserializaci Protobuf do struktury Sparku. Tato možnost by se měla používat střídmě. Obvykle se nedoporučuje záviset na takových jemných rozdílech v sémantice.
Hodnoty
- False (výchozí): Pokud je pole v serializovaném protobuf prázdné, výsledné pole ve struktuře Sparku má ve výchozím nastavení hodnotu null. Tuto možnost nelze povolit a považovat
null
ji za výchozí hodnotu. - True: Pokud je tato možnost povolená, jsou tato pole vyplněna odpovídajícími výchozími hodnotami.
- False (výchozí): Pokud je pole v serializovaném protobuf prázdné, výsledné pole ve struktuře Sparku má ve výchozím nastavení hodnotu null. Tuto možnost nelze povolit a považovat
Příklad: Zvažte následující Protobuf s Protobuf vytvořený jako
Person(age=0, middle_name="")
:syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
- Pokud je tato možnost nastavená na False, struktura Sparku po volání
from_protobuf()
by byla všechna null:{"name": null, "age": null, "middle_name": "", "salary": null}
. I když dvě pole (age
amiddle_name
) obsahovaly nastavené hodnoty, Protobuf je neobsahuje ve formátu drátu, protože jsou to výchozí hodnoty. - Pokud je tato možnost nastavená na Hodnotu True, struktura Sparku po volání
from_protobuf()
by byla:{"name": "", "age": 0, "middle_name": "", "salary": null}
. Polesalary
zůstane null, protože je explicitně deklarovánooptional
a není nastaveno ve vstupním záznamu.
- Pokud je tato možnost nastavená na False, struktura Sparku po volání
- enums.as.ints: Při povolení se pole výčtu v Protobuf vykreslují jako celočíselná pole ve Sparku.
Hodnoty
- False (výchozí)
- True: Pokud je tato možnost povolená, vykreslí se pole výčtu v Protobuf jako celočíselná pole ve Sparku.
Příklad: Zvažte následující protobuf:
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }
Vzhledem k tomu, protobuf zpráva jako
Person(job = ENGINEER)
:- Pokud je tato možnost zakázaná, odpovídající struktura Sparku by byla
{"job": "ENGINEER"}
. - Pokud je tato možnost povolená, odpovídající struktura Sparku by byla
{"job": 1}
.
Všimněte si, že schéma těchto polí se v každém případě liší (celé číslo, nikoli výchozí řetězec). Taková změna může ovlivnit schéma podřízených tabulek.
- Pokud je tato možnost zakázaná, odpovídající struktura Sparku by byla
Možnosti registru schématu
Následující možnosti registru schématu jsou relevantní při použití registru schématu s funkcemi Protobuf.
- schema.registry.subject
- Požaduje se
- Určuje předmět schématu v registru schématu, například "client-event".
- schema.registry.address
- Požaduje se
- Adresa URL registru schématu, například
https://schema-registry.example.com:8081
- schema.registry.protobuf.name
- Volitelné
- Výchozí hodnota:
<NONE>
. - Položka registru schématu pro předmět může obsahovat více definic Protobuf, stejně jako jeden
proto
soubor. Pokud tato možnost není zadána, použije se pro schéma první protobuf. Zadejte název zprávy Protobuf, pokud není první v položce. Představte si například položku se dvěma definicemi Protobuf: Person (Osoba) a Location (Umístění). Pokud datový proud odpovídá "Umístění" místo "Person", nastavte tuto možnost na "Umístění" (nebo jeho úplný název včetně balíčku "com.example.protos.Location").
- schema.registry.schema.evolution.mode
- Výchozí hodnota: "restart".
- Podporované režimy:
- "restart"
- "none"
- Tato možnost nastaví režim vývoje schématu pro
from_protobuf()
. Na začátku dotazu Spark zaznamená nejnovější ID schématu pro daný předmět. Určuje schéma profrom_protobuf()
. Po spuštění dotazu může být do registru schématu publikováno nové schéma. Když se v příchozím záznamu zaznamená novější id schématu, znamená to změnu schématu. Tato možnost určuje, jak se taková změna schématu zpracovává:- restart (výchozí): Aktivuje
UnknownFieldException
, když si všimnete novějšího ID schématu. Tím se dotaz ukončí. Databricks doporučuje konfigurovat pracovní postupy, které se mají restartovat při selhání dotazu, aby se změny schématu vybraly. - none: Změny id schématu se ignorují. Záznamy s novějším ID schématu se analyzují se stejným schématem, které bylo zjištěno na začátku dotazu. Očekává se, že novější definice Protobuf budou zpětně kompatibilní a nová pole budou ignorována.
- restart (výchozí): Aktivuje
- confluent.schema.registry.
<schema-registy-client-option>
- Volitelné
- Registr schématu se připojuje ke schématu Confluent schema-registry pomocí klienta Confluent Schema Registry. Všechny možnosti konfigurace podporované klientem lze zadat s předponou "confluent.schema.registry". Například následující dvě nastavení poskytují přihlašovací údaje pro ověřování USER_INFO:
- "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO"
- "confluent.schema.registry.basic.auth.user.info": "
<KEY>
:<SECRET>
"