Ansluta till Apache Kafka

På den här sidan beskrivs hur du kan använda Apache Kafka som källa eller mottagare när du kör strukturerade strömningsarbetsbelastningar på Azure Databricks.

Mer information om Kafka finns i Apache Kafka-dokumentationen.

Läs data från Kafka

Använd formatet kafka för att konfigurera anslutningar till Kafka. Följande är ett exempel på en direktuppspelningsläsning:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>'
);

Azure Databricks stöder även batchläsningar från Kafka, som i följande exempel:

Python

df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Scala

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'earliest',
  endingOffsets => 'latest'
);

För inkrementell batchbearbetning rekommenderar Databricks att använda Kafka med Trigger.AvailableNow. Se AvailableNow: Inkrementell batchbearbetning.

I Databricks Runtime 13.3 LTS och senare tillhandahåller Azure Databricks även en SQL-funktion för att läsa Kafka-data. Strömning med SQL stöds endast i deklarativa pipelines i Lakeflow Spark eller med strömmande tabeller i Databricks SQL. Se read_kafka tabellvärdesfunktion.

Konfigurera Kafka Structured Streaming-läsare

För både batch- och strömningsfrågor måste du ange bootstrap-servrarna för Kafka-källan med följande alternativ:

Key Värde Beskrivning
kafka.bootstrap.servers En kommaavgränsad lista över värd:port Kafka-klustrets bootstrap-servrar

Om du vill ange prenumerationsämnen måste du ange något av följande alternativ:

Option Värde Beskrivning
subscribe En kommaavgränsad lista med ämnen. Ämneslistan att prenumerera på.
subscribePattern Java regex-sträng. Det mönster som används för att prenumerera på ämnen.
assign JSON-sträng {"topicA":[0,1],"topic":[2,4]}. Specifikt topicPartitions för konsumtion.

Se sidan Alternativ för den fullständiga listan över tillgängliga alternativ.

Schema för rader i Kafka

Kafka Structured Streaming-läsaren returnerar rader med följande schema:

Kolumn Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

key och value deserialiseras alltid som byte-arrayer med ByteArrayDeserializer. Använd DataFrame-åtgärder (till exempel cast("string") eller from_avro) för att explicit deserialisera nycklar och värden.

Skriva data till Kafka

Följande är ett exempel på en strömmande skrivning till Kafka:

Python

(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Scala

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()

Azure Databricks stöder även batchskrivningssemantik till Kafka-datamottagare, som du ser i följande exempel:

Python

(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Scala

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()

Konfigurera Kafka Structured Streaming-skrivaren

Viktigt!

Databricks Runtime 13.3 LTS och senare innehåller en nyare version av kafka-clients biblioteket som aktiverar idempotent-skrivningar som standard. Om en Kafka-mottagare använder version 2.8.0 eller senare med ACL:er konfigurerade, men utan IDEMPOTENT_WRITE aktiverad, misslyckas skrivning med felmeddelandet org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.

Lös det här felet genom att uppgradera till Kafka version 2.8.0 eller senare, eller genom att ange .option(“kafka.enable.idempotence”, “false”) när du konfigurerar din structured streaming-skrivare.

Följande är vanliga alternativ för skrivningar till Kafka:

Key Värde Standardvärde Beskrivning
kafka.boostrap.servers En kommaavgränsad lista över <host:port> ingen Required. Kafka-konfigurationen bootstrap.servers.
topic STRING inte inställt Optional. Anger ämnet för alla rader som ska skrivas. Det här alternativet åsidosätter alla ämneskolumner som finns i data.
includeHeaders BOOLEAN false Optional. Om Kafka-rubrikerna ska tas med i raden.

Se sidan Alternativ för den fullständiga listan över tillgängliga alternativ.

Schema för Kafka-skrivare

När du skriver data till Kafka kan den angivna DataFrame innehålla följande fält:

Kolumnnamn Obligatoriskt eller valfritt Type
key optional STRING eller BINARY
value required STRING eller BINARY
headers optional ARRAY
topic valfritt (ignoreras om topic anges som skrivalternativ) STRING
partition optional INT

Autentisering

Azure Databricks stöder flera autentiseringsmetoder för Kafka, inklusive autentiseringsuppgifter för Unity Catalog-tjänsten, SASL/SSL och molnspecifika alternativ för AWS MSK, Azure Event Hubs och Google Cloud Managed Kafka. Se Autentisering.

Hämta Kafka-mått

Om du vill övervaka eftersläpning i förhållande till Kafka för en strömningsfråga använder du måtten avgOffsetsBehindLatest, maxOffsetsBehindLatest och minOffsetsBehindLatest. Dessa mätvärden visar den genomsnittliga, högsta och lägsta offsetfördröjningen för alla prenumererade ämnespartitioner, i förhållande till de senaste offsetvärdena i Kafka. Se Läsa mått interaktivt.

Anmärkning

I Databricks Runtime 17.1 och senare hämtas de senaste Kafka-förskjutningarna när varje mikrobatch har slutförts. I ämnen som kontinuerligt tar emot data kan kvarvarande mått visa små, beständiga värden som inte är noll. Detta är ett förväntat beteende och indikerar inte att strömmen ligger efter.

I Databricks Runtime 17.0 och senare hämtas de senaste Kafka-förskjutningarna vid starttid för mikrobatch. Kvarvarande mått kan returneras 0 när strömmande frågor konsekvent använder alla poster som är tillgängliga i början av mikrobatchen.

Använd måttet estimatedTotalBytesBehindLatest för att beräkna återstående data för en fråga att läsa. Det här måttet uppskattar det totala antalet byte som återstår för alla prenumerationspartitioner baserat på de batchar som bearbetats under de senaste 300 sekunderna. Du kan ändra tidsfönstret som används för den här uppskattningen genom att ange alternativet bytesEstimateWindowLength .

Om du till exempel vill ange fönsterlängden till 10 minuter:

Python

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds

Om du kör dataströmmen i en anteckningsbok kan du se dessa mått under fliken Rådata på instrumentpanelen för strömningsfrågans förlopp.

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

Mer information finns i Övervaka strukturerade strömningsfrågor på Azure Databricks.

Exempel för Kafka till Delta Lake

I följande exempel visas ett fullständigt arbetsflöde för kontinuerlig direktuppspelning av data från Kafka till en Delta Lake-tabell. Du kan använda den här metoden för datainmatning i nära realtid.

I det här exemplet används ett fast JSON-schema. För andra format som Avro eller Protobuf använder du from_avro eller from_protobuf. Du kan också integrera med ett schemaregister. Se Exempel med schemaregister.

Python

from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()

Scala

import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger

// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"

// Configure Kafka options with service credentials
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
  "subscribe" -> "<topic-name>",
  "databricks.serviceCredential" -> "<service-credential-name>"
)

// Read from Kafka and parse JSON
val parsedDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .select(
    from_json(col("key").cast("string"), keySchema).alias("key"),
    from_json(col("value").cast("string"), valueSchema).alias("value")
  )
  .select("key.*", "value.*")

// Write to Delta table
val query = parsedDF.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .toTable("catalog.schema.events_table")

query.awaitTermination()

SQL

-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
  key::string:user_id AS user_id,
  value::string:event_type AS event_type,
  to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic-name>',
  serviceCredential => '<service-credential-name>'
);