Sdílet prostřednictvím


Vyrovnávací paměti protokolu pro čtení a zápis

Azure Databricks poskytuje nativní podporu serializace a deserializace mezi strukturami Apache Sparku a vyrovnávacími paměťmi protokolu (protobuf). Podpora Protobuf je implementována jako transformátor datového rámce Apache Spark a lze ji použít se strukturovaným streamováním nebo pro dávkové operace.

Jak deserializovat a serializovat vyrovnávací paměti protokolu

Ve verzi Databricks Runtime 12.2 LTS a vyšší můžete data serializovat a deserializovat pomocí from_protobuf funkcí to_protobuf . Serializace Protobuf se běžně používá v úlohách streamování.

Základní syntaxe funkcí protobuf je podobná pro funkce pro čtení a zápis. Před použitím je nutné tyto funkce importovat.

from_protobuf přetypuje binární sloupec na strukturu a to_protobuf přetypuje sloupec struktury na binární. Je nutné zadat registr schématu určený argumentem options nebo soubor popisovače identifikovaný argumentem descFilePath .

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Následující příklady ilustrují zpracování binárních záznamů protobuf pomocí from_protobuf() a převod struktury Spark SQL na binární protobuf s to_protobuf().

Použití protobuf s registrem schématu Confluent

Azure Databricks podporuje definování Protobuf pomocí registru schémat Confluent.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Ověření v externím registru schématu Confluent

Pokud se chcete ověřit v externím registru schématu Confluent, aktualizujte možnosti registru schématu tak, aby zahrnovaly přihlašovací údaje ověřování a klíče rozhraní API.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Použití úložiště důvěryhodnosti a souborů úložiště klíčů ve svazcích katalogu Unity

Ve službě Databricks Runtime 14.3 LTS a vyšší můžete k ověření v registru schémat Confluent použít úložiště důvěryhodnosti a soubory úložiště klíčů ve svazcích katalogu Unity. Aktualizujte možnosti registru schématu podle následujícího příkladu:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Použití Protobuf se souborem popisovače

Můžete také odkazovat na soubor popisovače protobuf, který je dostupný pro váš výpočetní cluster. Ujistěte se, že máte správná oprávnění ke čtení souboru v závislosti na jeho umístění.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Podporované možnosti ve funkcích Protobuf

Funkce Protobuf podporují následující možnosti.

  • mode: Určuje, jak se zpracovávají chyby při deserializaci záznamů Protobuf. Chyby mohou být způsobeny různými typy poškozených záznamů, včetně neshody mezi skutečným schématem záznamu a očekávaným schématem zadaným v from_protobuf().
    • Hodnoty:
      • FAILFAST(výchozí): Při výskytu poškozeného záznamu dojde k chybě a úloha selže.
      • PERMISSIVE: Pro poškozené záznamy se vrátí hodnota NULL. Tuto možnost používejte pečlivě, protože může vést k zahození mnoha záznamů. To je užitečné v případě, že malá část záznamů ve zdroji není správná.
  • recursive.fields.max.depth: Přidá podporu rekurzivních polí. Schémata Spark SQL nepodporují rekurzivní pole. Pokud tato možnost není zadána, rekurzivní pole nejsou povolena. Aby bylo možné podporovat rekurzivní pole v Protobufs, je potřeba je rozšířit na zadanou hloubku.
    • Hodnoty:

      • -1 (výchozí): Rekurzivní pole nejsou povolená.

      • 0: Rekurzivní pole se zahodí.

      • 1: Umožňuje jednu úroveň rekurze.

      • [2–10]: Zadejte prahovou hodnotu pro více rekurzí až 10 úrovní.

        Nastavení hodnoty větší než 0 umožňuje rekurzivní pole rozbalením vnořených polí na nakonfigurovanou hloubku. Hodnoty větší než 10 nejsou povoleny, aby se zabránilo neúmyslnému vytváření velmi velkých schémat. Pokud zpráva Protobuf přesahuje nakonfigurovaný limit, vrátí se vrácená struktura Sparku po limitu rekurze zkrácena.

    • Příklad: Zvažte Protobuf s následujícím rekurzivním polem:

      message Person { string name = 1; Person friend = 2; }
      

      Následující seznam obsahuje koncové schéma s různými hodnotami pro toto nastavení:

      • Možnost nastavená na 1: STRUCT<name: STRING>
      • Možnost nastavená na 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • Možnost nastavená na 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: Tato možnost umožňuje převádět pole Protobuf Any na JSON. Tuto funkci byste měli povolit pečlivě. Převod a zpracování JSON je neefektivní. Kromě toho pole řetězce JSON ztratí zabezpečení schématu Protobuf, aby podřízené zpracování náchylné k chybám.
    • Hodnoty:

      • False (výchozí): Za běhu můžou taková pole se zástupnými cardy obsahovat libovolné zprávy Protobuf jako binární data. Ve výchozím nastavení se tato pole zpracovávají jako normální zpráva Protobuf. Má dvě pole se schématem (STRUCT<type_url: STRING, value: BINARY>). Ve výchozím nastavení value binární pole není interpretováno žádným způsobem. Binární data ale nemusí být v praxi vhodná pro práci v některých aplikacích.
      • True: Nastavení této hodnoty na Hodnotu True umožňuje převádět Any pole na řetězce JSON za běhu. Při této možnosti se binární soubor analyzuje a zpráva Protobuf se deserializuje do řetězce JSON.
    • Příklad: Zvažte dva typy Protobuf definované takto:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Pokud je tato možnost povolená, schéma from_protobuf("col", messageName ="ProtoWithAny") by bylo: STRUCT<event_name: STRING, details: STRING>.

      Pokud pole v době details běhu obsahuje Person zprávu Protobuf, vrácená hodnota vypadá takto: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Požadavky:

      • Definice všech možných typů Protobuf, které se používají v Any polích, by měly být k dispozici v souboru popisovače Protobuf předané .from_protobuf()
      • Pokud Any protobuf nebyl nalezen, dojde k chybě pro tento záznam.
      • Tato funkce se v současné době nepodporuje v registru schématu.
  • emit.default.values: Povolí vykreslování polí s nulovými hodnotami při deserializaci Protobuf do struktury Sparku. Tato možnost by se měla používat střídmě. Obvykle se nedoporučuje záviset na takových jemných rozdílech v sémantice.
    • Hodnoty

      • False (výchozí): Pokud je pole v serializovaném protobuf prázdné, výsledné pole ve struktuře Sparku má ve výchozím nastavení hodnotu null. Tuto možnost nelze povolit a považovat null ji za výchozí hodnotu.
      • True: Pokud je tato možnost povolená, jsou tato pole vyplněna odpovídajícími výchozími hodnotami.
    • Příklad: Zvažte následující Protobuf s Protobuf vytvořený jako Person(age=0, middle_name=""):

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Pokud je tato možnost nastavená na False, struktura Sparku po volání from_protobuf() by byla všechna null: {"name": null, "age": null, "middle_name": "", "salary": null}. I když dvě pole (age a middle_name) obsahovaly nastavené hodnoty, Protobuf je neobsahuje ve formátu drátu, protože jsou to výchozí hodnoty.
      • Pokud je tato možnost nastavená na Hodnotu True, struktura Sparku po volání from_protobuf() by byla: {"name": "", "age": 0, "middle_name": "", "salary": null}. Pole salary zůstane null, protože je explicitně deklarováno optional a není nastaveno ve vstupním záznamu.
  • enums.as.ints: Při povolení se pole výčtu v Protobuf vykreslují jako celočíselná pole ve Sparku.
    • Hodnoty

      • False (výchozí)
      • True: Pokud je tato možnost povolená, vykreslí se pole výčtu v Protobuf jako celočíselná pole ve Sparku.
    • Příklad: Zvažte následující protobuf:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Vzhledem k tomu, protobuf zpráva jako Person(job = ENGINEER):

      • Pokud je tato možnost zakázaná, odpovídající struktura Sparku by byla {"job": "ENGINEER"}.
      • Pokud je tato možnost povolená, odpovídající struktura Sparku by byla {"job": 1}.

      Všimněte si, že schéma těchto polí se v každém případě liší (celé číslo, nikoli výchozí řetězec). Taková změna může ovlivnit schéma podřízených tabulek.

Možnosti registru schématu

Následující možnosti registru schématu jsou relevantní při použití registru schématu s funkcemi Protobuf.

  • schema.registry.subject
    • Požaduje se
    • Určuje předmět schématu v registru schématu, například "client-event".
  • schema.registry.address
    • Požaduje se
    • Adresa URL registru schématu, například https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • Volitelné
    • Výchozí hodnota: <NONE>.
    • Položka registru schématu pro předmět může obsahovat více definic Protobuf, stejně jako jeden proto soubor. Pokud tato možnost není zadána, použije se pro schéma první protobuf. Zadejte název zprávy Protobuf, pokud není první v položce. Představte si například položku se dvěma definicemi Protobuf: Person (Osoba) a Location (Umístění). Pokud datový proud odpovídá "Umístění" místo "Person", nastavte tuto možnost na "Umístění" (nebo jeho úplný název včetně balíčku "com.example.protos.Location").
  • schema.registry.schema.evolution.mode
    • Výchozí hodnota: "restart".
    • Podporované režimy:
      • "restart"
      • "none"
    • Tato možnost nastaví režim vývoje schématu pro from_protobuf(). Na začátku dotazu Spark zaznamená nejnovější ID schématu pro daný předmět. Určuje schéma pro from_protobuf(). Po spuštění dotazu může být do registru schématu publikováno nové schéma. Když se v příchozím záznamu zaznamená novější id schématu, znamená to změnu schématu. Tato možnost určuje, jak se taková změna schématu zpracovává:
      • restart (výchozí): Aktivuje UnknownFieldException , když si všimnete novějšího ID schématu. Tím se dotaz ukončí. Databricks doporučuje konfigurovat pracovní postupy, které se mají restartovat při selhání dotazu, aby se změny schématu vybraly.
      • none: Změny id schématu se ignorují. Záznamy s novějším ID schématu se analyzují se stejným schématem, které bylo zjištěno na začátku dotazu. Očekává se, že novější definice Protobuf budou zpětně kompatibilní a nová pole budou ignorována.
  • confluent.schema.registry.<schema-registy-client-option>
    • Volitelné
    • Registr schématu se připojuje ke schématu Confluent schema-registry pomocí klienta Confluent Schema Registry. Všechny možnosti konfigurace podporované klientem lze zadat s předponou "confluent.schema.registry". Například následující dvě nastavení poskytují přihlašovací údaje pro ověřování USER_INFO:
      • "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO"
      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"