Delen via


Verbinding maken met Apache Kafka

In dit artikel wordt beschreven hoe u Apache Kafka kunt gebruiken als bron of sink bij het uitvoeren van structured streaming-workloads in Azure Databricks.

Zie de Apache Kafka-documentatie voor meer informatie over Kafka.

Gegevens lezen uit Kafka

Azure Databricks biedt het kafka trefwoord als gegevensformaat om verbindingen met Kafka te configureren. Hier volgt een voorbeeld voor een streaming-leesbewerking:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>'
);

Azure Databricks biedt ook ondersteuning voor semantiek voor batchleesbewerkingen, zoals wordt weergegeven in het volgende voorbeeld:

Python

df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Scala

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'earliest',
  endingOffsets => 'latest'
);

Voor het laden van incrementele batches raadt Databricks het gebruik van Kafka aan met Trigger.AvailableNow. Zie AvailableNow: Incrementele batchverwerking.

In Databricks Runtime 13.3 LTS en hoger biedt Azure Databricks ook een SQL-functie voor het lezen van Kafka-gegevens. Streaming met SQL wordt alleen ondersteund in Lakeflow Spark-declaratieve pijplijnen of met streamingtabellen in Databricks SQL. Zie read_kafka tabelwaardefunctie.

Kafka Structured Streaming-lezer configureren

De volgende optie moet worden ingesteld voor de Kafka-bron voor zowel batch- als streamingquery's:

Option Waarde Beschrijving
kafka.bootstrap.servers Een door komma's gescheiden lijst met host:poort De Bootstrap-servers van het Kafka-cluster

Daarnaast is een van de volgende opties vereist om op te geven op welke onderwerpen u zich wilt abonneren:

Option Waarde Beschrijving
subscribe Een door komma's gescheiden lijst met onderwerpen. De lijst met onderwerpen waarop u zich wilt abonneren.
subscribePattern Java regex-tekenreeks. Het patroon dat wordt gebruikt om u te abonneren op een of meer onderwerpen.
assign JSON-tekenreeks {"topicA":[0,1],"topic":[2,4]}. Specifieke topicPartitions die moeten worden gebruikt.

Zie de pagina Opties voor de volledige lijst met beschikbare opties.

Schema voor Kafka-records

De records die door de Kafka Structured Streaming-lezer worden geretourneerd, hebben het volgende schema:

Kolom Typ
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

De key en de value worden altijd gedeserialiseerd als bytematrices met de ByteArrayDeserializer. Gebruik DataFrame-bewerkingen (zoals cast("string") of from_avro) om de sleutels en waarden expliciet deserialiseren.

Gegevens schrijven naar Kafka

Hier volgt een voorbeeld van een streaming-schrijfbewerking naar Kafka:

Python

(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Scala

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()

Azure Databricks ondersteunt ook semantiek voor batch-schrijfbewerkingen naar Kafka-gegevenssinks, zoals wordt weergegeven in het volgende voorbeeld:

Python

(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Scala

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()

De kafka Structured Streaming Writer configureren

Belangrijk

Databricks Runtime 13.3 LTS en hoger bevat een nieuwere versie van de kafka-clients bibliotheek waarmee idempotente schrijfbewerkingen standaard worden ingeschakeld. Als een Kafka-sink versie 2.8.0 of lager gebruikt met ACL's die zijn geconfigureerd, maar zonder IDEMPOTENT_WRITE ingeschakeld, mislukt de schrijfbewerking met het foutbericht org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.

Los deze fout op door een upgrade uit te voeren naar Kafka versie 2.8.0 of hoger, of door de instelling .option(“kafka.enable.idempotence”, “false”) tijdens het configureren van uw Structured Streaming Writer.

Hier volgen algemene opties die zijn ingesteld tijdens het schrijven naar Kafka:

Option Waarde Standaardwaarde Beschrijving
kafka.boostrap.servers Een door komma's gescheiden lijst met <host:port> geen [Vereist] De Kafka-configuratie bootstrap.servers.
topic STRING niet ingesteld [Optioneel] Hiermee stelt u het onderwerp in voor alle rijen die moeten worden geschreven. Met deze optie worden alle onderwerpkolommen overschreven die in de gegevens aanwezig zijn.
includeHeaders BOOLEAN false [Optioneel] Of u de Kafka-headers in de rij wilt opnemen.

Zie de pagina Opties voor de volledige lijst met beschikbare opties.

Schema voor Kafka schrijver

Bij het schrijven van gegevens naar Kafka kan het opgegeven DataFrame de volgende velden bevatten:

Kolomnaam Verplicht of optioneel Typ
key optional STRING of BINARY
value required STRING of BINARY
headers optional ARRAY
topic optioneel (genegeerd wanneer topic is ingesteld als de schrijfoptie) STRING
partition optional INT

Authenticatie

Azure Databricks ondersteunt meerdere verificatiemethoden voor Kafka, waaronder servicereferenties voor Unity Catalog, SASL/SSL en cloudspecifieke opties voor AWS MSK, Azure Event Hubs en Google Cloud Managed Kafka. Zie Verificatie.

Kafka-metrieken ophalen

U kunt controleren hoe ver een streamingquery achter Kafka loopt met behulp van de avgOffsetsBehindLatestmaxOffsetsBehindLatest, en minOffsetsBehindLatest metrische gegevens. Deze rapporten geven de gemiddelde, maximale en minimale offset-lag voor alle geabonneerde onderwerppartities weer, ten opzichte van de meest recente offsets in Kafka. Zie Interactief metrische gegevens lezen.

Als u wilt schatten hoeveel gegevens de query nog niet heeft verbruikt, gebruikt u de estimatedTotalBytesBehindLatest metrische waarde. Met deze metrische waarde wordt het totale aantal resterende bytes voor alle geabonneerde partities geschat op basis van de batches die in de afgelopen 300 seconden zijn verwerkt. U kunt het tijdvenster voor deze schatting wijzigen door de bytesEstimateWindowLength optie in te stellen. Als u deze bijvoorbeeld wilt instellen op 10 minuten:

Python

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds

Als u de stream uitvoert in een notebook, ziet u deze metrische gegevens op het tabblad Onbewerkte gegevens in het voortgangsdashboard voor streamingquery's:

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

Zie Structured Streaming-queries monitoren in Azure Databricks voor meer details.

Codevoorbeeld: Kafka naar Delta

In het volgende voorbeeld ziet u een volledige werkstroom voor het continu streamen van gegevens van Kafka naar een Delta-tabel. Dit patroon is ideaal voor gegevensopnameworkloads in bijna realtime.

In dit voorbeeld wordt een vast JSON-schema gebruikt. Voor andere indelingen, zoals Avro of Protobuf, gebruikt from_avro of from_protobuf. U kunt ook integreren met een schemaregister. Zie voorbeeld met schemaregister.

Python

from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()

Scala

import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger

// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"

// Configure Kafka options with service credentials
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
  "subscribe" -> "<topic-name>",
  "databricks.serviceCredential" -> "<service-credential-name>"
)

// Read from Kafka and parse JSON
val parsedDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .select(
    from_json(col("key").cast("string"), keySchema).alias("key"),
    from_json(col("value").cast("string"), valueSchema).alias("value")
  )
  .select("key.*", "value.*")

// Write to Delta table
val query = parsedDF.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .toTable("catalog.schema.events_table")

query.awaitTermination()

SQL

-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
  key::string:user_id AS user_id,
  value::string:event_type AS event_type,
  to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic-name>',
  serviceCredential => '<service-credential-name>'
);