Ler e gravar buffers de protocolo

O Azure Databricks fornece suporte nativo para serialização e desserialização entre structs do Apache Spark e buffers de protocolo (protobuf). O suporte ao protobuf é implementado como um transformador de DataFrame do Apache Spark e pode ser usado com Streaming Estruturado ou para operações em lote.

Como desserializar e serializar buffers de protocolo

No Databricks Runtime 12.2 LTS e posteriores, você pode usar funções from_protobuf e to_protobuf para serializar e desserializar dados. A serialização protobuf é comumente usada em cargas de trabalho de streaming.

A sintaxe básica para funções protobuf é semelhante para funções de leitura e gravação. Você deve importar essas funções antes de usar.

O from_protobuf converte uma coluna binária em um struct e o to_protobuf converte uma coluna struct em binário. Você deve fornecer um registro de esquema especificado com o argumento options ou um arquivo descritor identificado pelo argumento descFilePath.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Os exemplos a seguir ilustram o processamento de registros protobuf binários com from_protobuf() e a conversão do struct do Spark SQL em protobuf binário com to_protobuf().

Usar protobuf com o Registro de Esquema Confluent

O Azure Databricks dá suporte ao uso do Registro de Esquema Confluent para definir o Protobuf.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Autenticar em um Registro de Esquema Confluent externo

Para se autenticar em um Registro de Esquema Confluent externo, atualize as opções do registro de esquema para incluir credenciais de autenticação e chaves de API.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Usar arquivos truststore e de repositório de chaves em volumes do Catálogo do Unity

No Databricks Runtime 14.3 LTS e posteriores, você pode usar arquivos truststore e de repositório de chaves em volumes do Catálogo do Unity para autenticar em um registro de esquema Confluent. Atualize as opções do registro de esquema de acordo com o seguinte exemplo:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Usar o protobuf com um arquivo descritor

Você também pode referenciar um arquivo descritor protobuf que está disponível para o cluster de cálculo. Verifique se você tem permissões adequadas para ler o arquivo, dependendo de sua localização.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Opções com suporte em funções Protobuf

As opções a seguir têm suporte em funções Protobuf.

  • modo: determina como são tradados os erros durante a desserialização dos registros Protobuf. Os erros podem ser causados por vários tipos de registros malformados, incluindo uma incompatibilidade entre o esquema real do registro e o esquema esperado fornecido em from_protobuf().
    • Valores:
      • FAILFAST(padrão): um erro é gerado quando um registro malformado é encontrado, e a tarefa falha.
      • PERMISSIVE: um NULL é retornado para registros malformados. Use essa opção com cuidado, pois ela pode resultar na remoção de muitos registros. Isso é útil quando uma pequena fração dos registros na origem está incorreta.
  • recursive.fields.max.depth: adiciona suporte para campos recursivos. Os esquemas SQL do Spark não dão suporte a campos recursivos. Quando essa opção não é especificada, campos recursivos não são permitidos. Para dar suporte a campos recursivos em Protobufs, eles precisam estar expandindo para uma profundidade especificada.
    • Valores:

      • -1 (padrão): campos recursivos não são permitidos.

      • 0: campos recursivos são descartados.

      • 1: permite um único nível de recursão.

      • [2-10]: especifique um limite para várias recursões, até 10 níveis.

        Definir um valor como maior que 0 permite campos recursivos expandindo os campos aninhados para a profundidade configurada. Valores maiores que 10 não são permitidos para evitar a criação inadvertida de esquemas muito grandes. Se uma mensagem Protobuf tiver profundidade além do limite configurado, o struct do Spark retornado será truncado após o limite de recursão.

    • Exemplo: considere um Protobuf com o seguinte campo recursivo:

      message Person { string name = 1; Person friend = 2; }
      

      O seguinte lista o esquema final com valores diferentes para esta configuração:

      • Opção definida como 1: STRUCT<name: STRING>
      • Opção definida como 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • Opção definida como 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: essa opção permite a conversão os campos Any do Protobuf em JSON. Esse recurso deve ser habilitado com cuidado. Conversão e processamento JSON são ineficientes. Além disso, o campo de cadeia de caracteres JSON perde a segurança do esquema Protobuf, tornando o processamento downstream propenso a erros.
    • Valores:

      • False (padrão): em runtime, esses campos curinga podem conter mensagens Protobuf arbitrárias como dados binários. Por padrão, esses campos são tratados como uma mensagem Protobuf normal. Ele tem dois campos com esquema (STRUCT<type_url: STRING, value: BINARY>). Por padrão, o campo value binário não é interpretado de forma alguma. Mas os dados binários podem não ser convenientes na prática para funcionar em alguns aplicativos.
      • True: definir esse valor como True permite converter campos Any em cadeias de caracteres JSON em runtime. Com essa opção, o binário é analisado e a mensagem Protobuf é desserializada em uma cadeia de caracteres JSON.
    • Exemplo: considere dois tipos Protobuf definidos da seguinte maneira:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Com essa opção habilitada, o esquema para from_protobuf("col", messageName ="ProtoWithAny") seria: STRUCT<event_name: STRING, details: STRING>.

      Em tempo de execução, se o campo details contiver a mensagem Protobuf Person, o valor retornado será semelhante a este: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Requisitos:

      • As definições para todos os tipos de Protobuf possíveis usados em campos Any devem estar disponíveis no arquivo de descritor Protobuf passado para from_protobuf().
      • Se Protobuf Any não for encontrado, isso resultará em um erro para esse registro.
      • No momento, não há suporte para esse recurso com schema-registry.
  • emit.default.values: habilita a renderização de campos com valores zero ao desserializar o Protobuf para um struct do Spark. Essa opção deve ser usada com moderação. Geralmente, não é aconselhável depender de diferenças tão pequenas na semântica.
    • Valores

      • False (padrão): quando um campo está vazio no Protobuf serializado, o campo resultante no struct do Spark é nulo por padrão. É mais simples não habilitar essa opção e tratar null como o valor padrão.
      • True: quando essa opção está habilitada, esses campos são preenchidos com valores padrão correspondentes.
    • Exemplo: considere o Protobuf a seguir com o Protobuf construído como Person(age=0, middle_name=""):

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Com essa opção definida como False, o struct do Spark depois de chamar from_protobuf() seria nulo: {"name": null, "age": null, "middle_name": "", "salary": null}. Embora dois campos (age e middle_name) tenham valores definidos, o Protobuf não os inclui em formato de fio, pois são valores padrão.
      • Com essa opção definida como False, o struct do Spark depois de chamar from_protobuf() seria: {"name": "", "age": 0, "middle_name": "", "salary": null}. O campo salary permanece nulo, pois é declarado explicitamente optional e não está definido no registro de entrada.
  • enums.as.ints: quando habilitados, os campos de enumeração no Protobuf são renderizados como campos inteiros no Spark.
    • Valores

      • False (padrão)
      • True: quando habilitados, os campos de enumeração no Protobuf são renderizados como campos inteiros no Spark.
    • Exemplo: considere o seguinte Protobuf:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Em uma determinada mensagem Protobuf como Person(job = ENGINEER):

      • Com essa opção desabilitada, o struct do Spark correspondente seria {"job": "ENGINEER"}.
      • Com essa opção desabilitada, o struct do Spark correspondente seria {"job": 1}.

      Observe que o esquema desses campos é diferente em cada caso (inteiro em vez de cadeia de caracteres padrão). Essa alteração pode afetar o esquema de tabelas downstream.

Opções do Registro de Esquema

As opções de registro de esquema a seguir são relevantes ao usar o registro de esquema com funções Protobuf.

  • schema.registry.subject
    • Obrigatório
    • Especifica o assunto do esquema no Registro de Esquema, como “client-event”
  • schema.registry.address
    • Obrigatório
    • URL para registro de esquema, como https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • Opcional
    • Padrão: <NONE>.
    • Uma entrada de registro de esquema para um assunto pode conter várias definições do Protobuf, assim como um único arquivo proto. Quando essa opção não é especificada, o primeiro Protobuf é usado para o esquema. Especifique o nome da mensagem Protobuf quando ela não for a primeira na entrada. Por exemplo, considere uma entrada com duas definições do Protobuf: “Pessoa” e “Localização” nessa ordem. Se o fluxo corresponder a “Location” em vez “de Person”, defina essa opção como “Location” (ou seu nome completo, incluindo o pacote “com.example.protos.Location”).
  • schema.registry.schema.evolution.mode
    • Padrão: “restart”.
    • Modos com suporte:
      • “restart”
      • “none”
    • Essa opção define o modo de schema-evolution para from_protobuf(). No início de uma consulta, o Spark registra a ID de esquema mais recente para o assunto especificado. Isso determina o esquema para from_protobuf(). Um novo esquema pode ser publicado no registro de esquema após o início da consulta. Quando uma ID de esquema mais recente é notada em um registro de entrada, ela indica uma alteração no esquema. Essa opção determina como essa alteração no esquema é tratada:
      • restart (padrão): dispara um UnknownFieldException quando uma ID de esquema mais recente é notada. Isso encerra a consulta. O Databricks recomenda configurar fluxos de trabalho para reiniciar em caso de falha de consulta para selecionar alterações de esquema.
      • none: as alterações de schema-id são ignoradas. Os registros com schema-id mais recente são analisados com o mesmo esquema que foi observado no início da consulta. Espera-se que as definições mais recentes do Protobuf sejam compatíveis com versões anteriores e novos campos sejam ignorados.
  • confluent.schema.registry.<schema-registy-client-option>
    • Opcional
    • O squema-registry conecta-se ao schema-registry do Confluent usando o cliente do Registro de Esquema Confluent. Todas as opções de configuração compatíveis com o cliente podem ser especificadas com o prefixo “confluent.schema.registry”. Por exemplo, as duas configurações a seguir fornecem credenciais de autenticação “USER_INFO”:
      • “confluent.schema.registry.basic.auth.credentials.source”: ‘USER_INFO’
      • “confluent.schema.registry.basic.auth.user.info”: “<KEY> : <SECRET>