Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Apache Avro je běžně používaný systém serializace dat ve světě streamování. Typickým řešením je umístit data ve formátu Avro do Apache Kafka, metadata v registru schémat Confluent a pak spouštět dotazy s architekturou streamování, která se připojuje k kafka i registru schémat.
Azure Databricks podporuje from_avro a to_avrofunkce pro vytváření streamovacích kanálů s daty Avro v Kafka a metadatech ve Schema Registry. Funkce to_avro kóduje sloupec jako binární ve formátu Avro a from_avro dekóduje binární data Avro do sloupce. Obě funkce transformují jeden sloupec do jiného sloupce a vstupní a výstupní datový typ SQL může být složitý nebo primitivní typ.
Poznámka:
Funkce from_avro a to_avro:
- Jsou dostupné v Pythonu, Scala a Javě.
- Lze předat funkcím SQL v dávkových i streamovacích dotazech.
Viz také zdroj souboru Avro dat.
Příklad ručně zadaného schématu
Podobně jako from_json a to_json můžete použít from_avro s to_avro libovolným binárním sloupcem. Schéma Avro můžete zadat ručně, jak je znázorněno v následujícím příkladu:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
příklad jsonFormatSchema
Schéma můžete také zadat jako řetězec JSON. Pokud je to například /tmp/user.avsc :
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "favorite_color", "type": ["string", "null"] }
]
}
Můžete vytvořit řetězec JSON:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Pak použijte schéma v from_avro:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Příklad s registrem schématu
Pokud má váš cluster službu Registr schématu, můžete s ním pracovat, from_avro abyste nemuseli schéma Avro zadávat ručně.
Následující příklad ukazuje čtení tématu Kafka "t", za předpokladu, že klíč a hodnota jsou již registrovány v registru schémat jako předměty "t-key" a "t-value" typů STRING a INT:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr).as("value"))
Výchozí to_avrovýstupní schéma Avro nemusí odpovídat schématu cílového subjektu ve službě Registr schématu z následujících důvodů:
- Mapování typu Spark SQL na schéma Avro není 1:1. Podívejte se na Podporované typy pro Spark SQL – převod Avro>.
- Pokud je převedené výstupní schéma Avro typu záznamu, název záznamu je
topLevelRecorda ve výchozím nastavení neexistuje žádný obor názvů.
Pokud výchozí výstupní schéma to_avro odpovídá schématu cílového předmětu, můžete udělat toto:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
V opačném případě je nutné zadat schéma cílového předmětu to_avro ve funkci:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Přihlaste se k externímu registru schémat Confluent
Ve službě Databricks Runtime 12.2 LTS a novějších můžete provést ověření v externím registru schématu Confluent. Následující příklady ukazují, jak nakonfigurovat možnosti registru schématu tak, aby zahrnovaly přihlašovací údaje ověřování a klíče rozhraní API.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Použijte důvěryhodnostní úložiště a klíčové úložiště ve svazcích katalogu Unity
Ve službě Databricks Runtime 14.3 LTS a vyšší můžete k ověření v registru schémat Confluent použít úložiště důvěryhodnosti a soubory úložiště klíčů ve svazcích katalogu Unity. Aktualizujte konfiguraci v předchozím příkladu pomocí následující syntaxe:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Použití režimu vývoje schématu s využitím from_avro
V Databricks Runtime 14.2 a novějších můžete použít režim vývoje schématu s from_avro. Povolení režimu vývoje schématu způsobí, že úloha vyvolá UnknownFieldException po zjištění vývoje schématu. Databricks doporučuje konfigurovat úlohy s režimem evoluce schématu, aby se automaticky restartovaly při jakémkoli selhání úlohy. Viz aspekty produkce strukturovaného streamování.
Vývoj schématu je užitečný, pokud očekáváte, že se schéma zdrojových dat v průběhu času vyvíjí a ingestuje všechna pole z vašeho zdroje dat. Pokud už dotazy explicitně určují, která pole se mají dotazovat ve zdroji dat, přidaná pole se ignorují bez ohledu na vývoj schématu.
avroSchemaEvolutionMode Tuto možnost použijte k povolení vývoje schématu. Následující tabulka popisuje možnosti režimu vývoje schématu:
| Možnost | Chování |
|---|---|
none |
Výchozí. Ignoruje vývoj schématu a úloha pokračuje. |
restart |
Vyvolá UnknownFieldException při zjišťování vývoje schématu. Vyžaduje restartování úlohy. |
Poznámka:
Tuto konfiguraci můžete změnit mezi úlohami streamování a znovu použít stejný kontrolní bod. Zakázání vývoje schématu může vést k vyřazení sloupců.
Konfigurace režimu analýzy
Režim analýzy můžete nakonfigurovat tak, aby určil, zda systém má selhat nebo emitovat záznamy null, pokud je režim vývoje schématu zakázaný a schéma se vyvíjí ne zpětně kompatibilním způsobem. Při výchozím nastavení from_avro dojde k selhání, když pozoruje nekompatibilní změny schématu.
mode Pomocí možnosti můžete určit režim analýzy. Následující tabulka popisuje možnost parsování režimu:
| Možnost | Chování |
|---|---|
FAILFAST |
Výchozí. Při analýze dojde k chybě s příponou SparkExceptionerrorClassMALFORMED_AVRO_MESSAGE. |
PERMISSIVE |
Chyba analýzy se ignoruje a vygeneruje se záznam null. |
Poznámka:
S aktivovaným vývojem schématu FAILFAST vyvolá výjimky pouze v případě poškození záznamu.
Příklad použití vývoje schématu a nastavení režimu analýzy
Následující příklad ukazuje povolení vývoje schématu a určení FAILFAST režimu parsování s registrem schémat Confluent:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
data = $"key",
subject = "t-key",
schemaRegistryAddress = schemaRegistryAddr,
options = schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)