Avro-adatok streamelésének olvasási és írási műveletei
Az Apache Avro egy gyakran használt adat szerializálási rendszer a streamelési világban. Egy tipikus megoldás az adatok Avro formátumban történő elhelyezése Apache Kafkában, metaadatok a Confluent sémaregisztrációs adatbázisában, majd lekérdezések futtatása egy streamelési keretrendszerrel, amely a Kafkához és a Sémaregisztrációs adatbázishoz is csatlakozik.
Az Azure Databricks támogatja a from_avro
to_avro
streamelési folyamatok kafkai Avro-adatokkal és a schema registry metaadataival történő kiépítését. A függvény to_avro
binárisként kódol egy oszlopot Avro formátumban, és from_avro
az Avro bináris adatait egy oszlopba dekódolja. Mindkét függvény átalakítja az egyik oszlopot egy másik oszlopba, a bemeneti/kimeneti SQL-adattípus pedig összetett vagy primitív típus lehet.
Feljegyzés
Az from_avro
és to_avro
a függvények:
- A Pythonban, a Scalában és a Java-ban is elérhetők.
- Az SQL-függvények kötegelt és streamelési lekérdezésekben is átadhatók.
Lásd még az Avro-fájl adatforrását.
Példa manuálisan megadott sémára
A from_json és a to_json hasonlóan bármely bináris oszlopot használhatja from_avro
to_avro
. Az Avro-sémát manuálisan is megadhatja, ahogyan az alábbi példában is látható:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
jsonFormatSchema példa
A sémát JSON-sztringként is megadhatja. Ha például /tmp/user.avsc
:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
JSON-sztringet is létrehozhat:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Ezután használja a sémát a következő helyen from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Példa sémaregisztrációs adatbázissal
Ha a fürt sémaregisztrációs szolgáltatással rendelkezik, dolgozhat vele, from_avro
hogy ne kelljen manuálisan megadnia az Avro-sémát.
Az alábbi példa egy "t" Kafka-témakör olvasását mutatja be, feltéve, hogy a kulcs és az érték már regisztrálva van a Sémaregisztrációs adatbázisban a "t-key" és a "t-value" típusú STRING
tantárgyakként, és INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
Előfordulhat to_avro
, hogy az alapértelmezett kimeneti Avro-séma nem felel meg a sémaregisztrációs szolgáltatás céltulajdonosának sémájának a következő okok miatt:
- A Spark SQL-típusról az Avro-sémára való leképezés nem egy az egyhez. Lásd a Spark SQL –> Avro átalakítás támogatott típusait.
- Ha a konvertált kimeneti Avro-séma rekordtípusú, a rekord neve az
topLevelRecord
, és alapértelmezés szerint nincs névtér.
Ha az alapértelmezett kimeneti séma to_avro
megegyezik a cél tárgyát képező sémával, a következőket teheti:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Ellenkező esetben meg kell adnia a céltárgy sémáját a to_avro
függvényben:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Hitelesítés külső Confluent-sémaregisztrációs adatbázisba
A Databricks Runtime 12.2 LTS-ben és újabb verziókban hitelesítést végezhet egy külső Confluent-sémaregisztrációs adatbázisban. Az alábbi példák bemutatják, hogyan konfigurálhatja a sémaregisztrációs beállításjegyzék beállításait hitelesítési hitelesítő adatok és API-kulcsok használatára.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Truststore- és keystore-fájlok használata Unity Catalog-kötetekben
A Databricks Runtime 14.3 LTS-ben és újabb verziókban a Unity Catalog-kötetekben található truststore- és keystore-fájlokat használhatja a Confluent-sémaregisztrációs adatbázisba való hitelesítéshez. Frissítse a konfigurációt az előző példában a következő szintaxissal:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Sémafejlődési mód használata from_avro
A Databricks Runtime 14.2-ben és újabb verziókban sémafejlődési from_avro
módot használhat. A sémafejlődési mód engedélyezése esetén a feladat a sémafejlődés észlelése után egy hibát jelez UnknownFieldException
. A Databricks azt javasolja, hogy sémafejlődési móddal konfigurálja a feladatokat, hogy automatikusan újrainduljanak a feladathibák. Lásd a strukturált streamelés éles szempontjait.
A séma fejlődése akkor hasznos, ha a forrásadatok sémája idővel fejlődni fog, és az adatforrás összes mezőjét betölti. Ha a lekérdezések már explicit módon megadják, hogy mely mezőket kell lekérdezni az adatforrásban, a rendszer figyelmen kívül hagyja a hozzáadott mezőket a séma alakulásától függetlenül.
A sémafejlődés engedélyezéséhez használja a avroSchemaEvolutionMode
lehetőséget. Az alábbi táblázat a sémafejlődési mód beállításait ismerteti:
Lehetőség | Működés |
---|---|
none |
Alapértelmezett érték. Figyelmen kívül hagyja a sémafejlődést, és a feladat folytatódik. |
restart |
A sémafejlődés észlelésekor egy hibát UnknownFieldException jelez. A feladat újraindítása szükséges. |
Feljegyzés
Ezt a konfigurációt módosíthatja a streamelési feladatok között, és újra felhasználhatja ugyanazt az ellenőrzőpontot. A sémafejlődés letiltása elvetett oszlopokat eredményezhet.
Az elemzési mód konfigurálása
Az elemzési módot úgy konfigurálhatja, hogy megállapítsa, hogy sikertelen vagy null rekordokat szeretne-e kibocsátani, ha a sémafejlődési mód le van tiltva, és a séma nem visszamenőlegesen kompatibilis módon fejlődik. Az alapértelmezett beállításokkal meghiúsul, from_avro
ha nem kompatibilis sémamódosításokat észlel.
Az elemzési mód megadásához használja a mode
beállítást. Az alábbi táblázat az elemzési mód beállítását ismerteti:
Lehetőség | Működés |
---|---|
FAILFAST |
Alapértelmezett érték. Egy elemzési hiba a következővel dob SparkException egy elemet errorClass MALFORMED_AVRO_MESSAGE . |
PERMISSIVE |
A rendszer figyelmen kívül hagy egy elemzési hibát, és null rekordot ad ki. |
Feljegyzés
Ha engedélyezve van a sémafejlődés, csak akkor ad kivételt, FAILFAST
ha egy rekord sérült.
Példa sémafejlődésre és elemzési mód beállítására
Az alábbi példa a sémafejlődés engedélyezését és az elemzési mód megadását mutatja be FAILFAST
a Confluent-sémaregisztrációs adatbázissal:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)