Bagikan melalui


Membaca dan menulis data Avro streaming

Apache Avro adalah sistem serialisasi data yang umum digunakan di dunia streaming. Solusi khas adalah menempatkan data dalam format Avro di Apache Kafka, metadata di Confluent Schema Registry, dan kemudian menjalankan kueri dengan kerangka kerja streaming yang terhubung ke Kafka dan Registri Skema.

Azure Databricks mendukung from_avro fungsito_avro dan untuk membangun alur streaming dengan data Avro di Kafka dan metadata di Registri Skema. Fungsi to_avro ini mengkodekan kolom sebagai biner dalam format Avro dan from_avro memecahkan kode data biner Avro menjadi kolom. Kedua fungsi mengubah satu kolom ke kolom lain, dan input/output tipe data SQL dapat menjadi tipe yang kompleks atau tipe primitif.

Catatan

from_avro dan to_avro fungsi:

  • Tersedia dalam Python, Scala, dan Java.
  • Dapat diteruskan ke fungsi SQL dalam kueri batch dan streaming.

Lihat juga sumber data file Avro.

Contoh skema yang ditentukan secara manual

Mirip dengan from_json dan to_json, Anda dapat menggunakan from_avro dan to_avro dengan kolom biner apa pun. Anda dapat menentukan skema Avro secara manual, seperti dalam contoh berikut:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

contoh jsonFormatSchema

Anda juga dapat menentukan skema sebagai string JSON. Misalnya, jika /tmp/user.avsc adalah:

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Anda dapat membuat string JSON:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Kemudian gunakan skema di from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Contoh dengan Registri Skema

Jika kluster Anda memiliki layanan Registri Skema, from_avro dapat bekerja dengannya sehingga Anda tidak perlu menentukan skema Avro secara manual.

Contoh berikut menunjukkan membaca topik Kafka "t", dengan asumsi kunci dan nilai sudah terdaftar di Schema Registry sebagai subjek "t-key" dan "t-value" dari jenis STRING dan INT:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

Untuk to_avro, skema Avro output default mungkin tidak cocok dengan skema subjek target di layanan Registri Skema karena alasan berikut:

  • Pemetaan dari jenis Spark SQL ke skema Avro bukan satu-ke-satu. Lihat Tipe yang didukung untuk Spark SQL ->konversi Avro.
  • Jika skema keluaran Avro yang dikonversi adalah tipe rekaman, nama rekaman adalah topLevelRecord dan tidak ada namespace secara default.

Jika skema to_avro output default cocok dengan skema subjek target, Anda dapat melakukan hal berikut:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Jika tidak, Anda harus memberikan skema subjek target dalam to_avro fungsi:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Mengautentikasi ke Registri Skema Confluent eksternal

Di Databricks Runtime 12.2 LTS ke atas, Anda dapat mengautentikasi ke Registri Skema Confluent eksternal. Contoh berikut menunjukkan cara mengonfigurasi opsi registri skema Anda untuk menyertakan kredensial autentikasi dan kunci API.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Menggunakan file truststore dan keystore dalam volume Katalog Unity

Di Databricks Runtime 14.3 LTS ke atas, Anda dapat menggunakan file truststore dan keystore dalam volume Unity Catalog untuk mengautentikasi ke Confluent Schema Registry. Perbarui konfigurasi dalam contoh sebelumnya menggunakan sintaks berikut:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Gunakan mode evolusi skema dengan from_avro

Dalam Databricks Runtime 14.2 ke atas, Anda dapat menggunakan mode evolusi skema dengan from_avro. Mengaktifkan mode evolusi skema menyebabkan pekerjaan melempar UnknownFieldException setelah mendeteksi evolusi skema. Databricks merekomendasikan untuk mengonfigurasi pekerjaan dengan mode evolusi skema untuk memulai ulang secara otomatis pada kegagalan tugas. Lihat Mengonfigurasi pekerjaan Streaming Terstruktur untuk memulai ulang kueri streaming saat gagal.

Evolusi skema berguna jika Anda mengharapkan skema data sumber Anda berevolusi dari waktu ke waktu dan menyerap semua bidang dari sumber data Anda. Jika kueri Anda sudah secara eksplisit menentukan bidang mana yang akan dikueri di sumber data Anda, bidang yang ditambahkan diabaikan terlepas dari evolusi skema.

avroSchemaEvolutionMode Gunakan opsi untuk mengaktifkan evolusi skema. Tabel berikut menjelaskan opsi untuk mode evolusi skema:

Opsi Perilaku
none Default. Mengabaikan evolusi skema dan pekerjaan berlanjut.
restart UnknownFieldException Melempar saat mendeteksi evolusi skema. Memerlukan pekerjaan dimulai ulang.

Catatan

Anda dapat mengubah konfigurasi ini antara pekerjaan streaming dan menggunakan kembali titik pemeriksaan yang sama. Menonaktifkan evolusi skema dapat mengakibatkan kolom yang dihilangkan.

Mengonfigurasi mode penguraian

Anda dapat mengonfigurasi mode penguraian untuk menentukan apakah Anda ingin gagal atau mengeluarkan rekaman null saat mode evolusi skema dinonaktifkan dan skema berevolusi dengan cara yang kompatibel non-mundur. Dengan pengaturan default, from_avro gagal saat mengamati perubahan skema yang tidak kompatibel.

mode Gunakan opsi untuk menentukan mode penguraian. Tabel berikut ini menjelaskan opsi untuk mode penguraian:

Opsi Perilaku
FAILFAST Default. Kesalahan penguraian melempar dengan SparkExceptionerrorClass .MALFORMED_AVRO_MESSAGE
PERMISSIVE Kesalahan penguraian diabaikan dan rekaman null dikeluarkan.

Catatan

Dengan evolusi skema diaktifkan, FAILFAST hanya melemparkan pengecualian jika rekaman rusak.

Contoh menggunakan evolusi skema dan mengatur mode penguraian

Contoh berikut menunjukkan mengaktifkan evolusi skema dan menentukan mode penguraian FAILFAST dengan Confluent Schema Registry:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)