Чтение и запись буферов протокола
Azure Databricks обеспечивает встроенную поддержку сериализации и десериализации между структурами Apache Spark и буферами протокола (protobuf). Поддержка Protobuf реализована как преобразователь кадра данных Apache Spark и может использоваться с структурированной потоковой передачей или для пакетных операций.
Десериализация и сериализация буферов протокола
В Databricks Runtime 12.2 LTS и более поздних версиях можно использовать from_protobuf
и to_protobuf
функции для сериализации и десериализации данных. Сериализация Protobuf обычно используется в рабочих нагрузках потоковой передачи.
Базовый синтаксис функций protobuf аналогичен функциям чтения и записи. Прежде чем использовать эти функции, необходимо импортировать эти функции.
from_protobuf
Приведение двоичного столбца к структуре и to_protobuf
приведение столбца структуры к двоичному файлу. Необходимо указать реестр схем, указанный аргументом options
, или файл дескриптора, определенный descFilePath
аргументом.
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
В следующих примерах показана обработка двоичных записей protobuf и from_protobuf()
преобразование структуры Spark SQL в двоичный protobuf с помощью to_protobuf()
.
Использование protobuf с реестром схем Confluent
Azure Databricks поддерживает использование реестра схем Confluent для определения Protobuf.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
Проверка подлинности во внешнем реестре схем Confluent
Чтобы выполнить проверку подлинности во внешнем реестре схем Confluent, обновите параметры реестра схем, чтобы включить учетные данные проверки подлинности и ключи API.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Использование файлов truststore и хранилища ключей в томах каталога Unity
В Databricks Runtime 14.3 LTS и более поздних версиях можно использовать файлы доверия и хранилища ключей в томах каталога Unity для проверки подлинности в реестре схем Confluent. Обновите параметры реестра схем в соответствии со следующим примером:
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Использование Protobuf с дескрипторным файлом
Вы также можете ссылаться на файл дескриптора protobuf, доступный для вычислительного кластера. Убедитесь, что у вас есть правильные разрешения на чтение файла в зависимости от его расположения.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Поддерживаемые параметры в функциях Protobuf
Следующие параметры поддерживаются в функциях Protobuf.
- режим. Определяет способ обработки ошибок при десериализации записей Protobuf. Ошибки могут быть вызваны различными типами неправильных записей, включая несоответствие между фактической схемой записи и ожидаемой схемой, предоставленной в
from_protobuf()
.- Значения:
FAILFAST
(по умолчанию): ошибка возникает при обнаружении неправильно сформированной записи, и задача завершается сбоем.PERMISSIVE
: значение NULL возвращается для неправильно сформированных записей. Используйте этот параметр тщательно, так как это может привести к удалению множества записей. Это полезно, если небольшая доля записей в источнике неверны.
- Значения:
- recursive.fields.max.depth: добавляет поддержку рекурсивных полей. Схемы SQL Spark не поддерживают рекурсивные поля. Если этот параметр не указан, рекурсивные поля не допускаются. Чтобы поддерживать рекурсивные поля в Protobufs, их необходимо расширить до указанной глубины.
Значения:
-1 (по умолчанию): не допускаются рекурсивные поля.
0. Рекурсивные поля удаляются.
1. Разрешает один уровень рекурсии.
[2-10]: укажите пороговое значение для нескольких рекурсий до 10 уровней.
Задание значения больше 0 позволяет рекурсивным полям, расширяя вложенные поля на настроенную глубину. Значения, превышающие 10, не допускаются, чтобы избежать непреднамеренного создания очень больших схем. Если сообщение Protobuf имеет глубину за пределами настроенного ограничения, возвращаемая структурой Spark будет усечена после ограничения рекурсии.
Пример. Рассмотрим protobuf со следующим рекурсивным полем:
message Person { string name = 1; Person friend = 2; }
Ниже перечислены конечные схемы с различными значениями для этого параметра:
- Для параметра задано значение 1:
STRUCT<name: STRING>
- Для параметра задано значение 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- Для параметра задано значение 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- Для параметра задано значение 1:
- convert.any.fields.to.json. Этот параметр позволяет преобразовать поля Protobuf Any в JSON. Эту функцию следует тщательно включить. Преобразование и обработка JSON неэффективны. Кроме того, строковое поле JSON теряет безопасность схемы Protobuf, что делает обработку нижестоящей подверженной ошибкам.
Значения:
- False (по умолчанию): во время выполнения такие поля подстановочных знаков могут содержать произвольные сообщения Protobuf в виде двоичных данных. По умолчанию такие поля обрабатываются как обычное сообщение Protobuf. В нем есть два поля со схемой
(STRUCT<type_url: STRING, value: BINARY>)
. По умолчанию двоичноеvalue
поле не интерпретируется каким-либо образом. Но двоичные данные могут оказаться не удобными на практике для работы в некоторых приложениях. - True. При задании этого значения значение true позволяет преобразовывать
Any
поля в строки JSON во время выполнения. С помощью этого параметра двоичный файл анализируется, и сообщение Protobuf десериализировано в строку JSON.
- False (по умолчанию): во время выполнения такие поля подстановочных знаков могут содержать произвольные сообщения Protobuf в виде двоичных данных. По умолчанию такие поля обрабатываются как обычное сообщение Protobuf. В нем есть два поля со схемой
Пример. Рассмотрим два типа Protobuf, определенные следующим образом:
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }
Если этот параметр включен, схема
from_protobuf("col", messageName ="ProtoWithAny")
будет:STRUCT<event_name: STRING, details: STRING>
Во время выполнения, если
details
поле содержитPerson
сообщение Protobuf, возвращаемое значение выглядит следующим образом:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
Требования.
- Определения всех возможных типов Protobuf, используемых в
Any
полях, должны быть доступны в файле дескриптора Protobuf, переданном вfrom_protobuf()
. - Если
Any
Protobuf не найден, это приведет к ошибке для этой записи. - Эта функция в настоящее время не поддерживается в реестре схем.
- Определения всех возможных типов Protobuf, используемых в
- emit.default.values: включает поля отрисовки с нулевыми значениями при десериализации Protobuf в структуру Spark. Этот параметр следует использовать с разреженным способом. Обычно не рекомендуется зависеть от таких более тонких различий в семантике.
Значения
- False (по умолчанию): если поле пусто в сериализованном Protobuf, результирующее поле структуры Spark по умолчанию равно NULL. Проще не включить этот параметр и рассматривать
null
как значение по умолчанию. - True: если этот параметр включен, такие поля заполняются соответствующими значениями по умолчанию.
- False (по умолчанию): если поле пусто в сериализованном Protobuf, результирующее поле структуры Spark по умолчанию равно NULL. Проще не включить этот параметр и рассматривать
Пример. Рассмотрим следующий protobuf с протобуфом, построенным следующим образом
Person(age=0, middle_name="")
:syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
- Если для этого параметра задано значение False, то после вызова
from_protobuf()
spark будут все значения NULL:{"name": null, "age": null, "middle_name": "", "salary": null}
Несмотря на то, что в двух полях (age
иmiddle_name
) заданы значения, Protobuf не включает их в формат провода, так как они являются значениями по умолчанию. - Если для этого параметра задано значение True, то после вызова
from_protobuf()
будет:{"name": "", "age": 0, "middle_name": "", "salary": null}
Полеsalary
остается пустым, так как оно явно объявленоoptional
, и оно не задано во входной записи.
- Если для этого параметра задано значение False, то после вызова
- enums.as.ints: если включено, поля перечисления в Protobuf отображаются как целые поля в Spark.
Значения
- False (по умолчанию)
- True. При включении поля перечисления в Protobuf отображаются как целые поля в Spark.
Пример. Рассмотрим следующий protobuf:
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }
Учитывая сообщение Protobuf, например
Person(job = ENGINEER)
:- Если этот параметр отключен, соответствующая структуру Spark будет
{"job": "ENGINEER"}
отключена. - Если этот параметр включен, соответствующая структуру Spark будет иметь значение
{"job": 1}
.
Обратите внимание, что схема этих полей отличается в каждом случае (целое число, а не строка по умолчанию). Такое изменение может повлиять на схему подчиненных таблиц.
- Если этот параметр отключен, соответствующая структуру Spark будет
Параметры реестра схем
При использовании реестра схем с функциями Protobuf используются следующие параметры реестра схем.
- schema.registry.subject
- Обязательное поле
- Указывает тему схемы в реестре схем, например "client-event"
- schema.registry.address
- Обязательное поле
- URL-адрес реестра схем, например
https://schema-registry.example.com:8081
- schema.registry.protobuf.name
- Необязательно
- По умолчанию:
<NONE>
. - Запись реестра схемы для субъекта может содержать несколько определений Protobuf, как и один
proto
файл. Если этот параметр не указан, первый protobuf используется для схемы. Укажите имя сообщения Protobuf, если он не первый в записи. Например, рассмотрим запись с двумя определениями Protobuf: Person и Location в этом порядке. Если поток соответствует "Location", а не "Person", задайте для этого параметра значение Location (или полное имя, включая пакет com.example.protos.Location).
- schema.registry.schema.evolution.mode
- Значение по умолчанию: "перезапустить".
- Поддерживаемые режимы:
- "перезапустить"
- "нет"
- Этот параметр задает режим эволюции схемы для
from_protobuf()
. В начале запроса Spark записывает последний идентификатор схемы для заданной темы. Это определяет схему дляfrom_protobuf()
. После запуска запроса новая схема может быть опубликована в реестре схем. Когда новый идентификатор схемы заметен во входящей записи, он указывает на изменение схемы. Этот параметр определяет, как обрабатывается такое изменение схемы:- перезапуск (по умолчанию): активирует
UnknownFieldException
при появлении нового идентификатора схемы. Это завершает запрос. Databricks рекомендует настроить задания для перезапуска при сбое запроса для получения изменений схемы. - нет: изменения идентификатора схемы игнорируются. Записи с более новым идентификатором схемы анализируются с той же схемой, которая наблюдалась в начале запроса. Ожидается, что новые определения Protobuf будут обратно совместимы, а новые поля игнорируются.
- перезапуск (по умолчанию): активирует
- confluent.schema.registry.
<schema-registy-client-option>
- Необязательно
- Реестр схем подключается к реестру схемы Confluent с помощью клиента реестра схем Confluent. Любые параметры конфигурации, поддерживаемые клиентом, можно указать с префиксом confluent.schema.registry. Например, следующие два параметра предоставляют учетные данные проверки подлинности USER_INFO:
- "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO"
- "confluent.schema.registry.basic.auth.user.info": " :
<SECRET>
"<KEY>