Lesen und Schreiben von Avro-Streamingdaten
Apache Avro ist ein in der Welt des Streamings häufig verwendetes Datenserialisierungssystem. Eine typische Lösung besteht darin, Daten im Avro-Format in Apache Kafka und Metadaten in der Schemaregistrierung von Confluent abzulegen und dann Abfragen mit einem Streamingframework auszuführen, das sowohl mit Kafka als auch mit der Schemaregistrierung eine Verbindung herstellt.
Azure Databricks unterstützt die Funktionen from_avro
und to_avro
zum Erstellen von Streamingpipelines mit Avro-Daten in Kafka und Metadaten in der Schemaregistrierung. Die Funktion to_avro
codiert eine Spalte als Binärdaten im Avro-Format und from_avro
decodiert Avro-Binärdaten in eine Spalte. Beide Funktionen transformieren eine Spalte in eine andere Spalte, und der SQL-Eingabe-/Ausgabedatentyp kann ein komplexer oder ein primitiver Typ sein.
Hinweis
Für die Funktionen from_avro
und to_avro
gilt Folgendes:
- Sie sind in Python, Scala und Java verfügbar.
- Sie können an SQL-Funktionen in Batch- und Streamingabfragen übergeben werden.
Weiter Informationen finden Sie unter Avro-Dateidatenquelle.
Beispiel für ein manuell angegebenes Schema
Ähnlich wie from_json und to_json können Sie from_avro
und to_avro
mit jeder binären Spalte verwenden. Sie können das Avro-Schema wie im folgenden Beispiel manuell angeben:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Beispiel für JSON-Formatschema
Sie können auch ein Schema als JSON-Zeichenfolge angeben. Ist /tmp/user.avsc
beispielsweise
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Sie können eine JSON-Zeichenfolge erstellen:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Dann verwenden Sie das Schema in from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Beispiel mit Schemaregistrierung
Wenn Ihr Cluster über einen Schemaregistrierungsdienst verfügt, kann from_avro
damit arbeiten, sodass Sie das Avro-Schema nicht manuell angeben müssen.
Das folgende Beispiel veranschaulicht das Lesen des Kafka-Themas „t“. Dabei wird davon ausgegangen, dass der Schlüssel und der Wert bereits in der Schemaregistrierung als Thema „t-key“ und „t-value“ vom Typ STRING
und INT
registriert sind:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
Bei to_avro
entspricht das Avro-Standardausgabeschema aus folgenden Gründen möglicherweise nicht dem Schema des Zielsubjekts im Schemaregistrierungsdienst:
- Die Zuordnung des Spark SQL-Typs zum Avro-Schema erfolgt nicht 1:1. Weitere Informationen finden Sie unter Unterstützte Typen für die Konvertierung von Spark SQL in Avro.
- Wenn das konvertierte Avro-Ausgabeschema vom Datensatztyp ist, lautet der Datensatzname
topLevelRecord
, und es ist standardmäßig kein Namespace verfügbar.
Wenn das Standardausgabeschema von to_avro
dem Schema des Zielsubjekts entspricht, haben Sie folgende Möglichkeit:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Andernfalls müssen Sie das Schema des Zielsubjekts in der Funktion to_avro
angeben:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Authentifizieren bei einer externen Schemaregistrierung von Confluent
Ab Databricks Runtime 12.2 LTS und höher können Sie sich bei einer externen Confluent-Schemaregistrierung authentifizieren. Die folgenden Beispiele veranschaulichen, wie Sie Ihre Schemaregistrierungsoptionen mit Authentifizierungsanmeldeinformationen und API-Schlüsseln konfigurieren.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Verwenden von Truststore- und Keystoredateien in Unity-Katalogvolumes
In Databricks Runtime 14.3 LTS und höher können Sie Truststore- und Keystoredateien in Unity-Katalogvolumes verwenden, um sich bei einer Confluent-Schemaregistrierung zu authentifizieren. Aktualisieren Sie die Konfiguration im vorherigen Beispiel mithilfe der folgenden Syntax:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Verwenden des Schemaentwicklungsmodus mit from_avro
In Databricks Runtime 14.2 und höher können Sie den Schemaentwicklungsmodus mit from_avro
verwenden. Das Aktivieren des Schemaentwicklungsmodus bewirkt, dass der Auftrag nach dem Erkennen der Schemaentwicklung die Ausnahme UnknownFieldException
auslöst. Databricks empfiehlt das Konfigurieren von Aufträgen mit dem Schemaentwicklungsmodus, um diese bei einem Taskfehler automatisch neu zu starten. Weitere Informationen finden Sie unter Produktionsüberlegungen für strukturiertes Streaming.
Die Schemaentwicklung ist nützlich, wenn Sie erwarten, dass sich das Schema Ihrer Quelldaten im Laufe der Zeit weiterentwickelt und alle Felder aus der Datenquelle erfasst werden. Wenn Ihre Abfragen bereits explizit angeben, welche Felder in Ihrer Datenquelle abgefragt werden sollen, werden hinzugefügte Felder unabhängig von der Schemaentwicklung ignoriert.
Verwenden Sie die Option avroSchemaEvolutionMode
, um die Schemaentwicklung zu aktivieren. In der folgenden Tabelle werden die Optionen für den Schemaentwicklungsmodus beschrieben:
Option | Verhalten |
---|---|
none |
Default. Ignoriert die Schemaentwicklung, und der Auftrag wird fortgesetzt. |
restart |
Löst beim Erkennen der Schemaentwicklung die Ausnahme UnknownFieldException aus. Erfordert einen Auftragsneustart. |
Hinweis
Sie können diese Konfiguration zwischen Streamingaufträgen ändern und denselben Prüfpunkt wiederverwenden. Das Deaktivieren der Schemaentwicklung kann dazu führen, dass Spalten gelöscht werden.
Konfigurieren des Parsemodus
Sie können den Parsemodus konfigurieren, um zu bestimmen, ob Fehler auftreten oder NULL-Datensätze ausgegeben werden sollen, wenn der Schemaentwicklungsmodus deaktiviert ist und das Schema nicht abwärtskompatibel weiterentwickelt wird. Mit den Standardeinstellungen schlägt from_avro
fehl, wenn inkompatible Schemaänderungen beobachtet werden.
Verwenden Sie die Option mode
, um den Parsemodus anzugeben. In der folgenden Tabelle werden die Optionen für den Parsemodus beschrieben:
Option | Verhalten |
---|---|
FAILFAST |
Default. Ein Parsingfehler löst die Ausnahme SparkException mit einem errorClass -Wert von MALFORMED_AVRO_MESSAGE aus. |
PERMISSIVE |
Ein Parsingfehler wird ignoriert, und ein NULL-Datensatz wird ausgegeben. |
Hinweis
Wenn die Schemaentwicklung aktiviert ist, löst FAILFAST
nur Ausnahmen aus, wenn ein Datensatz beschädigt ist.
Beispiel für die Verwendung des Schemaentwicklungsmodus und die Einstellung des Parsemodus
Das folgende Beispiel veranschaulicht das Aktivieren der Schemaentwicklung und das Festlegen des FAILFAST
-Parsemodus mit einer Confluent-Schemaregistrierung:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)