Sdílet prostřednictvím


Čtení a zápis streamovaných dat Avro

Apache Avro je běžně používaný systém serializace dat ve světě streamování. Typickým řešením je umístit data ve formátu Avro do Apache Kafka, metadata v registru schémat Confluent a pak spouštět dotazy s architekturou streamování, která se připojuje k kafka i registru schémat.

Azure Databricks podporuje from_avro a to_avrofunkce pro vytváření streamovacích kanálů s daty Avro v kafka a metadatech v registru schémat. Funkce to_avro kóduje sloupec jako binární ve formátu Avro a from_avro dekóduje binární data Avro do sloupce. Obě funkce transformují jeden sloupec do jiného sloupce a vstupní a výstupní datový typ SQL může být složitý nebo primitivní typ.

Poznámka:

Funkce from_avro a to_avro funkce:

  • Jsou dostupné v Pythonu, Scala a Javě.
  • Funkce SQL je možné předat v dávkových i streamovaných dotazech.

Viz také zdroj dat souboru Avro.

Příklad ručně zadaného schématu

Podobně jako from_json a to_json můžete použít a to_avro s from_avro libovolným binárním sloupcem. Schéma Avro můžete zadat ručně, jak je znázorněno v následujícím příkladu:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

příklad jsonFormatSchema

Schéma můžete také zadat jako řetězec JSON. Pokud je to například /tmp/user.avsc :

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Můžete vytvořit řetězec JSON:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Pak použijte schéma v from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Příklad s registrem schématu

Pokud má váš cluster službu Registr schématu, můžete s ním pracovat, from_avro abyste nemuseli schéma Avro zadávat ručně.

Následující příklad ukazuje čtení tématu Kafka "t", za předpokladu, že klíč a hodnota jsou již registrovány v registru schémat jako předměty "t-key" a "t-value" typů STRING a INT:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

Výchozí to_avrovýstupní schéma Avro nemusí odpovídat schématu cílového subjektu ve službě Registr schématu z následujících důvodů:

  • Mapování typu Spark SQL na schéma Avro není 1:1. Viz Podporované typy pro převod Spark SQL –> Avro.
  • Pokud je převedené výstupní schéma Avro typu záznamu, název záznamu je topLevelRecord a ve výchozím nastavení neexistuje žádný obor názvů.

Pokud výchozí výstupní schéma to_avro odpovídá schématu cílového předmětu, můžete udělat toto:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

V opačném případě je nutné zadat schéma cílového předmětu to_avro ve funkci:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Ověření v externím registru schématu Confluent

Ve službě Databricks Runtime 12.2 LTS a novějších můžete provést ověření v externím registru schématu Confluent. Následující příklady ukazují, jak nakonfigurovat možnosti registru schématu tak, aby zahrnovaly přihlašovací údaje ověřování a klíče rozhraní API.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Použití úložiště důvěryhodnosti a souborů úložiště klíčů ve svazcích katalogu Unity

Ve službě Databricks Runtime 14.3 LTS a vyšší můžete k ověření v registru schémat Confluent použít úložiště důvěryhodnosti a soubory úložiště klíčů ve svazcích katalogu Unity. Aktualizujte konfiguraci v předchozím příkladu pomocí následující syntaxe:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Použití režimu vývoje schématu s využitím from_avro

V Databricks Runtime 14.2 a novějších můžete použít režim vývoje schématu s from_avro. Povolení režimu vývoje schématu způsobí, že úloha vyvolá UnknownFieldException po zjištění vývoje schématu. Databricks doporučuje konfigurovat úlohy s režimem vývoje schématu, aby se automaticky restartoval při selhání úlohy. Viz Konfigurace úloh strukturovaného streamování pro restartování dotazů streamování při selhání.

Vývoj schématu je užitečný, pokud očekáváte, že se schéma zdrojových dat v průběhu času vyvíjí a ingestuje všechna pole z vašeho zdroje dat. Pokud už dotazy explicitně určují, která pole se mají dotazovat ve zdroji dat, přidaná pole se ignorují bez ohledu na vývoj schématu.

avroSchemaEvolutionMode Tuto možnost použijte k povolení vývoje schématu. Následující tabulka popisuje možnosti režimu vývoje schématu:

Možnost Chování
none Výchozí. Ignoruje vývoj schématu a úloha pokračuje.
restart UnknownFieldException Vyvolá při zjišťování vývoje schématu. Vyžaduje restartování úlohy.

Poznámka:

Tuto konfiguraci můžete změnit mezi úlohami streamování a znovu použít stejný kontrolní bod. Zakázání vývoje schématu může vést k vyřazení sloupců.

Konfigurace režimu analýzy

Režim analýzy můžete nakonfigurovat tak, aby určil, jestli chcete selhat nebo generovat záznamy null, pokud je režim vývoje schématu zakázaný a schéma se vyvíjí zpětně kompatibilním způsobem. Při výchozím nastavení dojde k selhání při from_avro sledování nekompatibilních změn schématu.

mode Pomocí možnosti můžete určit režim analýzy. Následující tabulka popisuje možnost parsování režimu:

Možnost Chování
FAILFAST Výchozí. Při analýze dojde k chybě s příponou SparkExceptionerrorClassMALFORMED_AVRO_MESSAGE.
PERMISSIVE Chyba analýzy se ignoruje a vygeneruje se záznam null.

Poznámka:

S povoleným FAILFAST vývojem schématu vyvolá výjimky pouze v případě poškození záznamu.

Příklad použití vývoje schématu a nastavení režimu analýzy

Následující příklad ukazuje povolení vývoje schématu a určení FAILFAST režimu parsování s registrem schémat Confluent:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)