Aracılığıyla paylaş


Akışla aktarılan Avro verilerini okuma ve yazma

Apache Avro , akış dünyasında yaygın olarak kullanılan bir veri serileştirme sistemidir. Tipik bir çözüm, Apache Kafka'da Avro biçiminde veri, Confluent Schema Registryiçindeki meta verileri koymak ve ardından hem Kafka hem de Schema Registry'ye bağlanan bir akış çerçevesiyle sorgu çalıştırmaktır.

Azure Databricks, Kafka'da Avro verileri ve Schema Registry'de meta veriler ile akış işlem hatları oluşturmak ve işlevlerini destekler. İşlev to_avro bir sütunu Avro biçiminde ikili olarak kodlar ve from_avro Avro ikili verilerinin kodunu bir sütuna çözer. Her iki işlev de bir sütunu başka bir sütuna dönüştürür ve giriş/çıkış SQL veri türü karmaşık bir tür veya ilkel bir tür olabilir.

Not

from_avro ve to_avro işlevleri:

  • Python, Scala ve Java'da kullanılabilir.
  • Hem toplu hem de akış sorgularında SQL işlevlerine geçirilebilir.

Ayrıca bkz. Avro dosya veri kaynağı.

El ile belirtilen şema örneği

from_json ve to_jsonbenzer şekilde, herhangi bir ikili sütunla from_avro ve to_avro kullanabilirsiniz. Aşağıdaki örnekte olduğu gibi Avro şemasını el ile belirtebilirsiniz:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema örneği

Bir şemayı JSON dizesi olarak da belirtebilirsiniz. Örneğin, şu durumdaysa /tmp/user.avsc :

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "favorite_color", "type": ["string", "null"] }
  ]
}

Bir JSON dizesi oluşturabilirsiniz:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Ardından from_avroiçinde şemayı kullanın:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Schema Registry örneği

Kümenizin Şema Kayıt Defteri hizmeti varsa, from_avro Avro şemasını el ile belirtmeniz gerekmemesi için bu hizmetle çalışabilir.

Aşağıdaki örnekte, anahtarın ve değerin STRING ve INTtürlerinde "t-key" ve "t-value" konuları olarak Schema Registry'ye zaten kayıtlı olduğu varsayılarak "t" kafka konusunun okunması gösterilmektedir:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr).as("key"),
    from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr).as("value"))

to_avroiçin, varsayılan çıkış Avro şeması aşağıdaki nedenlerle Schema Registry hizmetindeki hedef konunun şemasıyla eşleşmeyebilir:

to_avro varsayılan çıkış şeması hedef konunun şemasıyla eşleşiyorsa aşağıdakileri yapabilirsiniz:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Aksi takdirde, to_avro işlevinde hedef konunun şemasını sağlamanız gerekir:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Dış Confluent Schema Registry'de Kimlik Doğrulaması

Databricks Runtime 12.2 LTS ve üzeri sürümlerde, bir dış Confluent Schema Registry'ye bağlanarak kimlik doğrulayabilirsiniz. Aşağıdaki örneklerde, şema kayıt defteri seçeneklerinizi kimlik bilgilerini ve API anahtarlarını içerecek şekilde yapılandırmanın nasıl yapılacağı gösterilmektedir.

Scala programlama dili

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("key"),
    from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Piton

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      jsonFormatSchema = None,
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      jsonFormatSchema = None,
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Unity Kataloğu birimlerinde truststore ve keystore dosyalarını kullanma

Databricks Runtime 14.3 LTS ve üzerinde, Confluent Schema Registry'de kimlik doğrulaması yapmak için Unity Kataloğu birimlerindeki truststore ve keystore dosyalarını kullanabilirsiniz. Önceki örnekteki yapılandırmayı aşağıdaki sözdizimini kullanarak güncelleyin.

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

from_avro ile şema geliştirme modunu kullanma

Databricks Runtime 14.2 ve üzerinde şema evrim modunu from_avroile kullanabilirsiniz. Şema evrim modunun etkinleştirilmesi, şema evrimini algıladıktan sonra işin bir UnknownFieldException oluşturmasına neden olur. Databricks, görevlerin görev hatasında otomatik olarak yeniden başlatılması için şema evrimi moduyla işleri yapılandırmanızı önerir. Bkz . Yapılandırılmış Akış için üretimle ilgili dikkat edilmesi gerekenler.

Şema evrimi, kaynak verilerinizin şemasının zaman içinde gelişmesini ve veri kaynağınızdan tüm alanları almayı bekliyorsanız kullanışlıdır. Sorgularınız zaten veri kaynağınızda hangi alanların sorgulanacağını açıkça belirtiyorsa, eklenen alanlar şema evrimine bakılmaksızın yoksayılır.

Şema evrimini etkinleştirmek için avroSchemaEvolutionMode seçeneğini kullanın. Aşağıdaki tabloda şema evrim modu seçenekleri açıklanmaktadır:

Seçenek Davranış
none Varsayılan. Şema evrimini yoksayar ve iş devam eder.
restart Şema evrimi algılanırken UnknownFieldException atar. bir işin yeniden başlatılmasını gerektirir.

Not

Bu yapılandırmayı akış işleri arasında değiştirebilir ve aynı denetim noktasını yeniden kullanabilirsiniz. Şema evriminin devre dışı bırakılması, silinen sütunlara yol açabilir.

Ayrıştırma modunu yapılandırma

Şema geliştirme modu devre dışı bırakıldığında ve şema geriye dönük uyumlu olmayan bir şekilde geliştikçe başarısız olmak mı yoksa null kayıt yaymak mı istediğinizi belirlemek için ayrıştırma modunu yapılandırabilirsiniz. Varsayılan ayarlarla, from_avro uyumsuz şema değişiklikleri gözlemlediğinde başarısız olur.

mode Ayrıştırma modunu belirtmek için seçeneğini kullanın. Aşağıdaki tabloda ayrıştırma modu seçeneği açıklanmaktadır:

Seçenek Davranış
FAILFAST Varsayılan. Ayrıştırma hatası ile bir SparkExceptionerrorClassoluştururMALFORMED_AVRO_MESSAGE.
PERMISSIVE Ayrıştırma hatası yoksayılır ve null kayıt yayılır.

Not

Şema evrimi etkinleştirildiğinde, FAILFAST yalnızca bir kayıt bozulursa özel durumlar oluşturur.

Şema evrimi ve ayrıştırma modunu ayarlama örneği

Aşağıdaki örnek, şema evrimini etkinleştirmeyi ve Confluent Schema Registry ile FAILFAST ayrıştırma modunu belirtmeyi gösterir:

Scala programlama dili

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            data = $"key",
            subject = "t-key",
            schemaRegistryAddress = schemaRegistryAddr,
            options = schemaRegistryOptions.asJava).as("key"))

Piton

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      jsonFormatSchema = None,
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)