Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
In dit artikel wordt beschreven hoe u Apache Kafka kunt gebruiken als bron of sink bij het uitvoeren van structured streaming-workloads in Azure Databricks.
Zie de Apache Kafka-documentatie voor meer informatie over Kafka.
Gegevens lezen uit Kafka
Azure Databricks biedt het kafka trefwoord als gegevensformaat om verbindingen met Kafka te configureren. Hier volgt een voorbeeld voor een streaming-leesbewerking:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
SQL
CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>'
);
Azure Databricks biedt ook ondersteuning voor semantiek voor batchleesbewerkingen, zoals wordt weergegeven in het volgende voorbeeld:
Python
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Scala
val df = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'earliest',
endingOffsets => 'latest'
);
Voor het laden van incrementele batches raadt Databricks het gebruik van Kafka aan met Trigger.AvailableNow. Zie AvailableNow: Incrementele batchverwerking.
In Databricks Runtime 13.3 LTS en hoger biedt Azure Databricks ook een SQL-functie voor het lezen van Kafka-gegevens. Streaming met SQL wordt alleen ondersteund in Lakeflow Spark-declaratieve pijplijnen of met streamingtabellen in Databricks SQL. Zie read_kafka tabelwaardefunctie.
Kafka Structured Streaming-lezer configureren
De volgende optie moet worden ingesteld voor de Kafka-bron voor zowel batch- als streamingquery's:
| Option | Waarde | Beschrijving |
|---|---|---|
kafka.bootstrap.servers |
Een door komma's gescheiden lijst met host:poort | De Bootstrap-servers van het Kafka-cluster |
Daarnaast is een van de volgende opties vereist om op te geven op welke onderwerpen u zich wilt abonneren:
| Option | Waarde | Beschrijving |
|---|---|---|
subscribe |
Een door komma's gescheiden lijst met onderwerpen. | De lijst met onderwerpen waarop u zich wilt abonneren. |
subscribePattern |
Java regex-tekenreeks. | Het patroon dat wordt gebruikt om u te abonneren op een of meer onderwerpen. |
assign |
JSON-tekenreeks {"topicA":[0,1],"topic":[2,4]}. |
Specifieke topicPartitions die moeten worden gebruikt. |
Zie de pagina Opties voor de volledige lijst met beschikbare opties.
Schema voor Kafka-records
De records die door de Kafka Structured Streaming-lezer worden geretourneerd, hebben het volgende schema:
| Kolom | Typ |
|---|---|
key |
binary |
value |
binary |
topic |
string |
partition |
int |
offset |
long |
timestamp |
long |
timestampType |
int |
De key en de value worden altijd gedeserialiseerd als bytematrices met de ByteArrayDeserializer. Gebruik DataFrame-bewerkingen (zoals cast("string") of from_avro) om de sleutels en waarden expliciet deserialiseren.
Gegevens schrijven naar Kafka
Hier volgt een voorbeeld van een streaming-schrijfbewerking naar Kafka:
Python
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Scala
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
Azure Databricks ondersteunt ook semantiek voor batch-schrijfbewerkingen naar Kafka-gegevenssinks, zoals wordt weergegeven in het volgende voorbeeld:
Python
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Scala
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
De kafka Structured Streaming Writer configureren
Belangrijk
Databricks Runtime 13.3 LTS en hoger bevat een nieuwere versie van de kafka-clients bibliotheek waarmee idempotente schrijfbewerkingen standaard worden ingeschakeld. Als een Kafka-sink versie 2.8.0 of lager gebruikt met ACL's die zijn geconfigureerd, maar zonder IDEMPOTENT_WRITE ingeschakeld, mislukt de schrijfbewerking met het foutbericht org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.
Los deze fout op door een upgrade uit te voeren naar Kafka versie 2.8.0 of hoger, of door de instelling .option(“kafka.enable.idempotence”, “false”) tijdens het configureren van uw Structured Streaming Writer.
Hier volgen algemene opties die zijn ingesteld tijdens het schrijven naar Kafka:
| Option | Waarde | Standaardwaarde | Beschrijving |
|---|---|---|---|
kafka.boostrap.servers |
Een door komma's gescheiden lijst met <host:port> |
geen | [Vereist] De Kafka-configuratie bootstrap.servers. |
topic |
STRING |
niet ingesteld | [Optioneel] Hiermee stelt u het onderwerp in voor alle rijen die moeten worden geschreven. Met deze optie worden alle onderwerpkolommen overschreven die in de gegevens aanwezig zijn. |
includeHeaders |
BOOLEAN |
false |
[Optioneel] Of u de Kafka-headers in de rij wilt opnemen. |
Zie de pagina Opties voor de volledige lijst met beschikbare opties.
Schema voor Kafka schrijver
Bij het schrijven van gegevens naar Kafka kan het opgegeven DataFrame de volgende velden bevatten:
| Kolomnaam | Verplicht of optioneel | Typ |
|---|---|---|
key |
optional |
STRING of BINARY |
value |
required |
STRING of BINARY |
headers |
optional | ARRAY |
topic |
optioneel (genegeerd wanneer topic is ingesteld als de schrijfoptie) |
STRING |
partition |
optional | INT |
Authenticatie
Azure Databricks ondersteunt meerdere verificatiemethoden voor Kafka, waaronder servicereferenties voor Unity Catalog, SASL/SSL en cloudspecifieke opties voor AWS MSK, Azure Event Hubs en Google Cloud Managed Kafka. Zie Verificatie.
Kafka-metrieken ophalen
U kunt controleren hoe ver een streamingquery achter Kafka loopt met behulp van de avgOffsetsBehindLatestmaxOffsetsBehindLatest, en minOffsetsBehindLatest metrische gegevens. Deze rapporten geven de gemiddelde, maximale en minimale offset-lag voor alle geabonneerde onderwerppartities weer, ten opzichte van de meest recente offsets in Kafka. Zie Interactief metrische gegevens lezen.
Als u wilt schatten hoeveel gegevens de query nog niet heeft verbruikt, gebruikt u de estimatedTotalBytesBehindLatest metrische waarde. Met deze metrische waarde wordt het totale aantal resterende bytes voor alle geabonneerde partities geschat op basis van de batches die in de afgelopen 300 seconden zijn verwerkt. U kunt het tijdvenster voor deze schatting wijzigen door de bytesEstimateWindowLength optie in te stellen. Als u deze bijvoorbeeld wilt instellen op 10 minuten:
Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Scala
val df = spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds
Als u de stream uitvoert in een notebook, ziet u deze metrische gegevens op het tabblad Onbewerkte gegevens in het voortgangsdashboard voor streamingquery's:
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Zie Structured Streaming-queries monitoren in Azure Databricks voor meer details.
Codevoorbeeld: Kafka naar Delta
In het volgende voorbeeld ziet u een volledige werkstroom voor het continu streamen van gegevens van Kafka naar een Delta-tabel. Dit patroon is ideaal voor gegevensopnameworkloads in bijna realtime.
In dit voorbeeld wordt een vast JSON-schema gebruikt. Voor andere indelingen, zoals Avro of Protobuf, gebruikt from_avro of from_protobuf. U kunt ook integreren met een schemaregister. Zie voorbeeld met schemaregister.
Python
from pyspark.sql.functions import from_json, col
# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"
# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}
# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)
# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)
query.awaitTermination()
Scala
import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger
// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"
// Configure Kafka options with service credentials
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
"subscribe" -> "<topic-name>",
"databricks.serviceCredential" -> "<service-credential-name>"
)
// Read from Kafka and parse JSON
val parsedDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.select(
from_json(col("key").cast("string"), keySchema).alias("key"),
from_json(col("value").cast("string"), valueSchema).alias("value")
)
.select("key.*", "value.*")
// Write to Delta table
val query = parsedDF.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(Trigger.ProcessingTime("10 seconds"))
.toTable("catalog.schema.events_table")
query.awaitTermination()
SQL
-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
key::string:user_id AS user_id,
value::string:event_type AS event_type,
to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic-name>',
serviceCredential => '<service-credential-name>'
);