Bagikan melalui


Pemrosesan streaming dengan Apache Kafka dan Azure Databricks

Artikel ini menjelaskan cara menggunakan Apache Kafka sebagai sumber atau sink saat menjalankan beban kerja Streaming Terstruktur di Azure Databricks.

Untuk kafka lainnya, lihat dokumentasi Kafka.

Membaca data dari Kafka

Berikut ini adalah contoh untuk streaming yang dibaca dari Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Azure Databricks juga mendukung semantik baca batch untuk sumber data Kafka, seperti yang ditunjukkan dalam contoh berikut:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Untuk pemuatan batch bertahap, Databricks merekomendasikan penggunaan Kafka dengan Trigger.AvailableNow. Lihat Mengonfigurasi pemrosesan batch inkremental.

Di Databricks Runtime 13.3 LTS ke atas, Azure Databricks menyediakan fungsi SQL untuk membaca data Kafka. Streaming dengan SQL hanya didukung di Tabel Langsung Delta atau dengan tabel streaming di Databricks SQL. Lihat read_kafka fungsi bernilai tabel.

Mengonfigurasi pembaca Kafka Structured Streaming

Azure Databricks menyediakan kafka kata kunci sebagai format data untuk mengonfigurasi koneksi ke Kafka 0.10+.

Berikut ini adalah konfigurasi yang paling umum untuk Kafka:

Ada beberapa cara untuk menentukan topik mana yang akan dijadikan langganan. Anda hanya harus memberikan salah satu dari parameter ini:

Opsi Nilai Deskripsi
subscribe Daftar topik yang dipisahkan koma. Daftar topik untuk berlangganan.
subscribePattern String regex Java. Pola yang digunakan untuk berlangganan topik.
assign String JSON {"topicA":[0,1],"topic":[2,4]}. topicPartitions khusus untuk dibaca.

Konfigurasi penting lainnya:

Opsi Nilai Nilai Default Deskripsi
kafka.bootstrap.servers Daftar host:port yang dipisahkan koma. kosong [Diperlukan] Konfigurasi bootstrap.servers Kafka. Jika Anda tidak menemukan data dari Kafka, periksa daftar alamat broker terlebih dahulu. Jika daftar alamat broker salah, kesalahan mungkin tidak akan terjadi. Ini karena klien Kafka berasumsi bahwa broker pada akhirnya akan tersedia dan jika terjadi kesalahan jaringan, akan dicoba lagi sampai bisa.
failOnDataLoss true atau false. true [Opsional] Apakah kueri akan gagal jika ada kemungkinan data hilang. Kueri dapat secara permanen gagal membaca data dari Kafka karena banyak skenario seperti topik yang dihapus, pemotongan topik sebelum diproses, dan sebagainya. Kami mencoba memperkirakan secara sederhana apakah data kemungkinan hilang atau tidak. Terkadang hal ini dapat menyebabkan alarm palsu. Atur opsi ini ke false jika tidak berfungsi seperti yang diharapkan, atau Anda ingin kueri terus diproses meskipun data hilang.
minPartitions Integer >= 0, 0 = dinonaktifkan. 0 (dinonaktifkan) [Opsional] Jumlah minimum partisi untuk dibaca dari Kafka. Anda dapat mengonfigurasi Spark untuk menggunakan minimum partisi arbitrer untuk dibaca dari Kafka menggunakan minPartitions opsi . Biasanya Spark memiliki pemetaan 1-1 dari topicPartitions Kafka ke partisi Spark yang dipakai dari Kafka. Jika Anda atur opsi minPartitions ke nilai yang lebih besar dari topicPartitions Kafka Anda, Spark akan membagi partisi Kafka besar menjadi potongan yang lebih kecil. Opsi ini dapat diatur pada saat beban puncak, yaitu pada kemiringan data, dan karena stream Anda tertinggal untuk meningkatkan laju pemrosesan. Sebagai akibat, opsi ini akan menginisialisasi konsumen Kafka di setiap pemicu, yang dapat memengaruhi performa jika Anda menggunakan SSL saat terhubung ke Kafka.
kafka.group.id ID grup konsumen Kafka. tidak diatur [Opsional] ID grup untuk digunakan saat membaca dari Kafka. Gunakan ini dengan hati-hati. Secara default, setiap kueri menghasilkan ID grup yang unik untuk membaca data. Untuk memastikan bahwa setiap kueri memiliki grup konsumennya sendiri yang tidak menghadapi gangguan dari konsumen lain, dan karena itu dapat membaca semua partisi dari topik langganannya. Dalam beberapa skenario (misalnya, otorisasi berbasis grup Kafka), Anda mungkin ingin menggunakan ID grup resmi tertentu untuk membaca data. Anda dapat mengatur ID grup secara opsional. Namun, lakukan ini dengan sangat hati-hati karena dapat menyebabkan tindakan yang tidak terduga.

* Kueri yang berjalan secara bersamaan (baik, batch maupun streaming) dengan ID grup yang sama kemungkinan akan saling mengganggu dan menyebabkan setiap kueri hanya membaca sebagian data.
* Ini juga dapat terjadi ketika kueri dimulai / dimulai ulang secara berurutan. Untuk memperkecil masalah seperti itu, atur session.timeout.ms dari konfigurasi konsumen Kafka menjadi sangat kecil.
startingOffsets paling awal , terbaru terbaru [Opsional] Titik awal saat kueri dimulai, baik "paling awal" yang berasal dari offset paling awal, atau string json yang menentukan offset awal untuk setiap TopicPartition. Di json, -2 sebagai offset dapat digunakan untuk merujuk ke paling awal, -1 ke terbaru. Catatan: Untuk kueri batch, terbaru (baik secara implisit atau dengan menggunakan -1 di json) tidak diperbolehkan. Untuk kueri streaming, ini hanya berlaku saat kueri baru dimulai, dan melanjutkan akan selalu diambil dari tempat kueri ditinggalkan. Partisi yang baru ditemukan selama kueri akan dimulai paling awal.

Lihat Panduan Integrasi Kafka Structured Streaming untuk konfigurasi opsional lainnya.

Skema untuk rekaman Kafka

Skema rekaman Kafka adalah:

Column Tipe
kunci biner
value biner
topik string
partisi int
offset long
rentang waktu long
timestampType int

key dan value selalu diserialisasi sebagai array byte dengan ByteArrayDeserializer. Gunakan operasi DataFrame (seperti cast("string")) untuk secara eksplisit mendeserialisasi kunci dan nilai.

Menulis data ke Kafka

Berikut ini adalah contoh untuk penulisan streaming ke Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Azure Databricks juga mendukung semantik tulis batch ke sink data Kafka, seperti yang ditunjukkan dalam contoh berikut:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Mengonfigurasi penulis Streaming Terstruktur Kafka

Penting

Databricks Runtime 13.3 LTS ke atas mencakup versi kafka-clients pustaka yang lebih baru yang memungkinkan penulisan idempogen secara default. Jika sink Kafka menggunakan versi 2.8.0 atau di bawahnya dengan ACL yang dikonfigurasi, tetapi tanpa IDEMPOTENT_WRITE diaktifkan, penulisan gagal dengan pesan org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error statekesalahan .

Atasi kesalahan ini dengan meningkatkan ke Kafka versi 2.8.0 atau lebih tinggi, atau dengan mengatur .option(“kafka.enable.idempotence”, “false”) saat mengonfigurasi penulis Streaming Terstruktur Anda.

Skema yang disediakan untuk DataStreamWriter berinteraksi dengan sink Kafka. Anda bisa menggunakan bidang berikut:

Nama kolom Diperlukan atau opsional Jenis
key opsional STRING atau BINARY
value wajib diisi STRING atau BINARY
headers opsional ARRAY
topic opsional (diabaikan jika topic diatur sebagai opsi penulis) STRING
partition opsional INT

Berikut ini adalah opsi umum yang diatur saat menulis ke Kafka:

Opsi Nilai Nilai default Deskripsi
kafka.boostrap.servers Daftar yang dipisahkan koma dari <host:port> tidak ada [Diperlukan] Konfigurasi bootstrap.servers Kafka.
topic STRING tidak diatur [Opsional] Mengatur topik untuk semua baris yang akan ditulis. Opsi ini mengambil alih kolom topik apa pun yang ada dalam data.
includeHeaders BOOLEAN false [Opsional] Apakah akan menyertakan header Kafka dalam baris.

Lihat Panduan Integrasi Kafka Structured Streaming untuk konfigurasi opsional lainnya.

Mengambil metrik Kafka

Anda bisa mendapatkan rata-rata, min, dan maks dari jumlah offset yang kueri streaming-nya berada di belakang offset terbaru, tersedia di antara semua topik langganan dengan metrik avgOffsetsBehindLatest, maxOffsetsBehindLatest, dan minOffsetsBehindLatest. Lihat Membaca Metrik Secara Interaktif.

Catatan

Tersedia untuk Databricks Runtime 9.1 ke atas.

Dapatkan perkiraan jumlah total byte yang belum digunakan proses kueri dari topik berlangganan dengan memeriksa nilai estimatedTotalBytesBehindLatest. Perkiraan ini didasarkan pada batch yang diproses dalam 300 detik terakhir. Jangka waktu yang menjadi dasar dari perkiraan dapat diubah dengan menetapkan opsi bytesEstimateWindowLength ke nilai yang berbeda. Misalnya, untuk mengaturnya menjadi 10 menit:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Jika Anda menjalankan streaming di buku catatan, Anda bisa melihat metrik ini di bawah tab Data Mentah di dasbor kemajuan kueri streaming:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

Menggunakan SSL untuk menyambungkan Azure Databricks ke Kafka

Untuk mengaktifkan koneksi SSL ke Kafka, ikuti petunjuk dalam Enkripsi dan Autentikasi dengan SSL di dokumentasi Confluent. Anda dapat memberikan konfigurasi yang dijelaskan di sana, diawali dengan kafka., sebagai opsi. Misalnya, Anda menentukan lokasi trust store di properti kafka.ssl.truststore.location.

Databricks merekomendasikan agar Anda:

Contoh berikut menggunakan lokasi penyimpanan objek dan rahasia Databricks untuk mengaktifkan koneksi SSL:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Sambungkan Kafka pada HDInsight ke Azure Databricks

  1. Buat kluster HDInsight Kafka.

    Untuk petunjuk, lihat Sambungkan ke Kafka pada HDInsight melalui Azure Virtual Network.

  2. Konfigurasikan broker Kafka untuk menyatakan alamat yang benar.

    Ikuti petunjuk di Konfigurasi Kafka untuk IP advertising. Jika Anda mengelola Kafka sendiri di Azure Virtual Machines, pastikan konfigurasi advertised.listeners broker diatur ke IP internal host.

  3. Buat kluster Azure Databricks.

  4. Pasangkan kluster Kafka ke kluster Azure Databricks.

    Ikuti petunjuk di Pasangkan jaringan virtual.

Autentikasi Perwakilan Layanan dengan ID Microsoft Entra (sebelumnya Azure Active Directory) dan Azure Event Hubs

Azure Databricks mendukung autentikasi pekerjaan Spark dengan layanan Azure Event Hubs. Autentikasi ini dilakukan melalui OAuth dengan MICROSOFT Entra ID (sebelumnya Azure Active Directory).

Diagram Autentikasi AAD

Azure Databricks mendukung autentikasi ID Microsoft Entra dengan ID klien dan rahasia di lingkungan komputasi berikut:

  • Databricks Runtime 12.2 LTS ke atas pada komputasi yang dikonfigurasi dengan mode akses pengguna tunggal.
  • Databricks Runtime 14.3 LTS ke atas pada komputasi yang dikonfigurasi dengan mode akses bersama.
  • Alur Tabel Langsung Delta dikonfigurasi tanpa Katalog Unity.

Azure Databricks tidak mendukung autentikasi ID Microsoft Entra dengan sertifikat di lingkungan komputasi apa pun, atau di alur Tabel Langsung Delta yang dikonfigurasi dengan Katalog Unity.

Autentikasi ini tidak berfungsi pada kluster bersama atau pada Tabel Langsung Delta Katalog Unity.

Mengonfigurasi Koneksi or Kafka Streaming Terstruktur

Untuk melakukan autentikasi dengan ID Microsoft Entra, Anda memerlukan nilai berikut:

  • ID penyewa. Anda dapat menemukan ini di tab layanan ID Microsoft Entra.

  • clientID (juga dikenal sebagai ID Aplikasi).

  • Rahasia klien. Setelah Anda memiliki ini, Anda harus menambahkannya sebagai rahasia ke Ruang Kerja Databricks Anda. Untuk menambahkan rahasia ini, lihat Manajemen rahasia.

  • Topik EventHubs. Anda dapat menemukan daftar topik di bagian Azure Event Hubs di bawah bagian Entitas di halaman Namespace Layanan Azure Event Hubs tertentu. Untuk bekerja dengan beberapa topik, Anda dapat mengatur peran IAM di tingkat Azure Event Hubs.

  • Server EventHubs. Anda dapat menemukan ini di halaman gambaran umum namespace layanan Azure Event Hubs tertentu:

    Ruang nama Azure Event Hubs

Selain itu, untuk menggunakan Id Entra, kita perlu memberi tahu Kafka untuk menggunakan mekanisme OAuth SASL (SASL adalah protokol generik, dan OAuth adalah jenis SASL "mekanisme"):

  • kafka.security.protocol harus SASL_SSL
  • kafka.sasl.mechanism harus OAUTHBEARER
  • kafka.sasl.login.callback.handler.class harus menjadi nama kelas Java yang sepenuhnya memenuhi syarat dengan nilai kafkashaded untuk penangan panggilan balik masuk dari kelas Kafka berbayang kami. Lihat contoh berikut untuk kelas yang tepat.

Contoh

Selanjutnya, mari kita lihat contoh yang sedang berjalan:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Menangani potensi kesalahan

  • Opsi streaming tidak didukung.

    Jika Anda mencoba menggunakan mekanisme autentikasi ini dalam alur Tabel Langsung Delta yang dikonfigurasi dengan Katalog Unity, Anda mungkin menerima kesalahan berikut:

    Kesalahan streaming yang tidak didukung

    Untuk mengatasi kesalahan ini, gunakan konfigurasi komputasi yang didukung. Lihat Autentikasi Perwakilan Layanan dengan ID Microsoft Entra (sebelumnya Azure Active Directory) dan Azure Event Hubs.

  • Gagal membuat baru KafkaAdminClient.

    Ini adalah kesalahan internal yang dilemparkan Kafka jika salah satu opsi autentikasi berikut salah:

    • ID Klien (juga dikenal sebagai ID Aplikasi)
    • ID Penyewa
    • Server EventHubs

    Untuk mengatasi kesalahan, verifikasi bahwa nilai sudah benar untuk opsi ini.

    Selain itu, Anda mungkin melihat kesalahan ini jika Mengubah opsi konfigurasi yang disediakan secara default dalam contoh (bahwa Anda diminta untuk tidak memodifikasi), seperti kafka.security.protocol.

  • Tidak ada rekaman yang dikembalikan

    Jika Anda mencoba menampilkan atau memproses DataFrame Tetapi tidak mendapatkan hasil, Anda akan melihat yang berikut ini di UI.

    Tidak ada pesan hasil

    Pesan ini berarti bahwa autentikasi berhasil, tetapi EventHubs tidak mengembalikan data apa pun. Beberapa kemungkinan (meskipun tidak ada alasan lengkap) adalah:

    • Anda menentukan topik EventHubs yang salah.
    • Opsi konfigurasi Kafka default untuk startingOffsets adalah latest, dan Saat ini Anda belum menerima data apa pun melalui topik tersebut. Anda dapat mengatur startingOffsetstoearliest untuk mulai membaca data mulai dari offset paling awal Kafka.