Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Azure Databricks poskytuje nativní podporu serializace a deserializace mezi strukturami Apache Sparku a vyrovnávacími paměťmi protokolu (protobuf). Podpora Protobuf je implementována jako transformátor datového rámce Apache Spark a lze ji použít se strukturovaným streamováním nebo pro dávkové operace.
Jak deserializovat a serializovat vyrovnávací paměti protokolu
Ve verzi Databricks Runtime 12.2 LTS a vyšší můžete data serializovat a deserializovat pomocí from_protobuf funkcí to_protobuf . Serializace Protobuf se běžně používá v úlohách streamování.
Základní syntaxe funkcí protobuf je podobná pro funkce pro čtení a zápis. Před použitím je nutné tyto funkce importovat.
from_protobuf přetypuje binární sloupec na strukturu a to_protobuf přetypuje sloupec struktury na binární. Je nutné zadat registr schématu určený argumentem options nebo soubor popisovače identifikovaný argumentem descFilePath .
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
Následující příklady ilustrují zpracování binárních záznamů protobuf pomocí from_protobuf() a převod struktury Spark SQL na binární protobuf s to_protobuf().
Použijte protobuf s registrem schématu Confluent
Azure Databricks podporuje definování Protobuf pomocí registru schémat Confluent .
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
Ověření v externím registru schématu Confluent
Pokud se chcete ověřit v externím registru schématu Confluent, aktualizujte možnosti registru schématu tak, aby zahrnovaly přihlašovací údaje ověřování a klíče rozhraní API.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Použití úložiště důvěryhodnosti a souborů úložiště klíčů ve svazcích katalogu Unity
Ve službě Databricks Runtime 14.3 LTS a vyšší můžete k ověření v registru schémat Confluent použít úložiště důvěryhodnosti a soubory úložiště klíčů ve svazcích katalogu Unity. Aktualizujte možnosti registru schématu podle následujícího příkladu:
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Použití Protobuf se souborem popisovače
Můžete také odkazovat na soubor popisovače protobuf, který je dostupný pro váš výpočetní cluster. Ujistěte se, že máte správná oprávnění ke čtení souboru v závislosti na jeho umístění.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Podporované možnosti ve funkcích Protobuf
Funkce Protobuf podporují následující možnosti.
mode: Určuje, jak se zpracovávají chyby při deserializaci záznamů Protobuf. Chyby mohou být způsobeny různými typy poškozených záznamů, včetně neshody mezi skutečným schématem záznamu a očekávaným schématem zadaným v
from_protobuf().-
Hodnoty:
-
FAILFAST(výchozí): Při výskytu poškozeného záznamu dojde k chybě a úloha selže. -
PERMISSIVE: Pro poškozené záznamy se vrátí hodnota NULL. Tuto možnost používejte pečlivě, protože může vést k zahození mnoha záznamů. To je užitečné v případě, že malá část záznamů ve zdroji není správná.
-
-
Hodnoty:
recursive.fields.max.depth: Přidá podporu rekurzivních polí. Schémata Spark SQL nepodporují rekurzivní pole. Pokud tato možnost není zadána, rekurzivní pole nejsou povolena. Aby bylo možné podporovat rekurzivní pole v Protobufs, je potřeba je rozšířit na zadanou hloubku.
Hodnoty:
-1 (výchozí): Rekurzivní pole nejsou povolená.
0: Rekurzivní pole se zahodí.
1: Umožňuje jednu úroveň rekurze.
[2–10]: Zadejte prahovou hodnotu pro více rekurzí až 10 úrovní.
Nastavení hodnoty větší než 0 umožňuje rekurzivní pole rozbalením vnořených polí na nakonfigurovanou hloubku. Hodnoty větší než 10 nejsou povoleny, aby se zabránilo neúmyslnému vytváření velmi velkých schémat. Pokud hloubka zprávy Protobuf přesahuje nakonfigurovaný limit, vrácená struktura Sparku je po dosažení limitu rekurze zkrácena.
Příklad: Zvažte Protobuf s následujícím rekurzivním polem:
message Person { string name = 1; Person friend = 2; }Následující seznam obsahuje koncové schéma s různými hodnotami pro toto nastavení:
- Možnost nastavená na 1:
STRUCT<name: STRING> - Možnost nastavená na 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>> - Možnost nastavená na 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- Možnost nastavená na 1:
convert.any.fields.to.json: Tato možnost umožňuje převádět pole Protobuf Any na JSON. Tuto funkci byste měli povolit pečlivě. Převod a zpracování JSON je neefektivní. Kromě toho řetězcové pole JSON ztrácí bezpečnost schématu Protobuf, což způsobuje, že následné zpracování je náchylné k chybám.
Hodnoty:
- False (výchozí): Za běhu můžou taková pole se zástupnými cardy obsahovat libovolné zprávy Protobuf jako binární data. Ve výchozím nastavení se tato pole zpracovávají jako normální zpráva Protobuf. Má dvě pole se schématem
(STRUCT<type_url: STRING, value: BINARY>). Ve výchozím nastavenívaluebinární pole není interpretováno žádným způsobem. Binární data ale nemusí být v praxi vhodná pro práci v některých aplikacích. - True: Nastavení této hodnoty na Hodnotu True umožňuje převádět
Anypole na řetězce JSON za běhu. Při této možnosti se binární soubor analyzuje a zpráva Protobuf se deserializuje do řetězce JSON.
- False (výchozí): Za běhu můžou taková pole se zástupnými cardy obsahovat libovolné zprávy Protobuf jako binární data. Ve výchozím nastavení se tato pole zpracovávají jako normální zpráva Protobuf. Má dvě pole se schématem
Příklad: Zvažte dva typy Protobuf definované takto:
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }Pokud je tato možnost povolená, schéma
from_protobuf("col", messageName ="ProtoWithAny")by bylo:STRUCT<event_name: STRING, details: STRING>.Pokud pole v době
detailsběhu obsahujePersonzprávu Protobuf, vrácená hodnota vypadá takto:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').Požadavky:
- Definice všech možných typů Protobuf, které se používají v
Anypolích, by měly být k dispozici v souboru popisovače Protobuf předané .from_protobuf() - Pokud
Anyprotobuf nebyl nalezen, dojde k chybě pro tento záznam. - Tato funkce se v současné době nepodporuje s registrem schémat.
- Definice všech možných typů Protobuf, které se používají v
emit.default.values: Povolí vykreslování polí s nulovými hodnotami při deserializaci Protobuf do struktury Sparku. Tato možnost by se měla používat střídmě. Obvykle se nedoporučuje záviset na takových jemných rozdílech v sémantice.
Hodnoty
- False (výchozí): Pokud je pole v serializovaném protobuf prázdné, výsledné pole ve struktuře Sparku má ve výchozím nastavení hodnotu null. Tuto možnost nelze povolit a považovat
nullji za výchozí hodnotu. - True: Pokud je tato možnost povolená, jsou tato pole vyplněna odpovídajícími výchozími hodnotami.
- False (výchozí): Pokud je pole v serializovaném protobuf prázdné, výsledné pole ve struktuře Sparku má ve výchozím nastavení hodnotu null. Tuto možnost nelze povolit a považovat
Příklad: Zvažte následující Protobuf s Protobuf vytvořený jako
Person(age=0, middle_name=""):syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }- Pokud je tato možnost nastavená na False, struktura Sparku po volání
from_protobuf()by byla všechna null:{"name": null, "age": null, "middle_name": "", "salary": null}. I když dvě pole (ageamiddle_name) obsahovaly nastavené hodnoty, Protobuf je neobsahuje ve formátu drátu, protože jsou to výchozí hodnoty. - Pokud je tato možnost nastavená na Hodnotu True, struktura Sparku po volání
from_protobuf()by byla:{"name": "", "age": 0, "middle_name": "", "salary": null}. Polesalaryzůstane null, protože je explicitně deklarovánooptionala není nastaveno ve vstupním záznamu.
- Pokud je tato možnost nastavená na False, struktura Sparku po volání
enums.as.ints: Při povolení se pole výčtu v Protobuf vykreslují jako celočíselná pole ve Sparku.
Hodnoty
- False (výchozí)
- True: Pokud je tato možnost povolená, vykreslí se pole výčtu v Protobuf jako celočíselná pole ve Sparku.
Příklad: Zvažte následující protobuf:
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }Vzhledem k tomu, protobuf zpráva jako
Person(job = ENGINEER):- Pokud je tato možnost zakázaná, odpovídající struktura Sparku by byla
{"job": "ENGINEER"}. - Pokud je tato možnost povolená, odpovídající struktura Sparku by byla
{"job": 1}.
Všimněte si, že schéma těchto polí se v každém případě liší (celé číslo, nikoli výchozí řetězec). Taková změna může ovlivnit schéma podřízených tabulek.
- Pokud je tato možnost zakázaná, odpovídající struktura Sparku by byla
Možnosti Schema Registry
Následující možnosti registru schématu jsou relevantní při použití registru schématu s funkcemi Protobuf.
-
schema.rejstřík.předmět
- Požaduje se
- Určuje předmět schématu v registru schématu, například "client-event".
-
schema.registry.address
- Požaduje se
- Adresa URL registru schématu, například
https://schema-registry.example.com:8081
-
schema.registry.protobuf.name
- Volitelné
- Výchozí hodnota:
<NONE>. - Položka registru schématu pro předmět může obsahovat více definic Protobuf, stejně jako jeden
protosoubor. Pokud není tato možnost specifikována, pro schéma se použije první Protobuf. Zadejte název zprávy Protobuf, pokud není první v položce. Představte si například položku se dvěma definicemi Protobuf: Person (Osoba) a Location (Umístění). Jestliže datový proud odpovídá „Umístění“ místo „Person“, nastavte tuto volbu na „Umístění“ (nebo jeho úplný název včetně balíčku „com.example.protos.Location“).
-
schema.registry.schema.evolution.mode
- Výchozí hodnota: "restart".
- Podporované režimy:
- "restart"
- "none"
- Tato možnost nastaví režim vývoje schématu pro
from_protobuf(). Na začátku dotazu Spark zaznamená nejnovější ID schématu pro daný předmět. Určuje schéma profrom_protobuf(). Po spuštění dotazu může být do registru schématu publikováno nové schéma. Když se v příchozím záznamu zaznamená novější id schématu, znamená to změnu schématu. Tato možnost určuje, jak se taková změna schématu zpracovává:-
restart (výchozí): Aktivuje
UnknownFieldException, když si všimnete novějšího ID schématu. Tím se dotaz ukončí. Databricks doporučuje konfigurovat úlohy na restartování při selhání dotazu pro zachycení změn schématu. - none: Změny id schématu se ignorují. Záznamy s novějším ID schématu se analyzují se stejným schématem, které bylo zjištěno na začátku dotazu. Očekává se, že novější definice Protobuf budou zpětně kompatibilní a nová pole budou ignorována.
-
restart (výchozí): Aktivuje
-
confluent.schema.registry.
<schema-registy-client-option>- Volitelné
- Registr schémat se připojuje k registru schémat Confluent pomocí klienta Confluent Schema Registry. Všechny možnosti konfigurace podporované klientem lze zadat s předponou "confluent.schema.registry". Například následující dvě nastavení poskytují přihlašovací údaje pro ověřování USER_INFO:
- "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO"
- "confluent.schema.registry.basic.auth.user.info": "
<KEY>:<SECRET>"