Поделиться через


Чтение и запись буферов протокола

Azure Databricks обеспечивает встроенную поддержку сериализации и десериализации между структурами Apache Spark и буферами протокола (protobuf). Поддержка Protobuf реализована как преобразователь кадра данных Apache Spark и может использоваться с структурированной потоковой передачей или для пакетных операций.

Десериализация и сериализация буферов протокола

В Databricks Runtime 12.2 LTS и более поздних версиях можно использовать from_protobuf и to_protobuf функции для сериализации и десериализации данных. Сериализация Protobuf обычно используется в рабочих нагрузках потоковой передачи.

Базовый синтаксис функций protobuf аналогичен функциям чтения и записи. Прежде чем использовать эти функции, необходимо импортировать эти функции.

from_protobuf Приведение двоичного столбца к структуре и to_protobuf приведение столбца структуры к двоичному файлу. Необходимо указать реестр схем, указанный аргументом options , или файл дескриптора, определенный descFilePath аргументом.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

В следующих примерах показана обработка двоичных записей protobuf и from_protobuf() преобразование структуры Spark SQL в двоичный protobuf с помощью to_protobuf().

Использование protobuf с реестром схем Confluent

Azure Databricks поддерживает использование реестра схем Confluent для определения Protobuf.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Проверка подлинности во внешнем реестре схем Confluent

Чтобы выполнить проверку подлинности во внешнем реестре схем Confluent, обновите параметры реестра схем, чтобы включить учетные данные проверки подлинности и ключи API.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Использование файлов truststore и хранилища ключей в томах каталога Unity

В Databricks Runtime 14.3 LTS и более поздних версиях можно использовать файлы доверия и хранилища ключей в томах каталога Unity для проверки подлинности в реестре схем Confluent. Обновите параметры реестра схем в соответствии со следующим примером:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Использование Protobuf с дескрипторным файлом

Вы также можете ссылаться на файл дескриптора protobuf, доступный для вычислительного кластера. Убедитесь, что у вас есть правильные разрешения на чтение файла в зависимости от его расположения.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Поддерживаемые параметры в функциях Protobuf

Следующие параметры поддерживаются в функциях Protobuf.

  • режим. Определяет способ обработки ошибок при десериализации записей Protobuf. Ошибки могут быть вызваны различными типами неправильных записей, включая несоответствие между фактической схемой записи и ожидаемой схемой, предоставленной в from_protobuf().
    • Значения:
      • FAILFAST(по умолчанию): ошибка возникает при обнаружении неправильно сформированной записи, и задача завершается сбоем.
      • PERMISSIVE: значение NULL возвращается для неправильно сформированных записей. Используйте этот параметр тщательно, так как это может привести к удалению множества записей. Это полезно, если небольшая доля записей в источнике неверны.
  • recursive.fields.max.depth: добавляет поддержку рекурсивных полей. Схемы SQL Spark не поддерживают рекурсивные поля. Если этот параметр не указан, рекурсивные поля не допускаются. Чтобы поддерживать рекурсивные поля в Protobufs, их необходимо расширить до указанной глубины.
    • Значения:

      • -1 (по умолчанию): не допускаются рекурсивные поля.

      • 0. Рекурсивные поля удаляются.

      • 1. Разрешает один уровень рекурсии.

      • [2-10]: укажите пороговое значение для нескольких рекурсий до 10 уровней.

        Задание значения больше 0 позволяет рекурсивным полям, расширяя вложенные поля на настроенную глубину. Значения, превышающие 10, не допускаются, чтобы избежать непреднамеренного создания очень больших схем. Если сообщение Protobuf имеет глубину за пределами настроенного ограничения, возвращаемая структурой Spark будет усечена после ограничения рекурсии.

    • Пример. Рассмотрим protobuf со следующим рекурсивным полем:

      message Person { string name = 1; Person friend = 2; }
      

      Ниже перечислены конечные схемы с различными значениями для этого параметра:

      • Для параметра задано значение 1: STRUCT<name: STRING>
      • Для параметра задано значение 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • Для параметра задано значение 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json. Этот параметр позволяет преобразовать поля Protobuf Any в JSON. Эту функцию следует тщательно включить. Преобразование и обработка JSON неэффективны. Кроме того, строковое поле JSON теряет безопасность схемы Protobuf, что делает обработку нижестоящей подверженной ошибкам.
    • Значения:

      • False (по умолчанию): во время выполнения такие поля подстановочных знаков могут содержать произвольные сообщения Protobuf в виде двоичных данных. По умолчанию такие поля обрабатываются как обычное сообщение Protobuf. В нем есть два поля со схемой (STRUCT<type_url: STRING, value: BINARY>). По умолчанию двоичное value поле не интерпретируется каким-либо образом. Но двоичные данные могут оказаться не удобными на практике для работы в некоторых приложениях.
      • True. При задании этого значения значение true позволяет преобразовывать Any поля в строки JSON во время выполнения. С помощью этого параметра двоичный файл анализируется, и сообщение Protobuf десериализировано в строку JSON.
    • Пример. Рассмотрим два типа Protobuf, определенные следующим образом:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Если этот параметр включен, схема from_protobuf("col", messageName ="ProtoWithAny") будет: STRUCT<event_name: STRING, details: STRING>

      Во время выполнения, если details поле содержит Person сообщение Protobuf, возвращаемое значение выглядит следующим образом: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')

    • Требования.

      • Определения всех возможных типов Protobuf, используемых в Any полях, должны быть доступны в файле дескриптора Protobuf, переданном в from_protobuf().
      • Если Any Protobuf не найден, это приведет к ошибке для этой записи.
      • Эта функция в настоящее время не поддерживается в реестре схем.
  • emit.default.values: включает поля отрисовки с нулевыми значениями при десериализации Protobuf в структуру Spark. Этот параметр следует использовать с разреженным способом. Обычно не рекомендуется зависеть от таких более тонких различий в семантике.
    • Значения

      • False (по умолчанию): если поле пусто в сериализованном Protobuf, результирующее поле структуры Spark по умолчанию равно NULL. Проще не включить этот параметр и рассматривать null как значение по умолчанию.
      • True: если этот параметр включен, такие поля заполняются соответствующими значениями по умолчанию.
    • Пример. Рассмотрим следующий protobuf с протобуфом, построенным следующим образом Person(age=0, middle_name=""):

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Если для этого параметра задано значение False, то после вызова from_protobuf() spark будут все значения NULL: {"name": null, "age": null, "middle_name": "", "salary": null} Несмотря на то, что в двух полях (age и middle_name) заданы значения, Protobuf не включает их в формат провода, так как они являются значениями по умолчанию.
      • Если для этого параметра задано значение True, то после вызова from_protobuf() будет: {"name": "", "age": 0, "middle_name": "", "salary": null} Поле salary остается пустым, так как оно явно объявлено optional , и оно не задано во входной записи.
  • enums.as.ints: если включено, поля перечисления в Protobuf отображаются как целые поля в Spark.
    • Значения

      • False (по умолчанию)
      • True. При включении поля перечисления в Protobuf отображаются как целые поля в Spark.
    • Пример. Рассмотрим следующий protobuf:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Учитывая сообщение Protobuf, например Person(job = ENGINEER):

      • Если этот параметр отключен, соответствующая структуру Spark будет {"job": "ENGINEER"}отключена.
      • Если этот параметр включен, соответствующая структуру Spark будет иметь значение {"job": 1}.

      Обратите внимание, что схема этих полей отличается в каждом случае (целое число, а не строка по умолчанию). Такое изменение может повлиять на схему подчиненных таблиц.

Параметры реестра схем

При использовании реестра схем с функциями Protobuf используются следующие параметры реестра схем.

  • schema.registry.subject
    • Обязательное поле
    • Указывает тему схемы в реестре схем, например "client-event"
  • schema.registry.address
    • Обязательное поле
    • URL-адрес реестра схем, например https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • Необязательно
    • По умолчанию: <NONE>.
    • Запись реестра схемы для субъекта может содержать несколько определений Protobuf, как и один proto файл. Если этот параметр не указан, первый protobuf используется для схемы. Укажите имя сообщения Protobuf, если он не первый в записи. Например, рассмотрим запись с двумя определениями Protobuf: Person и Location в этом порядке. Если поток соответствует "Location", а не "Person", задайте для этого параметра значение Location (или полное имя, включая пакет com.example.protos.Location).
  • schema.registry.schema.evolution.mode
    • Значение по умолчанию: "перезапустить".
    • Поддерживаемые режимы:
      • "перезапустить"
      • "нет"
    • Этот параметр задает режим эволюции схемы для from_protobuf(). В начале запроса Spark записывает последний идентификатор схемы для заданной темы. Это определяет схему для from_protobuf(). После запуска запроса новая схема может быть опубликована в реестре схем. Когда новый идентификатор схемы заметен во входящей записи, он указывает на изменение схемы. Этот параметр определяет, как обрабатывается такое изменение схемы:
      • перезапуск (по умолчанию): активирует UnknownFieldException при появлении нового идентификатора схемы. Это завершает запрос. Databricks рекомендует настроить задания для перезапуска при сбое запроса для получения изменений схемы.
      • нет: изменения идентификатора схемы игнорируются. Записи с более новым идентификатором схемы анализируются с той же схемой, которая наблюдалась в начале запроса. Ожидается, что новые определения Protobuf будут обратно совместимы, а новые поля игнорируются.
  • confluent.schema.registry.<schema-registy-client-option>
    • Необязательно
    • Реестр схем подключается к реестру схемы Confluent с помощью клиента реестра схем Confluent. Любые параметры конфигурации, поддерживаемые клиентом, можно указать с префиксом confluent.schema.registry. Например, следующие два параметра предоставляют учетные данные проверки подлинности USER_INFO:
      • "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO"
      • "confluent.schema.registry.basic.auth.user.info": " : <SECRET>"<KEY>