Akışla aktarılan Avro verilerini okuma ve yazma
Apache Avro , akış dünyasında yaygın olarak kullanılan bir veri serileştirme sistemidir. Tipik bir çözüm, Apache Kafka'da Avro biçiminde veri, Confluent Schema Registry'de meta veriler koymak ve ardından hem Kafka hem de Schema Registry'ye bağlanan bir akış çerçevesiyle sorgu çalıştırmaktır.
Azure Databricks, Kafka'da from_avro
Avro verileri ve Schema Registry'deki meta veriler ile akış işlem hatları oluşturmak için ve to_avro
işlevlerini destekler. İşlev to_avro
, bir sütunu Avro biçiminde ikili olarak kodlar ve from_avro
Avro ikili verilerini bir sütuna kodlar. Her iki işlev de bir sütunu başka bir sütuna dönüştürür ve giriş/çıkış SQL veri türü karmaşık bir tür veya ilkel bir tür olabilir.
Not
from_avro
ve to_avro
işlevleri:
- Python, Scala ve Java'da kullanılabilir.
- Hem toplu hem de akış sorgularında SQL işlevlerine geçirilebilir.
Ayrıca bkz. Avro dosya veri kaynağı.
El ile belirtilen şema örneği
from_json ve to_json benzer şekilde, herhangi bir ikili sütunla ve to_avro
kullanabilirsinizfrom_avro
. Aşağıdaki örnekte olduğu gibi Avro şemasını el ile belirtebilirsiniz:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
jsonFormatSchema örneği
Bir şemayı JSON dizesi olarak da belirtebilirsiniz. Örneğin, şu durumdaysa /tmp/user.avsc
:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Bir JSON dizesi oluşturabilirsiniz:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Ardından içindeki from_avro
şemayı kullanın:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Schema Registry örneği
Kümenizin Şema Kayıt Defteri hizmeti varsa, from_avro
Avro şemasını el ile belirtmeniz gerekmemesi için bu hizmetle çalışabilir.
Aşağıdaki örnekte, anahtarın ve değerin Schema Registry'de türlerin "t-key" ve "t-değeri" konuları olarak zaten kaydedildiği varsayılarak "t" kafka konusunun STRING
okunması gösterilmektedir:INT
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
için to_avro
, varsayılan çıkış Avro şeması aşağıdaki nedenlerle Schema Registry hizmetindeki hedef konunun şemasıyla eşleşmeyebilir:
- Spark SQL türünden Avro şemasına eşleme bire bir değildir. Bkz. Spark SQL için desteklenen türler -> Avro dönüştürme.
- Dönüştürülen çıkış Avro şeması kayıt türündeyse, kayıt adı olur
topLevelRecord
ve varsayılan olarak ad alanı yoktur.
Varsayılan çıkış şeması to_avro
hedef konunun şemasıyla eşleşiyorsa aşağıdakileri yapabilirsiniz:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Aksi takdirde, işlevinde hedef konunun şemasını to_avro
sağlamanız gerekir:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Dış Confluent Schema Registry'de kimlik doğrulaması
Databricks Runtime 12.2 LTS ve üzeri bir dış Confluent Schema Registry'de kimlik doğrulaması yapabilirsiniz. Aşağıdaki örneklerde, şema kayıt defteri seçeneklerinizi kimlik doğrulaması kimlik bilgilerini ve API anahtarlarını içerecek şekilde yapılandırma gösterilmektedir.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Unity Kataloğu birimlerinde truststore ve keystore dosyalarını kullanma
Databricks Runtime 14.3 LTS ve üzerinde, Confluent Schema Registry'de kimlik doğrulaması yapmak için Unity Kataloğu birimlerindeki truststore ve keystore dosyalarını kullanabilirsiniz. Aşağıdaki söz dizimini kullanarak önceki örnekteki yapılandırmayı güncelleştirin:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Ile şema evrim modunu kullanma from_avro
Databricks Runtime 14.2 ve üzeri sürümleriyle şema evrim modunu from_avro
kullanabilirsiniz. Şema evrim modunun etkinleştirilmesi, şema evrimi algıladıktan sonra işin bir UnknownFieldException
oluşturmasına neden olur. Databricks, görevlerin görev hatasında otomatik olarak yeniden başlatılması için şema evrimi moduyla işleri yapılandırmanızı önerir. Bkz . Yapılandırılmış Akış için üretimle ilgili dikkat edilmesi gerekenler.
Şema evrimi, kaynak verilerinizin şemasının zaman içinde gelişmesini ve veri kaynağınızdan tüm alanları almayı bekliyorsanız kullanışlıdır. Sorgularınız zaten veri kaynağınızda hangi alanların sorgulanacağını açıkça belirtiyorsa, eklenen alanlar şema evrimine bakılmaksızın yoksayılır.
Şema evrimini avroSchemaEvolutionMode
etkinleştirmek için seçeneğini kullanın. Aşağıdaki tabloda şema evrim modu seçenekleri açıklanmaktadır:
Seçenek | Davranış |
---|---|
none |
Varsayılan. Şema evrimini yoksayar ve iş devam eder. |
restart |
Şema evrimi algılanırken bir UnknownFieldException oluşturur. bir işin yeniden başlatılmasını gerektirir. |
Not
Bu yapılandırmayı akış işleri arasında değiştirebilir ve aynı denetim noktasını yeniden kullanabilirsiniz. Şema evrimin devre dışı bırakılması, bırakılan sütunlara neden olabilir.
Ayrıştırma modunu yapılandırma
Şema geliştirme modu devre dışı bırakıldığında ve şema geriye dönük uyumlu olmayan bir şekilde geliştikçe başarısız olmak mı yoksa null kayıt yaymak mı istediğinizi belirlemek için ayrıştırma modunu yapılandırabilirsiniz. Varsayılan ayarlarda, from_avro
uyumlu olmayan şema değişiklikleri gözlemlediğinde başarısız olur.
mode
Ayrıştırma modunu belirtmek için seçeneğini kullanın. Aşağıdaki tabloda ayrıştırma modu seçeneği açıklanmaktadır:
Seçenek | Davranış |
---|---|
FAILFAST |
Varsayılan. Ayrıştırma hatası ile bir errorClass MALFORMED_AVRO_MESSAGE oluştururSparkException . |
PERMISSIVE |
Ayrıştırma hatası yoksayılır ve null kayıt yayılır. |
Not
Şema evrimi etkinleştirildiğinde, FAILFAST
yalnızca bir kayıt bozulursa özel durumlar oluşturur.
Şema evrimi ve ayrıştırma modunu ayarlama örneği
Aşağıdaki örnek, şema evrimini etkinleştirmeyi ve Confluent Schema Registry ile ayrıştırma modunu belirtmeyi FAILFAST
gösterir:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)