Delen via


Lees- en schrijfbewerkingen voor het streamen van Avro-gegevens

Apache Avro is een veelgebruikt systeem voor gegevensserialisatie in de streamingwereld. Een typische oplossing is het plaatsen van gegevens in Avro-indeling in Apache Kafka, metagegevens in Confluent Schema Registry en vervolgens query's uitvoeren met een streamingframework dat verbinding maakt met zowel Kafka als Schema Registry.

Azure Databricks ondersteunt de en to_avro functies voor het from_avro bouwen van streamingpijplijnen met Avro-gegevens in Kafka en metagegevens in Schema Registry. De functie to_avro codeert een kolom als binair bestand in Avro-indeling en from_avro decodeert avro binaire gegevens in een kolom. Beide functies transformeren de ene kolom naar een andere kolom en het SQL-gegevenstype voor invoer/uitvoer kan een complex type of een primitief type zijn.

Notitie

De from_avro en to_avro functies:

  • Zijn beschikbaar in Python, Scala en Java.
  • Kan worden doorgegeven aan SQL-functies in zowel batch- als streamingquery's.

Zie ook de Avro-bestandsgegevensbron.

Handmatig opgegeven schemavoorbeeld

Net als from_json en to_json kunt u en to_avro met elke binaire kolom gebruikenfrom_avro. U kunt het Avro-schema handmatig opgeven, zoals in het volgende voorbeeld:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema-voorbeeld

U kunt ook een schema opgeven als een JSON-tekenreeks. Als dit bijvoorbeeld het volgende is:/tmp/user.avsc

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

U kunt een JSON-tekenreeks maken:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Gebruik vervolgens het schema in from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Voorbeeld met schemaregister

Als uw cluster een Schema Registry-service heeft, from_avro kunt u ermee werken, zodat u het Avro-schema niet handmatig hoeft op te geven.

In het volgende voorbeeld ziet u hoe u een Kafka-onderwerp 't' leest, ervan uitgaande dat de sleutel en waarde al zijn geregistreerd in het schemaregister als onderwerp 't-key' en 't-waarde' van typen STRING en INT:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

Om to_avrode volgende redenen komt het standaarduitvoer-Avro-schema mogelijk niet overeen met het schema van het doelonderwerp in de Schema Registry-service:

  • De toewijzing van het Spark SQL-type aan het Avro-schema is niet een-op-een. Zie ondersteunde typen voor Spark SQL -> Avro-conversie.
  • Als het geconverteerde avro-uitvoerschema van het recordtype is, is topLevelRecord de recordnaam en is er standaard geen naamruimte.

Als het standaarduitvoerschema to_avro overeenkomt met het schema van het doelonderwerp, kunt u het volgende doen:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Anders moet u het schema van het doelonderwerp opgeven in de to_avro functie:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Verifiëren bij een extern Confluent-schemaregister

In Databricks Runtime 12.2 LTS en hoger kunt u zich verifiëren bij een extern Confluent-schemaregister. In de volgende voorbeelden ziet u hoe u de schemaregisteropties configureert om verificatiereferenties en API-sleutels op te nemen.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

TrustStore- en keystore-bestanden gebruiken in Unity Catalog-volumes

In Databricks Runtime 14.3 LTS en hoger kunt u truststore- en sleutelopslagbestanden in Unity Catalog-volumes gebruiken om te verifiëren bij een Confluent-schemaregister. Werk de configuratie in het vorige voorbeeld bij met behulp van de volgende syntaxis:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

De evolutiemodus van het schema gebruiken met from_avro

In Databricks Runtime 14.2 en hoger kunt u de modus schemaontwikkeling gebruiken met from_avro. Als u de modus voor schemaontwikkeling inschakelt, wordt de taak geactiveerd UnknownFieldException nadat de ontwikkeling van het schema is gedetecteerd. Databricks raadt u aan taken te configureren met de modus schemaontwikkeling om automatisch opnieuw op te starten bij taakfouten. Zie Structured Streaming-taken configureren om streamingquery's opnieuw op te starten bij fouten.

Schemaontwikkeling is handig als u verwacht dat het schema van uw brongegevens zich in de loop van de tijd ontwikkelt en alle velden uit uw gegevensbron opneemt. Als uw query's al expliciet aangeven welke velden moeten worden opgevraagd in uw gegevensbron, worden toegevoegde velden genegeerd, ongeacht de ontwikkeling van het schema.

Gebruik de avroSchemaEvolutionMode optie om schemaontwikkeling in te schakelen. In de volgende tabel worden de opties voor de evolutiemodus van het schema beschreven:

Optie Gedrag
none Standaard. Negeert de ontwikkeling van het schema en de taak wordt voortgezet.
restart Genereert een UnknownFieldException bij het detecteren van de evolutie van het schema. Vereist opnieuw opstarten van een taak.

Notitie

U kunt deze configuratie tussen streamingtaken wijzigen en hetzelfde controlepunt opnieuw gebruiken. Het uitschakelen van schemaontwikkeling kan leiden tot verwijderde kolommen.

De parseringsmodus configureren

U kunt de parseringsmodus configureren om te bepalen of u null-records wilt mislukken of verzenden wanneer de modus voor schemaontwikkeling is uitgeschakeld en het schema zich op een niet-achterwaartse compatibele manier ontwikkelt. Met standaardinstellingen from_avro mislukt het wanneer er incompatibele schemawijzigingen worden waargenomen.

Gebruik de optie om de mode parsemodus op te geven. In de volgende tabel wordt de optie voor de parseringsmodus beschreven:

Optie Gedrag
FAILFAST Standaard. Een parseringsfout genereert een SparkException met een errorClass van MALFORMED_AVRO_MESSAGE.
PERMISSIVE Er wordt een parseringsfout genegeerd en er wordt een null-record verzonden.

Notitie

Als schemaontwikkeling is ingeschakeld, FAILFAST worden alleen uitzonderingen veroorzaakt als een record beschadigd is.

Voorbeeld van het ontwikkelen van schema's en het instellen van de parseringsmodus

In het volgende voorbeeld ziet u hoe u de ontwikkeling van schema's inschakelt en de parsemodus opgeeft FAILFAST met een Confluent Schema Registry:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)