Share via


Ler e escrever dados do Avro de transmissão em fluxo

Apache Avro é um sistema de serialização de dados comumente usado no mundo do streaming. Uma solução típica é colocar dados no formato Avro no Apache Kafka, metadados no Registro de Esquema Confluente e, em seguida, executar consultas com uma estrutura de streaming que se conecta ao Kafka e ao Registro de Esquema.

O Azure Databricks dá suporte às from_avro funções e to_avro para criar pipelines de streaming com dados Avro no Kafka e metadados no Registro de Esquema. A função to_avro codifica uma coluna como binária no formato Avro e from_avro decodifica dados binários Avro em uma coluna. Ambas as funções transformam uma coluna em outra coluna, e o tipo de dados SQL de entrada/saída pode ser um tipo complexo ou um tipo primitivo.

Nota

O from_avro e to_avro funções:

  • Estão disponíveis em Python, Scala e Java.
  • Pode ser passado para funções SQL em consultas em lote e streaming.

Consulte também a fonte de dados do arquivo Avro.

Exemplo de esquema especificado manualmente

Semelhante ao from_json e to_json, você pode usar from_avro e to_avro com qualquer coluna binária. Você pode especificar o esquema Avro manualmente, como no exemplo a seguir:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

Exemplo de jsonFormatSchema

Você também pode especificar um esquema como uma cadeia de caracteres JSON. Por exemplo, se /tmp/user.avsc for:

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Você pode criar uma cadeia de caracteres JSON:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Em seguida, use o esquema em from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Exemplo com o Registro de Esquema

Se o cluster tiver um serviço de Registro de Esquema, from_avro poderá trabalhar com ele para que você não precise especificar o esquema Avro manualmente.

O exemplo a seguir demonstra a leitura de um tópico "t" de Kafka, supondo que a chave e o valor já estejam registrados no Registro de Esquema como sujeitos "t-key" e "t-value" dos tipos STRING e INT:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

Para to_avro, o esquema Avro de saída padrão pode não corresponder ao esquema do assunto de destino no serviço Registro de Esquema pelos seguintes motivos:

  • O mapeamento do tipo Spark SQL para o esquema Avro não é um-para-um. Consulte Tipos suportados para conversão do Spark SQL -> Avro.
  • Se o esquema Avro de saída convertido for do tipo de registro, o nome do registro será topLevelRecord e não haverá namespace por padrão.

Se o esquema de saída padrão do to_avro corresponder ao esquema do assunto de destino, você poderá fazer o seguinte:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Caso contrário, você deve fornecer o esquema do assunto de destino na to_avro função:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Autenticar em um registro de esquema confluente externo

No Databricks Runtime 12.2 LTS e superior, você pode autenticar em um Registro de Esquema Confluent externo. Os exemplos a seguir demonstram como configurar suas opções de registro de esquema para incluir credenciais de autenticação e chaves de API.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Use arquivos truststore e keystore em volumes do Unity Catalog

No Databricks Runtime 14.3 LTS e superior, você pode usar arquivos truststore e keystore nos volumes do Unity Catalog para autenticar em um Registro de Esquema Confluent. Atualize a configuração no exemplo anterior usando a seguinte sintaxe:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Use o modo de evolução de esquema com from_avro

No Databricks Runtime 14.2 e superior, você pode usar o modo de evolução de esquema com from_avro. Habilitar o modo de evolução do esquema faz com que o trabalho lance um UnknownFieldException depois de detetar a evolução do esquema. O Databricks recomenda configurar trabalhos com o modo de evolução de esquema para reiniciar automaticamente em caso de falha de tarefa. Consulte Configurar trabalhos de streaming estruturado para reiniciar consultas de streaming em caso de falha.

A evolução do esquema é útil se você espera que o esquema dos dados de origem evolua ao longo do tempo e ingira todos os campos da fonte de dados. Se suas consultas já especificarem explicitamente quais campos consultar em sua fonte de dados, os campos adicionados serão ignorados, independentemente da evolução do esquema.

Use a opção para habilitar a evolução do avroSchemaEvolutionMode esquema. A tabela a seguir descreve as opções para o modo de evolução do esquema:

Opção Comportamento
none Inadimplência. Ignora a evolução do esquema e o trabalho continua.
restart Lança um UnknownFieldException ao detetar a evolução do esquema. Requer um reinício do trabalho.

Nota

Você pode alterar essa configuração entre trabalhos de streaming e reutilizar o mesmo ponto de verificação. A desativação da evolução do esquema pode resultar em colunas descartadas.

Configurar o modo de análise

Você pode configurar o modo de análise para determinar se deseja falhar ou emitir registros nulos quando o modo de evolução do esquema estiver desativado e o esquema evoluir de forma não compatível com versões anteriores. Com as configurações padrão, from_avro falha quando observa alterações de esquema incompatíveis.

Use a opção para especificar o mode modo de análise. A tabela a seguir descreve a opção para o modo de análise:

Opção Comportamento
FAILFAST Inadimplência. Um erro de análise lança um SparkException com um errorClass de MALFORMED_AVRO_MESSAGE.
PERMISSIVE Um erro de análise é ignorado e um registro nulo é emitido.

Nota

Com a evolução do esquema habilitada, FAILFAST só lança exceções se um registro estiver corrompido.

Exemplo usando a evolução do esquema e definindo o modo de análise

O exemplo a seguir demonstra como habilitar a evolução do esquema e especificar FAILFAST o modo de análise com um Registro de Esquema Confluente:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)