Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Apache Avro to powszechnie używany system serializacji danych w świecie przesyłania strumieniowego. Typowym rozwiązaniem jest umieszczenie danych w formacie Avro na platformie Apache Kafka, metadanych w rejestrze schematów Confluent, a następnie uruchomienie zapytań za pomocą struktury przesyłania strumieniowego, która łączy się zarówno z platformą Kafka, jak i rejestrem schematów.
Usługa Azure Databricks obsługuje funkcje from_avro
i to_avro
w celu tworzenia potoków przesyłania strumieniowego przy użyciu danych Avro na platformie Kafka i metadanych w Rejestrze Schematów. Funkcja to_avro
koduje kolumnę jako binarną w formacie Avro i from_avro
dekoduje dane binarne Avro do kolumny. Obie funkcje przekształcają jedną kolumnę w inną kolumnę, a typ danych wejściowych/wyjściowych SQL może być typem złożonym lub typem pierwotnym.
Uwaga
Funkcje from_avro
i to_avro
:
- Są dostępne w językach Python, Scala i Java.
- Można przekazać do funkcji SQL zarówno w zapytaniach wsadowych, jak i przesyłanych strumieniowo.
Zobacz również źródło danych pliku Avro.
Przykład ręcznie określonego schematu
Podobnie jak from_json i to_json, można użyć from_avro
i to_avro
z dowolną kolumną binarną. Schemat Avro można określić ręcznie, jak w poniższym przykładzie:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Przykład jsonFormatSchema
Możesz również określić schemat jako ciąg JSON. Jeśli na przykład /tmp/user.avsc
:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "favorite_color", "type": ["string", "null"] }
]
}
Możesz utworzyć ciąg JSON:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Następnie użyj schematu w from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Przykład z rejestrem schematów
Jeśli Twój klaster ma usługę rejestru schematów, from_avro
może z nią współpracować, dzięki czemu nie trzeba ręcznie określać schematu Avro.
W poniższym przykładzie pokazano odczytywanie topicu Kafka "t", przy założeniu, że klucz i wartość są już zarejestrowane w Rejestrze Schematów jako tematy "t-key" i "t-value" typów STRING
i INT
.
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr).as("value"))
W przypadku to_avro
domyślny schemat wyjściowy Avro może nie być zgodny ze schematem podmiotu docelowego w usłudze Rejestru schematów z następujących powodów:
- Mapowanie z typu Spark SQL na schemat Avro nie jest jednoznaczne. Zobacz > Spark SQL — Avro.
- Jeśli przekonwertowany schemat danych wyjściowych Avro jest typu rekordu, nazwa rekordu jest
topLevelRecord
i domyślnie nie ma przestrzeni nazw.
Jeśli domyślny schemat wyjściowy to_avro
jest zgodny ze schematem podmiotu docelowego, możesz wykonać następujące czynności:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
W przeciwnym razie należy podać schemat podmiotu docelowego w funkcji to_avro
:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Uwierzytelnij w zewnętrznym rejestrze schematów Confluent
W środowisku Databricks Runtime 12.2 LTS i nowszym można uwierzytelnić się w zewnętrznym rejestrze schematów Confluent. W poniższych przykładach pokazano, jak skonfigurować opcje rejestru schematów w celu uwzględnienia poświadczeń uwierzytelniania i kluczy interfejsu API.
Skala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Pyton
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Używanie plików magazynu zaufania i magazynu kluczy w woluminach Katalogu Unity
W Databricks Runtime 14.3 LTS i nowszych można używać plików truststore i keystore w woluminach Unity Catalog do uwierzytelniania w Rejestrze Schematów Confluent. Zaktualizuj konfigurację w poprzednim przykładzie przy użyciu następującej składni:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Korzystaj z trybu ewolucji schematu z from_avro
W środowisku Databricks Runtime 14.2 lub nowszym można użyć trybu ewolucji schematu z from_avro
. Włączenie trybu ewolucji schematu powoduje, że zadanie rzuca UnknownFieldException
po wykryciu ewolucji schematu. Usługa Databricks zaleca skonfigurowanie zadań w trybie ewolucji schematu w celu automatycznego ponownego uruchomienia w przypadku niepowodzenia zadania. Zobacz Zagadnienia dotyczące produkcji przesyłania strumieniowego ze strukturą.
Ewolucja schematu jest przydatna, jeśli oczekujesz, że schemat danych źródłowych będzie ewoluować wraz z upływem czasu i pozyskać wszystkie pola ze źródła danych. Jeśli zapytania już jawnie określą pola do wykonywania zapytań w źródle danych, dodane pola są ignorowane niezależnie od ewolucji schematu.
Użyj opcji avroSchemaEvolutionMode
, aby włączyć ewolucję schematu. W poniższej tabeli opisano opcje trybu ewolucji schematu:
Opcja | Zachowanie |
---|---|
none |
Domyślne. Ignoruje ewolucję schematu, a zadanie jest kontynuowane. |
restart |
Wyrzuca błąd UnknownFieldException podczas wykrywania ewolucji schematu. Wymaga ponownego uruchomienia zadania. |
Uwaga
Tę konfigurację można zmienić między zadaniami przesyłania strumieniowego i ponownie użyć tego samego punktu kontrolnego. Wyłączenie ewolucji schematu może spowodować usunięcie kolumn.
Konfigurowanie trybu analizy
Możesz skonfigurować sposób parsowania, aby określić, czy chcesz zgłaszać błąd, czy emitować rekordy o wartości null, gdy tryb ewolucji schematu jest wyłączony, a schemat ewoluuje w sposób niekompatybilny wstecz. W przypadku ustawień domyślnych from_avro
kończy się niepowodzeniem, gdy napotyka niezgodne zmiany schematu.
mode
Użyj opcji , aby określić tryb analizy. W poniższej tabeli opisano opcję trybu analizy:
Opcja | Zachowanie |
---|---|
FAILFAST |
Domyślne. Błąd analizowania zgłasza SparkException błąd z wartością errorClass MALFORMED_AVRO_MESSAGE . |
PERMISSIVE |
Błąd analizowania jest ignorowany i emitowany jest rekord o wartości null. |
Uwaga
Po włączeniu ewolucji schematu FAILFAST
zgłasza wyjątki tylko wtedy, gdy rekord jest uszkodzony.
Przykład użycia ewolucji schematu i ustawiania trybu analizy
W poniższym przykładzie pokazano włączanie ewolucji schematu i określanie trybu parsowania FAILFAST
z Rejestrem Schematów Confluent.
Skala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
data = $"key",
subject = "t-key",
schemaRegistryAddress = schemaRegistryAddr,
options = schemaRegistryOptions.asJava).as("key"))
Pyton
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)