Membaca dan menulis data Avro streaming
Apache Avro adalah sistem serialisasi data yang umum digunakan di dunia streaming. Solusi khas adalah menempatkan data dalam format Avro di Apache Kafka, metadata di Confluent Schema Registry, dan kemudian menjalankan kueri dengan kerangka kerja streaming yang terhubung ke Kafka dan Registri Skema.
Azure Databricks from_avro
mendukung fungsi dan to_avro
untuk membangun alur streaming dengan data Avro di Kafka dan metadata di Schema Registry. Fungsi to_avro
ini mengkodekan kolom sebagai biner dalam format Avro dan from_avro
memecahkan kode data biner Avro menjadi kolom. Kedua fungsi mengubah satu kolom ke kolom lain, dan input/output tipe data SQL dapat menjadi tipe yang kompleks atau tipe primitif.
Catatan
from_avro
dan to_avro
fungsi:
- Tersedia dalam Python, Scala, dan Java.
- Dapat diteruskan ke fungsi SQL dalam kueri batch dan streaming.
Lihat juga sumber data file Avro.
Contoh skema yang ditentukan secara manual
Mirip dengan from_json dan to_json, Anda dapat menggunakan from_avro
dan to_avro
dengan kolom biner apa pun. Anda dapat menentukan skema Avro secara manual, seperti dalam contoh berikut:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
contoh jsonFormatSchema
Anda juga dapat menentukan skema sebagai string JSON. Misalnya, jika /tmp/user.avsc
adalah:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Anda dapat membuat string JSON:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Kemudian gunakan skema di from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Contoh dengan Registri Skema
Jika kluster Anda memiliki layanan Registri Skema, from_avro
dapat bekerja dengannya sehingga Anda tidak perlu menentukan skema Avro secara manual.
Contoh berikut menunjukkan membaca topik Kafka "t", dengan asumsi kunci dan nilai sudah terdaftar di Schema Registry sebagai subjek "t-key" dan "t-value" dari jenis STRING
dan INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
Untuk to_avro
, skema Avro output default mungkin tidak cocok dengan skema subjek target di layanan Registri Skema karena alasan berikut:
- Pemetaan dari jenis Spark SQL ke skema Avro bukan satu-ke-satu. Lihat Tipe yang didukung untuk Spark SQL ->konversi Avro.
- Jika skema keluaran Avro yang dikonversi adalah tipe rekaman, nama rekaman adalah
topLevelRecord
dan tidak ada namespace secara default.
Jika skema to_avro
output default cocok dengan skema subjek target, Anda dapat melakukan hal berikut:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Jika tidak, Anda harus memberikan skema subjek target dalam to_avro
fungsi:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Mengautentikasi ke Registri Skema Confluent eksternal
Di Databricks Runtime 12.2 LTS ke atas, Anda dapat mengautentikasi ke Registri Skema Confluent eksternal. Contoh berikut menunjukkan cara mengonfigurasi opsi registri skema Anda untuk menyertakan kredensial autentikasi dan kunci API.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Menggunakan file truststore dan keystore dalam volume Katalog Unity
Di Databricks Runtime 14.3 LTS ke atas, Anda dapat menggunakan file truststore dan keystore dalam volume Unity Catalog untuk mengautentikasi ke Confluent Schema Registry. Perbarui konfigurasi dalam contoh sebelumnya menggunakan sintaks berikut:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Gunakan mode evolusi skema dengan from_avro
Dalam Databricks Runtime 14.2 ke atas, Anda dapat menggunakan mode evolusi skema dengan from_avro
. Mengaktifkan mode evolusi skema menyebabkan pekerjaan melempar UnknownFieldException
setelah mendeteksi evolusi skema. Databricks merekomendasikan untuk mengonfigurasi pekerjaan dengan mode evolusi skema untuk memulai ulang secara otomatis pada kegagalan tugas. Lihat Pertimbangan produksi untuk Streaming Terstruktur.
Evolusi skema berguna jika Anda mengharapkan skema data sumber Anda berevolusi dari waktu ke waktu dan menyerap semua bidang dari sumber data Anda. Jika kueri Anda sudah secara eksplisit menentukan bidang mana yang akan dikueri di sumber data Anda, bidang yang ditambahkan diabaikan terlepas dari evolusi skema.
avroSchemaEvolutionMode
Gunakan opsi untuk mengaktifkan evolusi skema. Tabel berikut menjelaskan opsi untuk mode evolusi skema:
Opsi | Perilaku |
---|---|
none |
Default. Mengabaikan evolusi skema dan pekerjaan berlanjut. |
restart |
UnknownFieldException Melempar saat mendeteksi evolusi skema. Memerlukan pekerjaan dimulai ulang. |
Catatan
Anda dapat mengubah konfigurasi ini antara pekerjaan streaming dan menggunakan kembali titik pemeriksaan yang sama. Menonaktifkan evolusi skema dapat mengakibatkan kolom yang dihilangkan.
Mengonfigurasi mode penguraian
Anda dapat mengonfigurasi mode penguraian untuk menentukan apakah Anda ingin gagal atau mengeluarkan rekaman null saat mode evolusi skema dinonaktifkan dan skema berevolusi dengan cara yang kompatibel non-mundur. Dengan pengaturan default, from_avro
gagal saat mengamati perubahan skema yang tidak kompatibel.
mode
Gunakan opsi untuk menentukan mode penguraian. Tabel berikut ini menjelaskan opsi untuk mode penguraian:
Opsi | Perilaku |
---|---|
FAILFAST |
Default. Kesalahan penguraian melempar dengan SparkException errorClass .MALFORMED_AVRO_MESSAGE |
PERMISSIVE |
Kesalahan penguraian diabaikan dan rekaman null dikeluarkan. |
Catatan
Dengan evolusi skema diaktifkan, FAILFAST
hanya melemparkan pengecualian jika rekaman rusak.
Contoh menggunakan evolusi skema dan mengatur mode penguraian
Contoh berikut menunjukkan mengaktifkan evolusi skema dan menentukan mode penguraian FAILFAST
dengan Confluent Schema Registry:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)