Avro-adatok streamelésének olvasási és írási műveletei

Az Apache Avro egy gyakran használt adat szerializálási rendszer a streamelési világban. Egy tipikus megoldás az adatok Avro formátumban történő elhelyezése Apache Kafkában, metaadatok Confluent Sémaregisztrációs adatbázis, majd lekérdezések futtatása egy streamelési keretrendszerrel, amely a Kafkához és a Sémaregisztrációs adatbázishoz is csatlakozik.

Az Azure Databricks támogatja a from_avro, to_avroésfüggvényeket a streamelési csővezetékek létrehozásához Avro-adatokkal a Kafkában és metaadatokkal a Sémaregisztrációban. A függvény to_avro binárisként kódolja az oszlopokat Avro formátumban, és from_avro az Avro bináris adatait egy oszlopba dekódolja. Mindkét függvény átalakítja az egyik oszlopot egy másik oszlopba, a bemeneti/kimeneti SQL-adattípus pedig összetett vagy primitív típus lehet.

Megjegyzés

A from_avro és to_avro függvények:

  • A Pythonban, a Scalában és a Java-ban is elérhetők.
  • Tételes és streamelési lekérdezések során argumentumok adhatók át SQL-függvényeknek.

Lásd még az Avro-fájl adatforrását.

Példa manuálisan megadott sémára

Hasonlóan, mint from_json és to_json, a from_avro és to_avro bármely bináris oszloppal használható. Az Avro-sémát manuálisan is megadhatja, ahogyan az alábbi példában is látható:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema példa

A sémát JSON-sztringként is megadhatja. Ha például /tmp/user.avsc :

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "favorite_color", "type": ["string", "null"] }
  ]
}

JSON-sztringet is létrehozhat:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Ezután a sémát a from_avro-ban használja.

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Példa sémaregisztrációs adatbázissal

Ha a fürt rendelkezik sémaregisztrációs szolgáltatással, a from_avro dolgozhat vele, így nem szükséges kézzel megadnia az Avro-sémát.

Az alábbi példa egy Kafka-témakör "t" olvasását mutatja, feltételezve, hogy a kulcs és az érték már regisztrálva van a Schema Registry-ben "t-key" és "t-value" tárgyakként, a STRING és INTtípusokkal.

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr).as("key"),
    from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr).as("value"))

A to_avroesetében előfordulhat, hogy az alapértelmezett kimeneti Avro-séma nem felel meg a sémaregisztrációs szolgáltatás céltulajdonosának sémájának a következő okok miatt:

  • A Spark SQL-típus Avro sémává történő leképezése nem egy az egyhez történik. Lásd a Spark SQL –> Avro átalakítás támogatott típusait.
  • Ha a konvertált kimeneti Avro-séma rekordtípusú, a rekord neve topLevelRecord, és alapértelmezés szerint nincs névtér.

Ha a to_avro alapértelmezett kimeneti sémája megegyezik a céltárgy sémájának, a következőket teheti:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Ellenkező esetben meg kell adnia a céltárgy sémáját a to_avro függvényben:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Hitelesítés külső Confluent-sémaregisztrációs adatbázisba

A Databricks Runtime 12.2 LTS-ben és újabb verziókban hitelesítést végezhet egy külső Confluent-sémaregisztrációs adatbázisban. Az alábbi példák bemutatják, hogyan konfigurálhatja a sémaregisztrációs beállításjegyzék beállításait hitelesítési hitelesítő adatok és API-kulcsok használatára.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("key"),
    from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      jsonFormatSchema = None,
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      jsonFormatSchema = None,
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Truststore- és keystore-fájlok használata Unity Catalog-kötetekben

A Databricks Runtime 14.3 LTS-ben és újabb verziókban a Unity Catalog-kötetekben található truststore- és keystore-fájlokat használhatja a Confluent-sémaregisztrációs adatbázisba való hitelesítéshez. Frissítse a konfigurációt az előző példában a következő szintaxissal:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Sémafejlődési mód használata from_avro

A Databricks Runtime 14.2 és újabb verzióiban sémaváltozási módot használhat from_avro-val. A sémafejlődési mód engedélyezése azt eredményezi, hogy a feladat egy UnknownFieldException hibát dob, miután észleli a sémaváltozást. A Databricks azt javasolja, hogy sémafejlődési móddal konfigurálja a munkákat, hogy feladathibák esetén automatikusan újrainduljanak. Lásd a strukturált streamelés gyártási szempontjait.

A séma fejlődése akkor hasznos, ha a forrásadatok sémája idővel fejlődni fog, és az adatforrás összes mezőjét betölti. Ha a lekérdezések már explicit módon megadják, hogy mely mezőket kell lekérdezni az adatforrásban, a rendszer figyelmen kívül hagyja a hozzáadott mezőket a séma alakulásától függetlenül.

A sémafejlődés engedélyezéséhez használja a avroSchemaEvolutionMode lehetőséget. Az alábbi táblázat a sémafejlődési mód beállításait ismerteti:

Lehetőség Működés
none Alapértelmezett Figyelmen kívül hagyja a sémafejlődést, és a feladat folytatódik.
restart Egy UnknownFieldException hibát dob, amikor észleli a séma változását. A feladat újraindítása szükséges.

Megjegyzés

Ezt a konfigurációt módosíthatja a streamelési feladatok között, és újra felhasználhatja ugyanazt az ellenőrzőpontot. A sémafejlődés letiltása törölt oszlopokat eredményezhet.

Az elemzési mód konfigurálása

Az elemzési módot úgy konfigurálhatja, hogy megállapítsa, hogy sikertelen vagy null rekordokat szeretne-e kibocsátani, ha a sémafejlődési mód le van tiltva, és a séma nem visszamenőlegesen kompatibilis módon fejlődik. Az alapértelmezett beállításokkal from_avro meghiúsul, ha nem kompatibilis sémamódosításokat észlel.

Az elemzési mód megadásához használja a mode beállítást. Az alábbi táblázat az elemzési mód beállítását ismerteti:

Lehetőség Működés
FAILFAST Alapértelmezett. Egy elemzési hiba kivételt dob SparkException, amelynek errorClassMALFORMED_AVRO_MESSAGE értéke van.
PERMISSIVE A rendszer figyelmen kívül hagy egy elemzési hibát, és null rekordot ad ki.

Megjegyzés

Ha engedélyezve van a sémafejlődés, FAILFAST csak akkor ad kivételt, ha egy rekord sérült.

Példa sémafejlődésre és elemzési mód beállítására

Az alábbi példa a sémafejlődés engedélyezését és FAILFAST elemzési mód megadását mutatja be a Confluent-sémaregisztrációs adatbázissal:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            data = $"key",
            subject = "t-key",
            schemaRegistryAddress = schemaRegistryAddr,
            options = schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      jsonFormatSchema = None,
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)