Čtení a zápis streamovaných dat Avro
Apache Avro je běžně používaný systém serializace dat ve světě streamování. Typickým řešením je umístit data ve formátu Avro do Apache Kafka, metadata v registru schémat Confluent a pak spouštět dotazy s architekturou streamování, která se připojuje k kafka i registru schémat.
Azure Databricks podporuje from_avro
a to_avro
funkce pro vytváření streamovacích kanálů s daty Avro v kafka a metadatech v registru schémat. Funkce to_avro
kóduje sloupec jako binární ve formátu Avro a from_avro
dekóduje binární data Avro do sloupce. Obě funkce transformují jeden sloupec do jiného sloupce a vstupní a výstupní datový typ SQL může být složitý nebo primitivní typ.
Poznámka:
Funkce from_avro
a to_avro
funkce:
- Jsou dostupné v Pythonu, Scala a Javě.
- Funkce SQL je možné předat v dávkových i streamovaných dotazech.
Viz také zdroj dat souboru Avro.
Příklad ručně zadaného schématu
Podobně jako from_json a to_json můžete použít a to_avro
s from_avro
libovolným binárním sloupcem. Schéma Avro můžete zadat ručně, jak je znázorněno v následujícím příkladu:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
příklad jsonFormatSchema
Schéma můžete také zadat jako řetězec JSON. Pokud je to například /tmp/user.avsc
:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Můžete vytvořit řetězec JSON:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Pak použijte schéma v from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Příklad s registrem schématu
Pokud má váš cluster službu Registr schématu, můžete s ním pracovat, from_avro
abyste nemuseli schéma Avro zadávat ručně.
Následující příklad ukazuje čtení tématu Kafka "t", za předpokladu, že klíč a hodnota jsou již registrovány v registru schémat jako předměty "t-key" a "t-value" typů STRING
a INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
Výchozí to_avro
výstupní schéma Avro nemusí odpovídat schématu cílového subjektu ve službě Registr schématu z následujících důvodů:
- Mapování typu Spark SQL na schéma Avro není 1:1. Viz Podporované typy pro převod Spark SQL –> Avro.
- Pokud je převedené výstupní schéma Avro typu záznamu, název záznamu je
topLevelRecord
a ve výchozím nastavení neexistuje žádný obor názvů.
Pokud výchozí výstupní schéma to_avro
odpovídá schématu cílového předmětu, můžete udělat toto:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
V opačném případě je nutné zadat schéma cílového předmětu to_avro
ve funkci:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Ověření v externím registru schématu Confluent
Ve službě Databricks Runtime 12.2 LTS a novějších můžete provést ověření v externím registru schématu Confluent. Následující příklady ukazují, jak nakonfigurovat možnosti registru schématu tak, aby zahrnovaly přihlašovací údaje ověřování a klíče rozhraní API.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Použití úložiště důvěryhodnosti a souborů úložiště klíčů ve svazcích katalogu Unity
Ve službě Databricks Runtime 14.3 LTS a vyšší můžete k ověření v registru schémat Confluent použít úložiště důvěryhodnosti a soubory úložiště klíčů ve svazcích katalogu Unity. Aktualizujte konfiguraci v předchozím příkladu pomocí následující syntaxe:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Použití režimu vývoje schématu s využitím from_avro
V Databricks Runtime 14.2 a novějších můžete použít režim vývoje schématu s from_avro
. Povolení režimu vývoje schématu způsobí, že úloha vyvolá UnknownFieldException
po zjištění vývoje schématu. Databricks doporučuje konfigurovat úlohy s režimem vývoje schématu, aby se automaticky restartoval při selhání úlohy. Viz Konfigurace úloh strukturovaného streamování pro restartování dotazů streamování při selhání.
Vývoj schématu je užitečný, pokud očekáváte, že se schéma zdrojových dat v průběhu času vyvíjí a ingestuje všechna pole z vašeho zdroje dat. Pokud už dotazy explicitně určují, která pole se mají dotazovat ve zdroji dat, přidaná pole se ignorují bez ohledu na vývoj schématu.
avroSchemaEvolutionMode
Tuto možnost použijte k povolení vývoje schématu. Následující tabulka popisuje možnosti režimu vývoje schématu:
Možnost | Chování |
---|---|
none |
Výchozí. Ignoruje vývoj schématu a úloha pokračuje. |
restart |
UnknownFieldException Vyvolá při zjišťování vývoje schématu. Vyžaduje restartování úlohy. |
Poznámka:
Tuto konfiguraci můžete změnit mezi úlohami streamování a znovu použít stejný kontrolní bod. Zakázání vývoje schématu může vést k vyřazení sloupců.
Konfigurace režimu analýzy
Režim analýzy můžete nakonfigurovat tak, aby určil, jestli chcete selhat nebo generovat záznamy null, pokud je režim vývoje schématu zakázaný a schéma se vyvíjí zpětně kompatibilním způsobem. Při výchozím nastavení dojde k selhání při from_avro
sledování nekompatibilních změn schématu.
mode
Pomocí možnosti můžete určit režim analýzy. Následující tabulka popisuje možnost parsování režimu:
Možnost | Chování |
---|---|
FAILFAST |
Výchozí. Při analýze dojde k chybě s příponou SparkException errorClass MALFORMED_AVRO_MESSAGE . |
PERMISSIVE |
Chyba analýzy se ignoruje a vygeneruje se záznam null. |
Poznámka:
S povoleným FAILFAST
vývojem schématu vyvolá výjimky pouze v případě poškození záznamu.
Příklad použití vývoje schématu a nastavení režimu analýzy
Následující příklad ukazuje povolení vývoje schématu a určení FAILFAST
režimu parsování s registrem schémat Confluent:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)
Váš názor
https://aka.ms/ContentUserFeedback.
Připravujeme: V průběhu roku 2024 budeme postupně vyřazovat problémy z GitHub coby mechanismus zpětné vazby pro obsah a nahrazovat ho novým systémem zpětné vazby. Další informace naleznete v tématu:Odeslat a zobrazit názory pro