Aracılığıyla paylaş


Akışla aktarılan Avro verilerini okuma ve yazma

Apache Avro , akış dünyasında yaygın olarak kullanılan bir veri serileştirme sistemidir. Tipik bir çözüm, Apache Kafka'da Avro biçiminde veri, Confluent Schema Registry'de meta veriler koymak ve ardından hem Kafka hem de Schema Registry'ye bağlanan bir akış çerçevesiyle sorgu çalıştırmaktır.

Azure Databricks, Kafka'da from_avro Avro verileri ve Schema Registry'deki meta veriler ile akış işlem hatları oluşturmak için ve to_avro işlevlerini destekler. İşlev to_avro , bir sütunu Avro biçiminde ikili olarak kodlar ve from_avro Avro ikili verilerini bir sütuna kodlar. Her iki işlev de bir sütunu başka bir sütuna dönüştürür ve giriş/çıkış SQL veri türü karmaşık bir tür veya ilkel bir tür olabilir.

Not

from_avro ve to_avro işlevleri:

  • Python, Scala ve Java'da kullanılabilir.
  • Hem toplu hem de akış sorgularında SQL işlevlerine geçirilebilir.

Ayrıca bkz. Avro dosya veri kaynağı.

El ile belirtilen şema örneği

from_json ve to_json benzer şekilde, herhangi bir ikili sütunla ve to_avro kullanabilirsinizfrom_avro. Aşağıdaki örnekte olduğu gibi Avro şemasını el ile belirtebilirsiniz:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema örneği

Bir şemayı JSON dizesi olarak da belirtebilirsiniz. Örneğin, şu durumdaysa /tmp/user.avsc :

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Bir JSON dizesi oluşturabilirsiniz:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Ardından içindeki from_avroşemayı kullanın:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Schema Registry örneği

Kümenizin Şema Kayıt Defteri hizmeti varsa, from_avro Avro şemasını el ile belirtmeniz gerekmemesi için bu hizmetle çalışabilir.

Aşağıdaki örnekte, anahtarın ve değerin Schema Registry'de türlerin "t-key" ve "t-değeri" konuları olarak zaten kaydedildiği varsayılarak "t" kafka konusunun STRING okunması gösterilmektedir:INT

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

için to_avro, varsayılan çıkış Avro şeması aşağıdaki nedenlerle Schema Registry hizmetindeki hedef konunun şemasıyla eşleşmeyebilir:

Varsayılan çıkış şeması to_avro hedef konunun şemasıyla eşleşiyorsa aşağıdakileri yapabilirsiniz:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Aksi takdirde, işlevinde hedef konunun şemasını to_avro sağlamanız gerekir:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Dış Confluent Schema Registry'de kimlik doğrulaması

Databricks Runtime 12.2 LTS ve üzeri bir dış Confluent Schema Registry'de kimlik doğrulaması yapabilirsiniz. Aşağıdaki örneklerde, şema kayıt defteri seçeneklerinizi kimlik doğrulaması kimlik bilgilerini ve API anahtarlarını içerecek şekilde yapılandırma gösterilmektedir.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Unity Kataloğu birimlerinde truststore ve keystore dosyalarını kullanma

Databricks Runtime 14.3 LTS ve üzerinde, Confluent Schema Registry'de kimlik doğrulaması yapmak için Unity Kataloğu birimlerindeki truststore ve keystore dosyalarını kullanabilirsiniz. Aşağıdaki söz dizimini kullanarak önceki örnekteki yapılandırmayı güncelleştirin:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Ile şema evrim modunu kullanma from_avro

Databricks Runtime 14.2 ve üzeri sürümleriyle şema evrim modunu from_avrokullanabilirsiniz. Şema evrim modunun etkinleştirilmesi, şema evrimi algıladıktan sonra işin bir UnknownFieldException oluşturmasına neden olur. Databricks, görevlerin görev hatasında otomatik olarak yeniden başlatılması için şema evrimi moduyla işleri yapılandırmanızı önerir. Bkz . Yapılandırılmış Akış için üretimle ilgili dikkat edilmesi gerekenler.

Şema evrimi, kaynak verilerinizin şemasının zaman içinde gelişmesini ve veri kaynağınızdan tüm alanları almayı bekliyorsanız kullanışlıdır. Sorgularınız zaten veri kaynağınızda hangi alanların sorgulanacağını açıkça belirtiyorsa, eklenen alanlar şema evrimine bakılmaksızın yoksayılır.

Şema evrimini avroSchemaEvolutionMode etkinleştirmek için seçeneğini kullanın. Aşağıdaki tabloda şema evrim modu seçenekleri açıklanmaktadır:

Seçenek Davranış
none Varsayılan. Şema evrimini yoksayar ve iş devam eder.
restart Şema evrimi algılanırken bir UnknownFieldException oluşturur. bir işin yeniden başlatılmasını gerektirir.

Not

Bu yapılandırmayı akış işleri arasında değiştirebilir ve aynı denetim noktasını yeniden kullanabilirsiniz. Şema evrimin devre dışı bırakılması, bırakılan sütunlara neden olabilir.

Ayrıştırma modunu yapılandırma

Şema geliştirme modu devre dışı bırakıldığında ve şema geriye dönük uyumlu olmayan bir şekilde geliştikçe başarısız olmak mı yoksa null kayıt yaymak mı istediğinizi belirlemek için ayrıştırma modunu yapılandırabilirsiniz. Varsayılan ayarlarda, from_avro uyumlu olmayan şema değişiklikleri gözlemlediğinde başarısız olur.

mode Ayrıştırma modunu belirtmek için seçeneğini kullanın. Aşağıdaki tabloda ayrıştırma modu seçeneği açıklanmaktadır:

Seçenek Davranış
FAILFAST Varsayılan. Ayrıştırma hatası ile bir errorClass MALFORMED_AVRO_MESSAGEoluştururSparkException.
PERMISSIVE Ayrıştırma hatası yoksayılır ve null kayıt yayılır.

Not

Şema evrimi etkinleştirildiğinde, FAILFAST yalnızca bir kayıt bozulursa özel durumlar oluşturur.

Şema evrimi ve ayrıştırma modunu ayarlama örneği

Aşağıdaki örnek, şema evrimini etkinleştirmeyi ve Confluent Schema Registry ile ayrıştırma modunu belirtmeyi FAILFAST gösterir:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)