Lire et écrire des données Avro en streaming
Apache Avro est un système de sérialisation de données couramment utilisé dans le monde de la diffusion en continu. Une solution classique consiste à placer les données de format Avro dans Apache Kafka, les métadonnées dans Confluent Schema Registry, puis à exécuter des requêtes avec une infrastructure de diffusion en continu qui se connecte tant à Kafka qu’à Confluent Schema Registry.
Azure Databricks prend en charge le from_avro
et les to_avro
fonctions pour créer des pipelines de diffusion en continu avec les données Avro dans Kafka, et les métadonnées dans Confluent Schema Registry. La fonction to_avro
encode une colonne en tant que binaire au format Avro, et from_avro
décode les données binaires Avro dans une colonne. Les deux fonctions transforment une colonne en une autre, et le type de données SQL d’entrée/sortie peut être un type complexe ou primitif.
Notes
Les fonctions from_avro
et to_avro
:
- Sont disponibles en Python, Scala et Java.
- Peuvent être passées à des fonctions SQL dans des requêtes de traitement par lot et de diffusion en continu.
Consultez également source de données de fichier Avro.
Exemple de schéma spécifié manuellement
Comme pour from_json et to_json, vous pouvez utiliser from_avro
et to_avro
avec n’importe quelle colonne binaire. Vous pouvez spécifier manuellement le schéma Avro, comme dans l’exemple suivant :
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Exemple de jsonFormatSchema
Vous pouvez également spécifier un schéma sous la forme d’une chaîne JSON. Par exemple, si /tmp/user.avsc
est :
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Vous pouvez créer une chaîne JSON :
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Utilisez ensuite le schéma dans from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Exemple avec Confluent Schema Registry
Si votre cluster a un service Confluent Schema Registry, from_avro
peut l’utiliser pour vous éviter de devoir spécifier le schéma Avro manuellement.
L’exemple suivant montre la lecture d’une rubrique Kafka « t » en supposant que la clé et la valeur sont déjà inscrites dans le registre de schémas comme sujets « t-key » et « t-value » de types STRING
et INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
Pour to_avro
, il se peut que le schéma Avro de sortie par défaut ne corresponde pas au schéma du sujet cible dans le service Confluent Schema Registry pour les raisons suivantes :
- Le mappage d’un type Spark SQL à un schéma Avro n’est pas de un à un. Consultez Types pris en charge pour la conversion Spark SQL -> Avro.
- Si le schéma Avro de sortie converti est de type enregistrement, le nom de l’enregistrement est
topLevelRecord
et il n’y a pas d’espace de noms par défaut.
Si le schéma de sortie par défaut de to_avro
correspond au schéma du sujet cible, vous pouvez procéder comme suit :
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Sinon, vous devez fournir le schéma du sujet cible dans la fonction to_avro
:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
S’authentifier auprès d’un registre de schémas Confluent externe
Dans Databricks Runtime 12.2 LTS et versions ultérieures, vous pouvez vous authentifier auprès d’un registre de schémas Confluent externe. Les exemples suivants montrent comment configurer les options du registre de schémas pour inclure des clés API et des informations d’identification pour l’authentification.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Utiliser les fichiers truststore et keystore (magasin de clés) dans les volumes Unity Catalog
Dans Databricks Runtime 14.3 LTS et versions ultérieures, vous pouvez utiliser des fichiers truststore et keystore dans les volumes Unity Catalog pour vous authentifier auprès de Confluent Schema Registry. Mettez à jour la configuration dans l’exemple précédent à l’aide de la syntaxe suivante :
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Utiliser le mode d’évolution du schéma avec from_avro
Dans Databricks Runtime 14.2 et les versions ultérieures, vous pouvez utiliser le mode d’évolution du schéma avec from_avro
. L’activation du mode d’évolution du schéma entraîne la levée d’un UnknownFieldException
après la détection de l’évolution du schéma. Databricks recommande de configurer des travaux avec le mode d’évolution du schéma pour redémarrer automatiquement en cas d’échec de la tâche. Consultez Considérations relatives à la production pour flux structuré.
L’évolution du schéma est utile si vous vous attendez à ce que le schéma de vos données sources évolue au fil du temps et ingère tous les champs de votre source de données. Si vos requêtes spécifient déjà explicitement les champs à interroger dans votre source de données, les champs ajoutés sont ignorés indépendamment de l’évolution du schéma.
Utilisez l’option avroSchemaEvolutionMode
pour activer l’évolution du schéma. Le tableau suivant décrit les options pour le mode d’évolution du schéma :
Option | Comportement |
---|---|
none |
Par défaut. Ignore l’évolution du schéma et le travail continue. |
restart |
Lève un UnknownFieldException lors de la détection de l’évolution du schéma. Nécessite un redémarrage du travail. |
Remarque
Vous pouvez modifier cette configuration entre les travaux de diffusion en continu et réutiliser le même point de contrôle. La désactivation de l’évolution du schéma peut entraîner la suppression de colonnes.
Configurer le mode d’analyse
Vous pouvez configurer le mode d’analyse pour déterminer si vous souhaitez échouer ou émettre des enregistrements nuls lorsque le mode d’évolution du schéma est désactivé et que le schéma évolue de manière non descendante. Avec les paramètres par défaut, from_avro
échoue lorsqu’il observe des modifications de schéma incompatibles.
Utilisez l’option mode
pour spécifier le mode d’analyse. Le tableau suivant décrit l’option pour le mode d’analyse :
Option | Comportement |
---|---|
FAILFAST |
Par défaut. Une erreur d’analyse lève un SparkException dans un errorClass de MALFORMED_AVRO_MESSAGE . |
PERMISSIVE |
Une erreur d’analyse est ignorée et un enregistrement nul est émis. |
Remarque
Avec l’évolution du schéma activée, FAILFAST
lève uniquement des exceptions si un enregistrement est endommagé.
Exemple utilisant l’évolution du schéma et la définition du mode d’analyse
L’exemple suivant illustre l’activation de l’évolution du schéma et la spécification du mode d’analyse FAILFAST
avec un registre de schémas Confluent :
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)