Menyambungkan ke Apache Kafka

Artikel ini menjelaskan cara menggunakan Apache Kafka sebagai sumber atau sink saat menjalankan beban kerja Streaming Terstruktur di Azure Databricks.

Untuk informasi selengkapnya tentang Kafka, lihat dokumentasi Apache Kafka.

Membaca data dari Kafka

Azure Databricks menyediakan kata kunci kafka sebagai format data untuk mengonfigurasi koneksi ke Kafka. Berikut ini adalah contoh untuk bacaan streaming:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>'
);

Azure Databricks juga mendukung semantik baca batch, seperti yang ditunjukkan dalam contoh berikut:

Python

df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Scala

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'earliest',
  endingOffsets => 'latest'
);

Untuk pemuatan batch bertahap, Databricks merekomendasikan penggunaan Kafka dengan Trigger.AvailableNow. Lihat AvailableNow: Pemrosesan batch bertahap.

Dalam Databricks Runtime 13.3 LTS ke atas, Azure Databricks juga menyediakan fungsi SQL untuk membaca data Kafka. Streaming dengan SQL hanya didukung di Lakeflow Spark Declarative Pipelines atau dengan tabel streaming di Databricks SQL. Lihat fungsi bernilai tabel read_kafka.

Mengonfigurasi pembaca Kafka Structured Streaming

Opsi berikut harus diatur untuk sumber Kafka untuk kueri batch dan streaming:

Option Nilai Deskripsi
kafka.bootstrap.servers Daftar host:port yang dipisahkan koma Server bootstrap untuk kluster Kafka

Selain itu, salah satu opsi berikut diperlukan untuk menentukan topik mana yang akan berlangganan:

Option Nilai Deskripsi
subscribe Daftar topik yang dipisahkan koma. Daftar topik untuk berlangganan.
subscribePattern Java string regex. Pola yang digunakan untuk berlangganan topik.
assign String JSON {"topicA":[0,1],"topic":[2,4]}. Tertentu topicPartitions untuk dikonsumsi.

Lihat halaman Opsi untuk daftar lengkap opsi yang tersedia.

Skema untuk catatan Kafka

Rekaman yang dikembalikan oleh pembaca Streaming Terstruktur Kafka akan memiliki skema berikut:

kolom Tipe
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

key dan value selalu dideserialisasi sebagai array byte dengan ByteArrayDeserializer. Gunakan operasi DataFrame (seperti cast("string") atau from_avro) untuk secara eksplisit mendeserialisasi kunci dan nilai.

Menulis data ke Kafka

Berikut ini adalah contoh untuk penulisan streaming ke Kafka:

Python

(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Scala

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()

Azure Databricks juga mendukung semantik tulis batch ke sink data Kafka, seperti yang ditunjukkan dalam contoh berikut:

Python

(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Scala

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()

Mengonfigurasi penulis data Streaming Terstruktur Kafka

Penting

Databricks Runtime 13.3 LTS ke atas mencakup versi pustaka kafka-clients yang lebih baru yang memungkinkan penulisan idempoten secara default. Jika sink Kafka menggunakan versi 2.8.0 atau di bawahnya dengan ACL yang dikonfigurasi, tetapi tanpa IDEMPOTENT_WRITE diaktifkan, penulisan gagal dengan pesan kesalahan org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.

Atasi kesalahan ini dengan meningkatkan ke Kafka versi 2.8.0 atau lebih tinggi, atau dengan mengatur .option(“kafka.enable.idempotence”, “false”) saat mengonfigurasi penulis Streaming Terstruktur Anda.

Berikut ini adalah opsi umum yang diatur saat menulis untuk Kafka:

Option Nilai Nilai standar Deskripsi
kafka.boostrap.servers Daftar <host:port> yang dipisahkan koma Tidak ada [Diperlukan] Konfigurasi bootstrap.servers Kafka.
topic STRING belum diatur [Opsional] Mengatur topik untuk semua baris yang akan ditulis. Opsi ini mengambil alih kolom topik apa pun yang ada dalam data.
includeHeaders BOOLEAN false [Opsional] Apakah akan menyertakan header Kafka dalam baris.

Lihat halaman Opsi untuk daftar lengkap opsi yang tersedia.

Skema untuk penulis Kafka

Saat menulis data ke Kafka, DataFrame yang disediakan dapat mencakup bidang berikut:

Nama kolom Diperlukan atau opsional Tipe
key fakultatif STRING atau BINARY
value required STRING atau BINARY
headers fakultatif ARRAY
topic opsional (diabaikan jika topic diatur sebagai opsi penulis) STRING
partition fakultatif INT

Autentikasi

Azure Databricks mendukung beberapa metode autentikasi untuk Kafka, termasuk kredensial layanan Unity Catalog, SASL/SSL, dan opsi khusus cloud untuk AWS MSK, Azure Event Hubs, dan Google Cloud Managed Kafka. Lihat Autentikasi.

Mengambil metrik Kafka

Anda dapat memantau seberapa jauh kueri streaming terlambat dari Kafka menggunakan metrik avgOffsetsBehindLatest, maxOffsetsBehindLatest, dan minOffsetsBehindLatest. Ini melaporkan lag rata-rata, maksimum, dan offset minimum di semua partisi topik berlangganan, relatif terhadap offset terbaru di Kafka. Lihat Membaca Metrik Secara Interaktif.

Nota

Dalam Databricks Runtime 17.1 dan versi di atasnya, offset terbaru pada Kafka diambil setelah setiap micro-batch selesai. Pada topik yang terus menerima data, metrik backlog dapat menunjukkan nilai non-nol yang kecil dan persisten. Ini adalah perilaku yang diharapkan dan tidak menunjukkan bahwa aliran tertinggal.

Dalam Databricks Runtime 17.0 dan versi sebelumnya, offset Kafka terbaru diambil saat mulai batch mikro. Metrik backlog dapat menghasilkan 0 saat query streaming secara konsisten mengonsumsi semua rekaman yang tersedia di awal mikro-batch.

Untuk memperkirakan berapa banyak data yang belum digunakan kueri, gunakan estimatedTotalBytesBehindLatest metrik. Metrik ini memperkirakan jumlah total byte yang tersisa di semua partisi berlangganan berdasarkan batch yang diproses dalam 300 detik terakhir. Anda dapat mengubah jendela waktu yang digunakan untuk perkiraan ini dengan mengatur bytesEstimateWindowLength opsi . Misalnya, untuk mengaturnya ke 10 menit:

Python

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds

Jika Anda menjalankan streaming di buku catatan, Anda bisa melihat metrik ini di bawah tab Data Mentah di dasbor progres kueri streaming.

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

Lihat Monitoring kueri Streaming Terstruktur di Azure Databricks untuk informasi selengkapnya.

Contoh kode: Kafka ke Delta

Contoh berikut menunjukkan alur kerja lengkap untuk terus mengalirkan data dari Kafka ke tabel Delta. Pola ini sangat ideal untuk beban kerja pengambilan data dalam waktu hampir nyata.

Contoh ini menggunakan skema JSON tetap. Untuk format lain seperti Avro atau Protobuf, gunakan from_avro atau from_protobuf. Anda juga dapat berintegrasi dengan registri skema. Lihat Contoh dengan Registri Skema.

Python

from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()

Scala

import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger

// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"

// Configure Kafka options with service credentials
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
  "subscribe" -> "<topic-name>",
  "databricks.serviceCredential" -> "<service-credential-name>"
)

// Read from Kafka and parse JSON
val parsedDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .select(
    from_json(col("key").cast("string"), keySchema).alias("key"),
    from_json(col("value").cast("string"), valueSchema).alias("value")
  )
  .select("key.*", "value.*")

// Write to Delta table
val query = parsedDF.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .toTable("catalog.schema.events_table")

query.awaitTermination()

SQL

-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
  key::string:user_id AS user_id,
  value::string:event_type AS event_type,
  to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic-name>',
  serviceCredential => '<service-credential-name>'
);