Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Artikel ini menjelaskan cara menggunakan Apache Kafka sebagai sumber atau sink saat menjalankan beban kerja Streaming Terstruktur di Azure Databricks.
Untuk informasi selengkapnya tentang Kafka, lihat dokumentasi Apache Kafka.
Membaca data dari Kafka
Azure Databricks menyediakan kata kunci kafka sebagai format data untuk mengonfigurasi koneksi ke Kafka. Berikut ini adalah contoh untuk bacaan streaming:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
SQL
CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>'
);
Azure Databricks juga mendukung semantik baca batch, seperti yang ditunjukkan dalam contoh berikut:
Python
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Scala
val df = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'earliest',
endingOffsets => 'latest'
);
Untuk pemuatan batch bertahap, Databricks merekomendasikan penggunaan Kafka dengan Trigger.AvailableNow. Lihat AvailableNow: Pemrosesan batch bertahap.
Dalam Databricks Runtime 13.3 LTS ke atas, Azure Databricks juga menyediakan fungsi SQL untuk membaca data Kafka. Streaming dengan SQL hanya didukung di Lakeflow Spark Declarative Pipelines atau dengan tabel streaming di Databricks SQL. Lihat fungsi bernilai tabel read_kafka.
Mengonfigurasi pembaca Kafka Structured Streaming
Opsi berikut harus diatur untuk sumber Kafka untuk kueri batch dan streaming:
| Option | Nilai | Deskripsi |
|---|---|---|
kafka.bootstrap.servers |
Daftar host:port yang dipisahkan koma | Server bootstrap untuk kluster Kafka |
Selain itu, salah satu opsi berikut diperlukan untuk menentukan topik mana yang akan berlangganan:
| Option | Nilai | Deskripsi |
|---|---|---|
subscribe |
Daftar topik yang dipisahkan koma. | Daftar topik untuk berlangganan. |
subscribePattern |
Java string regex. | Pola yang digunakan untuk berlangganan topik. |
assign |
String JSON {"topicA":[0,1],"topic":[2,4]}. |
Tertentu topicPartitions untuk dikonsumsi. |
Lihat halaman Opsi untuk daftar lengkap opsi yang tersedia.
Skema untuk catatan Kafka
Rekaman yang dikembalikan oleh pembaca Streaming Terstruktur Kafka akan memiliki skema berikut:
| kolom | Tipe |
|---|---|
key |
binary |
value |
binary |
topic |
string |
partition |
int |
offset |
long |
timestamp |
long |
timestampType |
int |
key dan value selalu dideserialisasi sebagai array byte dengan ByteArrayDeserializer. Gunakan operasi DataFrame (seperti cast("string") atau from_avro) untuk secara eksplisit mendeserialisasi kunci dan nilai.
Menulis data ke Kafka
Berikut ini adalah contoh untuk penulisan streaming ke Kafka:
Python
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Scala
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
Azure Databricks juga mendukung semantik tulis batch ke sink data Kafka, seperti yang ditunjukkan dalam contoh berikut:
Python
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Scala
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
Mengonfigurasi penulis data Streaming Terstruktur Kafka
Penting
Databricks Runtime 13.3 LTS ke atas mencakup versi pustaka kafka-clients yang lebih baru yang memungkinkan penulisan idempoten secara default. Jika sink Kafka menggunakan versi 2.8.0 atau di bawahnya dengan ACL yang dikonfigurasi, tetapi tanpa IDEMPOTENT_WRITE diaktifkan, penulisan gagal dengan pesan kesalahan org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.
Atasi kesalahan ini dengan meningkatkan ke Kafka versi 2.8.0 atau lebih tinggi, atau dengan mengatur .option(“kafka.enable.idempotence”, “false”) saat mengonfigurasi penulis Streaming Terstruktur Anda.
Berikut ini adalah opsi umum yang diatur saat menulis untuk Kafka:
| Option | Nilai | Nilai standar | Deskripsi |
|---|---|---|---|
kafka.boostrap.servers |
Daftar <host:port> yang dipisahkan koma |
Tidak ada | [Diperlukan] Konfigurasi bootstrap.servers Kafka. |
topic |
STRING |
belum diatur | [Opsional] Mengatur topik untuk semua baris yang akan ditulis. Opsi ini mengambil alih kolom topik apa pun yang ada dalam data. |
includeHeaders |
BOOLEAN |
false |
[Opsional] Apakah akan menyertakan header Kafka dalam baris. |
Lihat halaman Opsi untuk daftar lengkap opsi yang tersedia.
Skema untuk penulis Kafka
Saat menulis data ke Kafka, DataFrame yang disediakan dapat mencakup bidang berikut:
| Nama kolom | Diperlukan atau opsional | Tipe |
|---|---|---|
key |
fakultatif |
STRING atau BINARY |
value |
required |
STRING atau BINARY |
headers |
fakultatif | ARRAY |
topic |
opsional (diabaikan jika topic diatur sebagai opsi penulis) |
STRING |
partition |
fakultatif | INT |
Autentikasi
Azure Databricks mendukung beberapa metode autentikasi untuk Kafka, termasuk kredensial layanan Unity Catalog, SASL/SSL, dan opsi khusus cloud untuk AWS MSK, Azure Event Hubs, dan Google Cloud Managed Kafka. Lihat Autentikasi.
Mengambil metrik Kafka
Anda dapat memantau seberapa jauh kueri streaming terlambat dari Kafka menggunakan metrik avgOffsetsBehindLatest, maxOffsetsBehindLatest, dan minOffsetsBehindLatest. Ini melaporkan lag rata-rata, maksimum, dan offset minimum di semua partisi topik berlangganan, relatif terhadap offset terbaru di Kafka. Lihat Membaca Metrik Secara Interaktif.
Nota
Dalam Databricks Runtime 17.1 dan versi di atasnya, offset terbaru pada Kafka diambil setelah setiap micro-batch selesai. Pada topik yang terus menerima data, metrik backlog dapat menunjukkan nilai non-nol yang kecil dan persisten. Ini adalah perilaku yang diharapkan dan tidak menunjukkan bahwa aliran tertinggal.
Dalam Databricks Runtime 17.0 dan versi sebelumnya, offset Kafka terbaru diambil saat mulai batch mikro. Metrik backlog dapat menghasilkan 0 saat query streaming secara konsisten mengonsumsi semua rekaman yang tersedia di awal mikro-batch.
Untuk memperkirakan berapa banyak data yang belum digunakan kueri, gunakan estimatedTotalBytesBehindLatest metrik. Metrik ini memperkirakan jumlah total byte yang tersisa di semua partisi berlangganan berdasarkan batch yang diproses dalam 300 detik terakhir. Anda dapat mengubah jendela waktu yang digunakan untuk perkiraan ini dengan mengatur bytesEstimateWindowLength opsi . Misalnya, untuk mengaturnya ke 10 menit:
Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Scala
val df = spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds
Jika Anda menjalankan streaming di buku catatan, Anda bisa melihat metrik ini di bawah tab Data Mentah di dasbor progres kueri streaming.
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Lihat Monitoring kueri Streaming Terstruktur di Azure Databricks untuk informasi selengkapnya.
Contoh kode: Kafka ke Delta
Contoh berikut menunjukkan alur kerja lengkap untuk terus mengalirkan data dari Kafka ke tabel Delta. Pola ini sangat ideal untuk beban kerja pengambilan data dalam waktu hampir nyata.
Contoh ini menggunakan skema JSON tetap. Untuk format lain seperti Avro atau Protobuf, gunakan from_avro atau from_protobuf. Anda juga dapat berintegrasi dengan registri skema. Lihat Contoh dengan Registri Skema.
Python
from pyspark.sql.functions import from_json, col
# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"
# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}
# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)
# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)
query.awaitTermination()
Scala
import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger
// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"
// Configure Kafka options with service credentials
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
"subscribe" -> "<topic-name>",
"databricks.serviceCredential" -> "<service-credential-name>"
)
// Read from Kafka and parse JSON
val parsedDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.select(
from_json(col("key").cast("string"), keySchema).alias("key"),
from_json(col("value").cast("string"), valueSchema).alias("value")
)
.select("key.*", "value.*")
// Write to Delta table
val query = parsedDF.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(Trigger.ProcessingTime("10 seconds"))
.toTable("catalog.schema.events_table")
query.awaitTermination()
SQL
-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
key::string:user_id AS user_id,
value::string:event_type AS event_type,
to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic-name>',
serviceCredential => '<service-credential-name>'
);