Lire et écrire des données Avro en streaming

Apache Avro est un système de sérialisation de données couramment utilisé dans le monde de la diffusion en continu. Une solution classique consiste à placer les données de format Avro dans Apache Kafka, les métadonnées dans Confluent Schema Registry, puis à exécuter des requêtes avec une infrastructure de diffusion en continu qui se connecte tant à Kafka qu’à Confluent Schema Registry.

Azure Databricks prend en charge les fonctionsfrom_avro et to_avro pour créer des pipelines de diffusion en continu avec les données Avro dans Kafka, et les métadonnées dans Confluent Schema Registry. La fonction to_avro encode une colonne en tant que binaire au format Avro, et from_avro décode les données binaires Avro dans une colonne. Les deux fonctions transforment une colonne en une autre, et le type de données SQL d’entrée/sortie peut être un type complexe ou primitif.

Notes

Les fonctions from_avro et to_avro :

  • Sont disponibles en Python, Scala et Java.
  • Peuvent être passées à des fonctions SQL dans des requêtes de traitement par lot et de diffusion en continu.

Consultez également source de données de fichier Avro.

Exemple de schéma spécifié manuellement

Comme pour from_json et to_json, vous pouvez utiliser from_avro et to_avro avec n’importe quelle colonne binaire. Vous pouvez spécifier manuellement le schéma Avro, comme dans l’exemple suivant :

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

Exemple de jsonFormatSchema

Vous pouvez également spécifier un schéma sous la forme d’une chaîne JSON. Par exemple, si /tmp/user.avsc est :

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Vous pouvez créer une chaîne JSON :

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Utilisez ensuite le schéma dans from_avro :

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Exemple avec Confluent Schema Registry

Si votre cluster a un service Confluent Schema Registry, from_avro peut l’utiliser pour vous éviter de devoir spécifier le schéma Avro manuellement.

L’exemple suivant montre la lecture d’une rubrique Kafka « t » en supposant que la clé et la valeur sont déjà inscrites dans le registre de schémas comme sujets « t-key » et « t-value » de types STRING et INT :

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

Pour to_avro, il se peut que le schéma Avro de sortie par défaut ne corresponde pas au schéma du sujet cible dans le service Confluent Schema Registry pour les raisons suivantes :

  • Le mappage d’un type Spark SQL à un schéma Avro n’est pas de un à un. Consultez Types pris en charge pour la conversion Spark SQL -> Avro.
  • Si le schéma Avro de sortie converti est de type enregistrement, le nom de l’enregistrement est topLevelRecord et il n’y a pas d’espace de noms par défaut.

Si le schéma de sortie par défaut de to_avro correspond au schéma du sujet cible, vous pouvez procéder comme suit :

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Sinon, vous devez fournir le schéma du sujet cible dans la fonction to_avro :

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

S’authentifier auprès d’un registre de schémas Confluent externe

Dans Databricks Runtime 12.2 LTS et versions ultérieures, vous pouvez vous authentifier auprès d’un registre de schémas Confluent externe. Les exemples suivants montrent comment configurer les options du registre de schémas pour inclure des clés API et des informations d’identification pour l’authentification.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(col("key"), lit("t-key"), schema_registry_address, schema_registry_options).alias("key"),
    to_avro(col("value"), lit("t-value"), schema_registry_address, schema_registry_options, avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Utiliser les fichiers truststore et keystore (magasin de clés) dans les volumes Unity Catalog

Dans Databricks Runtime 14.3 LTS et versions ultérieures, vous pouvez utiliser des fichiers truststore et keystore dans les volumes Unity Catalog pour vous authentifier auprès de Confluent Schema Registry. Mettez à jour la configuration dans l’exemple précédent à l’aide de la syntaxe suivante :

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Utiliser le mode d’évolution du schéma avec from_avro

Dans Databricks Runtime 14.2 et les versions ultérieures, vous pouvez utiliser le mode d’évolution du schéma avec from_avro. L’activation du mode d’évolution du schéma entraîne la levée d’un UnknownFieldException après la détection de l’évolution du schéma. Databricks recommande de configurer des travaux avec le mode d’évolution du schéma pour redémarrer automatiquement en cas d’échec de la tâche. Consultez Configurer des travaux de Structured Streaming pour redémarrer des requêtes de diffusion en continu en cas d’échec.

L’évolution du schéma est utile si vous vous attendez à ce que le schéma de vos données sources évolue au fil du temps et ingère tous les champs de votre source de données. Si vos requêtes spécifient déjà explicitement les champs à interroger dans votre source de données, les champs ajoutés sont ignorés indépendamment de l’évolution du schéma.

Utilisez l’option avroSchemaEvolutionMode pour activer l’évolution du schéma. Le tableau suivant décrit les options pour le mode d’évolution du schéma :

Option Comportement
none Par défaut. Ignore l’évolution du schéma et le travail continue.
restart Lève un UnknownFieldException lors de la détection de l’évolution du schéma. Nécessite un redémarrage du travail.

Remarque

Vous pouvez modifier cette configuration entre les travaux de diffusion en continu et réutiliser le même point de contrôle. La désactivation de l’évolution du schéma peut entraîner la suppression de colonnes.

Configurer le mode d’analyse

Vous pouvez configurer le mode d’analyse pour déterminer si vous souhaitez échouer ou émettre des enregistrements nuls lorsque le mode d’évolution du schéma est désactivé et que le schéma évolue de manière non descendante. Avec les paramètres par défaut, from_avro échoue lorsqu’il observe des modifications de schéma incompatibles.

Utilisez l’option mode pour spécifier le mode d’analyse. Le tableau suivant décrit l’option pour le mode d’analyse :

Option Comportement
FAILFAST Par défaut. Une erreur d’analyse lève un SparkException dans un errorClass de MALFORMED_AVRO_MESSAGE.
PERMISSIVE Une erreur d’analyse est ignorée et un enregistrement nul est émis.

Remarque

Avec l’évolution du schéma activée, FAILFAST lève uniquement des exceptions si un enregistrement est endommagé.

Exemple utilisant l’évolution du schéma et la définition du mode d’analyse

L’exemple suivant illustre l’activation de l’évolution du schéma et la spécification du mode d’analyse FAILFAST avec un registre de schémas Confluent :

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)