Ескертпе
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Жүйеге кіруді немесе каталогтарды өзгертуді байқап көруге болады.
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Каталогтарды өзгертуді байқап көруге болады.
Apache Avro — это часто используемая система сериализации данных для их потоковой передачи. Типичным решением является размещение данных в формате Avro в Apache Kafka, а метаданных в реестре схемы Confluent и выполнение запросов с помощью платформы потоковой передачи, подключенной к Kafka и реестру схемы.
Azure Databricks поддерживает from_avro и to_avroфункции для создания конвейеров потоковой передачи с данными Avro в Kafka и метаданных в реестре схем. Функция to_avro преобразует столбец в двоичный формат Avro, а from_avro декодирует двоичные данные Avro в столбец. Обе функции преобразуют один столбец в другой, а входные и выходные данные SQL могут быть представлены как сложным, так и простым типом.
Примечание.
Функции from_avro и to_avro:
- Доступны в Python, Scala и Java.
- Могут передаваться в функции SQL как в пакетных, так и в потоковых запросах.
См. также Источники данных – файлы Avro.
Пример схемы вручную
Аналогично from_json и to_json, можно использовать from_avro и to_avro с любым двоичным столбцом. Схему Avro можно указать вручную, как показано в следующем примере:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Пример jsonFormatSchema
Схему также можно указать в виде строки JSON. Например, если /tmp/user.avsc — это
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "favorite_color", "type": ["string", "null"] }
]
}
Вы можете создать строку JSON:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Затем используйте схему в from_avro:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Пример с реестром схемы
Если в кластере есть служба реестра схем, можно работать с ней, from_avro чтобы не указывать схему Avro вручную.
В следующем примере показано чтение раздела Kafka "t", при условии, что ключ и значение уже зарегистрированы в реестре схем в качестве субъектов "t-key" и "t-value" типов STRING и INT:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr).as("value"))
Для to_avro выходная схема Avro по умолчанию может не соответствовать схеме целевого субъекта в службе реестра схемы по следующим причинам:
- Тип Spark SQL и схема Avro не сопоставлялись"один к одному". См. раздел Поддерживаемые типы для Spark SQL —> преобразование Avro.
- Если преобразованная исходящая схема Avro представлена записью с именем
topLevelRecordи при этом отсутствует пространство имен по умолчанию.
Если у to_avro выходная схема по умолчанию соответствует схеме целевого субъекта, можно выполнить следующие действия:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
В противном случае необходимо указать схему целевого субъекта в функции to_avro:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Проверка подлинности во внешнем реестре схем Confluent
В Databricks Runtime 12.2 LTS и более поздних версиях можно пройти проверку подлинности во внешнем реестре схем Confluent. В следующих примерах показано, как настроить параметры реестра схем для включения учетных данных проверки подлинности и ключей API.
язык программирования Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Питон
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Использование файлов truststore и хранилища ключей в томах каталога Unity
В Databricks Runtime 14.3 LTS и более поздних версиях можно использовать файлы доверия и хранилища ключей в томах каталога Unity для проверки подлинности в реестре схем Confluent. Обновите конфигурацию в предыдущем примере с помощью следующего синтаксиса:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Использование режима эволюции схемы с from_avro
В Databricks Runtime 14.2 и более поздних версиях можно использовать режим эволюции схемы с from_avro. Включение режима эволюции схемы приводит к возникновению UnknownFieldException задания после обнаружения эволюции схемы. Databricks рекомендует настраивать задания с режимом эволюции схемы для автоматического перезапуска при сбое задачи. Сведения о структурированной потоковой передаче см. в разделе "Рекомендации по рабочей среде".
Эволюция схемы полезна, если вы ожидаете, что схема исходных данных будет развиваться с течением времени и принимает все поля из источника данных. Если запросы уже явно указывают поля для запроса в источнике данных, добавленные поля игнорируются независимо от эволюции схемы.
avroSchemaEvolutionMode Используйте этот параметр, чтобы включить эволюцию схемы. В следующей таблице описаны параметры режима эволюции схемы:
| Вариант | Поведение |
|---|---|
none |
По умолчанию. Игнорирует эволюцию схемы и задание продолжается. |
restart |
UnknownFieldException Создает исключение при обнаружении эволюции схемы. Требуется перезапуск задания. |
Примечание.
Эту конфигурацию можно изменить между заданиями потоковой передачи и повторно использовать одну и ту же контрольную точку. Отключение эволюции схемы может привести к удаленным столбцам.
Настройка режима синтаксического анализа
Вы можете настроить режим синтаксического анализа, чтобы определить, требуется ли сбой или выводить пустые записи при отключении режима эволюции схемы, а схема развивается в обратном режиме совместимости. При использовании параметров по умолчанию происходит сбой при from_avro наблюдении несовместимых изменений схемы.
mode Используйте параметр для указания режима синтаксического анализа. В следующей таблице описывается параметр для режима синтаксического анализа:
| Вариант | Поведение |
|---|---|
FAILFAST |
По умолчанию. Ошибка синтаксического анализа создает SparkException исключение с помощью функции errorClassMALFORMED_AVRO_MESSAGE . |
PERMISSIVE |
Ошибка синтаксического анализа игнорируется, и создается пустая запись. |
Примечание.
При включенной эволюции схемы создается только исключение, FAILFAST если запись повреждена.
Пример использования режима эволюции схемы и настройки режима синтаксического анализа
В следующем примере показано включение эволюции схемы и указание FAILFAST режима синтаксического анализа с помощью реестра схем Confluent:
язык программирования Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
data = $"key",
subject = "t-key",
schemaRegistryAddress = schemaRegistryAddr,
options = schemaRegistryOptions.asJava).as("key"))
Питон
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)