Lesen und Schreiben von Avro-Streamingdaten

Apache Avro ist ein in der Welt des Streamings häufig verwendetes Datenserialisierungssystem. Eine typische Lösung besteht darin, Daten im Avro-Format in Apache Kafka und Metadaten in der Schemaregistrierung von Confluent abzulegen und dann Abfragen mit einem Streamingframework auszuführen, das sowohl mit Kafka als auch mit der Schemaregistrierung eine Verbindung herstellt.

Azure Databricks unterstützt die Funktionenfrom_avro und to_avro zum Erstellen von Streamingpipelines mit Avro-Daten in Kafka und Metadaten in der Schemaregistrierung. Die Funktion to_avro codiert eine Spalte als Binärdaten im Avro-Format und from_avro decodiert Avro-Binärdaten in eine Spalte. Beide Funktionen transformieren eine Spalte in eine andere Spalte, und der SQL-Eingabe-/Ausgabedatentyp kann ein komplexer oder ein primitiver Typ sein.

Hinweis

Für die Funktionen from_avro und to_avro gilt Folgendes:

  • Sie sind in Python, Scala und Java verfügbar.
  • Sie können an SQL-Funktionen in Batch- und Streamingabfragen übergeben werden.

Weiter Informationen finden Sie unter Avro-Dateidatenquelle.

Beispiel für ein manuell angegebenes Schema

Ähnlich wie from_json und to_json können Sie from_avro und to_avro mit jeder binären Spalte verwenden. Sie können das Avro-Schema wie im folgenden Beispiel manuell angeben:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

Beispiel für JSON-Formatschema

Sie können auch ein Schema als JSON-Zeichenfolge angeben. Ist /tmp/user.avsc beispielsweise

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Sie können eine JSON-Zeichenfolge erstellen:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Dann verwenden Sie das Schema in from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Beispiel mit Schemaregistrierung

Wenn Ihr Cluster über einen Schemaregistrierungsdienst verfügt, kann from_avro damit arbeiten, sodass Sie das Avro-Schema nicht manuell angeben müssen.

Das folgende Beispiel veranschaulicht das Lesen des Kafka-Themas „t“. Dabei wird davon ausgegangen, dass der Schlüssel und der Wert bereits in der Schemaregistrierung als Thema „t-key“ und „t-value“ vom Typ STRING und INT registriert sind:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

Bei to_avro entspricht das Avro-Standardausgabeschema aus folgenden Gründen möglicherweise nicht dem Schema des Zielsubjekts im Schemaregistrierungsdienst:

  • Die Zuordnung des Spark SQL-Typs zum Avro-Schema erfolgt nicht 1:1. Weitere Informationen finden Sie unter Unterstützte Typen für die Konvertierung von Spark SQL in Avro.
  • Wenn das konvertierte Avro-Ausgabeschema vom Datensatztyp ist, lautet der Datensatzname topLevelRecord, und es ist standardmäßig kein Namespace verfügbar.

Wenn das Standardausgabeschema von to_avro dem Schema des Zielsubjekts entspricht, haben Sie folgende Möglichkeit:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Andernfalls müssen Sie das Schema des Zielsubjekts in der Funktion to_avro angeben:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Authentifizieren bei einer externen Schemaregistrierung von Confluent

Ab Databricks Runtime 12.2 LTS und höher können Sie sich bei einer externen Confluent-Schemaregistrierung authentifizieren. Die folgenden Beispiele veranschaulichen, wie Sie Ihre Schemaregistrierungsoptionen mit Authentifizierungsanmeldeinformationen und API-Schlüsseln konfigurieren.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(col("key"), lit("t-key"), schema_registry_address, schema_registry_options).alias("key"),
    to_avro(col("value"), lit("t-value"), schema_registry_address, schema_registry_options, avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Verwenden von Truststore- und Keystoredateien in Unity-Katalogvolumes

In Databricks Runtime 14.3 LTS und höher können Sie Truststore- und Keystoredateien in Unity-Katalogvolumes verwenden, um sich bei einer Confluent-Schemaregistrierung zu authentifizieren. Aktualisieren Sie die Konfiguration im vorherigen Beispiel mithilfe der folgenden Syntax:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Verwenden des Schemaentwicklungsmodus mit from_avro

In Databricks Runtime 14.2 und höher können Sie den Schemaentwicklungsmodus mit from_avro verwenden. Das Aktivieren des Schemaentwicklungsmodus bewirkt, dass der Auftrag nach dem Erkennen der Schemaentwicklung die Ausnahme UnknownFieldException auslöst. Databricks empfiehlt das Konfigurieren von Aufträgen mit dem Schemaentwicklungsmodus, um diese bei einem Taskfehler automatisch neu zu starten. Weitere Informationen finden Sie unter Konfigurieren von strukturierten Streamingaufträgen zum Neustarten von Streamingabfragen bei einem Fehler.

Die Schemaentwicklung ist nützlich, wenn Sie erwarten, dass sich das Schema Ihrer Quelldaten im Laufe der Zeit weiterentwickelt und alle Felder aus der Datenquelle erfasst werden. Wenn Ihre Abfragen bereits explizit angeben, welche Felder in Ihrer Datenquelle abgefragt werden sollen, werden hinzugefügte Felder unabhängig von der Schemaentwicklung ignoriert.

Verwenden Sie die Option avroSchemaEvolutionMode, um die Schemaentwicklung zu aktivieren. In der folgenden Tabelle werden die Optionen für den Schemaentwicklungsmodus beschrieben:

Option Verhalten
none Default. Ignoriert die Schemaentwicklung, und der Auftrag wird fortgesetzt.
restart Löst beim Erkennen der Schemaentwicklung die Ausnahme UnknownFieldException aus. Erfordert einen Auftragsneustart.

Hinweis

Sie können diese Konfiguration zwischen Streamingaufträgen ändern und denselben Prüfpunkt wiederverwenden. Das Deaktivieren der Schemaentwicklung kann dazu führen, dass Spalten gelöscht werden.

Konfigurieren des Parsemodus

Sie können den Parsemodus konfigurieren, um zu bestimmen, ob Fehler auftreten oder NULL-Datensätze ausgegeben werden sollen, wenn der Schemaentwicklungsmodus deaktiviert ist und das Schema nicht abwärtskompatibel weiterentwickelt wird. Mit den Standardeinstellungen schlägt from_avro fehl, wenn inkompatible Schemaänderungen beobachtet werden.

Verwenden Sie die Option mode, um den Parsemodus anzugeben. In der folgenden Tabelle werden die Optionen für den Parsemodus beschrieben:

Option Verhalten
FAILFAST Default. Ein Parsingfehler löst die Ausnahme SparkException mit einem errorClass-Wert von MALFORMED_AVRO_MESSAGE aus.
PERMISSIVE Ein Parsingfehler wird ignoriert, und ein NULL-Datensatz wird ausgegeben.

Hinweis

Wenn die Schemaentwicklung aktiviert ist, löst FAILFAST nur Ausnahmen aus, wenn ein Datensatz beschädigt ist.

Beispiel für die Verwendung des Schemaentwicklungsmodus und die Einstellung des Parsemodus

Das folgende Beispiel veranschaulicht das Aktivieren der Schemaentwicklung und das Festlegen des FAILFAST-Parsemodus mit einer Confluent-Schemaregistrierung:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)