Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Konektor Azure Databricks Kafka mendukung beberapa metode autentikasi untuk menyambungkan ke Kafka. Artikel ini membahas beberapa metode autentikasi yang paling umum di Databricks. Daftar lengkap metode autentikasi yang didukung dapat ditemukan dalam dokumentasi Kafka.
Hubungkan ke Azure Event Hubs dengan perwakilan layanan
Azure Databricks mendukung autentikasi pekerjaan Spark dengan layanan Azure Event Hubs. Autentikasi ini dilakukan melalui OAuth dengan Microsoft Entra ID.
Menyambungkan dengan kredensial layanan Unity Catalog
Sejak rilis Databricks Runtime 16.1, Azure Databricks mendukung kredensial layanan Katalog Unity untuk mengautentikasi ke Azure Event Hubs. Databricks merekomendasikan pendekatan ini, terutama saat menjalankan streaming Kafka pada kluster bersama atau komputasi tanpa server.
Untuk menggunakan kredensial layanan Katalog Unity untuk autentikasi, lakukan langkah-langkah berikut:
- Buat kredensial layanan Unity Catalog baru. Jika Anda tidak terbiasa dengan proses ini, lihat Membuat kredensial layanan untuk instruksi tentang membuatnya.
- Pastikan bahwa konektor akses yang dilampirkan ke kredensial layanan Anda memiliki izin yang diperlukan untuk menyambungkan ke Azure Event Hubs.
- Berikan nama kredensial layanan Katalog Unity Anda sebagai opsi sumber dalam konfigurasi Kafka Anda. Atur opsi
databricks.serviceCredentialke nama kredensial layanan Anda.
Contoh berikut mengonfigurasi Kafka sebagai sumber menggunakan kredensial layanan:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Catatan: Saat Anda menggunakan kredensial layanan Unity Catalog untuk menyambungkan ke Kafka, opsi berikut tidak lagi diperlukan:
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.classkafka.sasl.oauthbearer.token.endpoint.url
Menyambungkan dengan ID pelanggan dan kunci rahasia
Azure Databricks mendukung autentikasi Microsoft Entra ID dengan ID klien dan rahasia di lingkungan komputasi berikut:
- Databricks Runtime 12.2 LTS ke atas pada komputasi yang dikonfigurasi dengan mode akses khusus (sebelumnya mode akses pengguna tunggal).
- Databricks Runtime 14.3 LTS ke atas pada komputasi yang dikonfigurasi dengan mode akses standar (sebelumnya mode akses bersama).
- Alur Deklaratif Lakeflow Spark dikonfigurasi tanpa Katalog Unity.
Azure Databricks tidak mendukung autentikasi Microsoft Entra ID dengan sertifikat di lingkungan komputasi apa pun, atau di Alur Deklaratif Lakeflow Spark yang dikonfigurasi dengan Katalog Unity.
Autentikasi ini tidak berfungsi pada komputasi dengan mode akses standar atau pada Unity Catalog Lakeflow Spark Declarative Pipelines.
Untuk melakukan autentikasi dengan Microsoft Entra ID, Anda harus memiliki nilai berikut:
ID penyewa. Anda dapat menemukan ini di tab layanan Microsoft Entra ID.
clientID (juga dikenal sebagai ID Aplikasi).
Rahasia klien. Setelah Anda memiliki ini, Anda harus menambahkannya sebagai rahasia ke Ruang Kerja Databricks Anda. Untuk menambahkan rahasia ini, lihat Manajemen rahasia.
Sebuah topik EventHubs. Anda dapat menemukan daftar topik di bagian Event Hubs di bawah bagian Entitas di halaman Namespace Event Hubs tertentu. Untuk bekerja dengan beberapa topik, Anda dapat mengatur peran IAM di tingkat Event Hubs.
Server untuk EventHubs. Anda dapat menemukan ini di halaman gambaran umum namespace Event Hubs tertentu:
Selain itu, untuk menggunakan Entra ID, kita perlu memberi tahu Kafka untuk menggunakan mekanisme OAuth SASL (SASL adalah protokol generik, dan OAuth adalah jenis "mekanisme" SASL"):
-
kafka.security.protocolharusSASL_SSL -
kafka.sasl.mechanismharusOAUTHBEARER -
kafka.sasl.login.callback.handler.classharus menjadi nama terkualifikasi penuh dari kelas Java dengan nilaikafkashadedke handler panggilan balik login kelas Kafka yang telah diubah namespace-nya. Lihat contoh berikut untuk kelas persis.
Contoh berikut mengonfigurasi Kafka untuk menyambungkan ke Azure Event Hubs menggunakan autentikasi Microsoft Entra ID dengan ID klien dan rahasia:
Python
# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
"kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
SQL
CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<event-hubs-server>:9093',
subscribe => '<event-hubs-topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'OAUTHBEARER',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
`kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
`kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);
Gunakan SASL/PLAIN untuk mengautentikasi
Untuk menyambungkan ke Kafka menggunakan autentikasi SASL/PLAIN (nama pengguna dan kata sandi), konfigurasikan opsi berikut. Gunakan nama kelas berbayang PlainLoginModule:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);
Azure Databricks merekomendasikan untuk menyimpan kata sandi Anda sebagai rahasia daripada menyertakannya langsung dalam kode Anda. Untuk informasi selengkapnya, lihat Manajemen rahasia.
Gunakan SASL/SCRAM untuk mengautentikasi
Untuk menyambungkan ke Kafka menggunakan SASL/SCRAM (SCRAM-SHA-256 atau SCRAM-SHA-512), konfigurasikan opsi berikut. Gunakan nama kelas berbayang ScramLoginModule:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'SCRAM-SHA-512',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);
Nota
Ganti SCRAM-SHA-512 dengan SCRAM-SHA-256 jika kluster Kafka Anda dikonfigurasi untuk menggunakan SCRAM-SHA-256.
Azure Databricks merekomendasikan untuk menyimpan kata sandi Anda sebagai rahasia daripada menyertakannya langsung dalam kode Anda. Untuk informasi selengkapnya, lihat Manajemen rahasia.
Gunakan SSL untuk menyambungkan Azure Databricks ke Kafka
Untuk mengaktifkan koneksi SSL/TLS ke Kafka, atur kafka.security.protocol ke SSL dan sediakan opsi konfigurasi penyimpanan kepercayaan dan penyimpanan kunci yang diawali dengan kafka.. Untuk koneksi SSL yang hanya memerlukan autentikasi server (TLS satu arah), Anda memerlukan penyimpanan kepercayaan. Untuk TLS bersama (mTLS) tempat broker Kafka juga mengautentikasi klien, Anda memerlukan penyimpanan kepercayaan dan penyimpanan kunci.
Opsi SSL/TLS berikut ini tersedia. Untuk daftar lengkap properti SSL, lihat dokumentasi konfigurasi Apache Kafka SSL dan Enkripsi dan Autentikasi dengan SSL dalam dokumentasi Confluent.
| Option | Deskripsi |
|---|---|
kafka.security.protocol |
Atur ke SSL untuk mengaktifkan enkripsi TLS. |
kafka.ssl.truststore.location |
Jalur ke file penyimpanan kepercayaan yang berisi sertifikat CA tepercaya. |
kafka.ssl.truststore.password |
Kata sandi untuk file penyimpanan kepercayaan. |
kafka.ssl.truststore.type |
Format file penyimpanan keamanan (default: JKS). |
kafka.ssl.keystore.location |
Jalur ke file penyimpanan kunci yang berisi sertifikat klien dan kunci privat (diperlukan untuk mTLS). |
kafka.ssl.keystore.password |
Kata sandi untuk file penyimpanan kunci. |
kafka.ssl.key.password |
Kata sandi untuk kunci privat di penyimpan kunci. |
kafka.ssl.endpoint.identification.algorithm |
Algoritma verifikasi nama host. Secara default menjadi https. Atur sebagai string kosong untuk menonaktifkan. |
Jika Anda menggunakan SSL, Databricks menyarankan Agar Anda:
- Simpan sertifikat Anda dalam volume Katalog Unity. Pengguna yang memiliki akses untuk membaca dari volume dapat menggunakan sertifikat Kafka Anda. Untuk informasi selengkapnya, lihat Apa itu volume Katalog Unity?.
- Simpan kata sandi sertifikat Anda sebagai rahasia dalam cakupan rahasia. Untuk informasi selengkapnya, lihat Mengelola cakupan rahasia.
Contoh berikut menggunakan lokasi penyimpanan objek dan rahasia Databricks untuk mengaktifkan koneksi SSL:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
Sambungkan Kafka di HDInsight ke Azure Databricks
Buat kluster HDInsight Kafka.
Lihat Hubungkan ke Kafka di HDInsight melalui Azure Virtual Network untuk instruksi.
Konfigurasikan broker Kafka untuk menyatakan alamat yang benar.
Ikuti petunjuk di Konfigurasi Kafka untuk IP advertising. Jika Anda mengelola Kafka sendiri di Azure Virtual Machines, pastikan konfigurasi
advertised.listenersbroker diatur ke IP internal host.Buat kluster Azure Databricks.
Hubungkan kluster Kafka ke kluster Azure Databricks.
Ikuti petunjuk di Jaringan Virtual Peer.
Menggunakan nama kelas Kafka berbayang Databricks
Azure Databricks membundel versi berbayang milik pustaka klien Kafka. Semua nama kelas klien Kafka yang Anda referensikan dalam opsi konfigurasi autentikasi harus menggunakan awalan nama kelas berbayang alih-alih nama kelas sumber terbuka standar. Ini berlaku untuk kelas apa pun yang dirujuk dalam opsi seperti kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.class, dan kafka.sasl.client.callback.handler.class.
Menggunakan nama kelas yang tidak dimodifikasi menghasilkan kesalahan RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED. Lihat FAQ untuk detail selengkapnya.
Menangani potensi kesalahan
Gagal membuat baru
KafkaAdminClientKesalahan internal Kafka ini muncul jika salah satu opsi autentikasi berikut salah:
- ID Klien (juga dikenal sebagai ID Aplikasi)
- ID Penyewa
- Server Azure Event Hubs
Untuk mengatasi kesalahan, verifikasi bahwa nilai sudah benar untuk opsi ini. Selain itu, Anda mungkin melihat kesalahan ini jika Mengubah opsi konfigurasi yang disediakan secara default dalam contoh (seperti
kafka.security.protocol).Tidak ada rekaman yang dikembalikan
Jika Anda mencoba menampilkan atau memproses DataFrame Tetapi tidak mendapatkan hasil, Anda akan melihat yang berikut ini di UI.
Pesan ini berarti bahwa autentikasi berhasil, tetapi EventHubs tidak mengembalikan data apa pun. Beberapa kemungkinan (meskipun tidak ada alasan lengkap) adalah:
- Anda menentukan topik EventHubs yang salah.
- Opsi konfigurasi Kafka default untuk
startingOffsetsadalahlatest, dan Saat ini Anda belum menerima data apa pun melalui topik tersebut. Anda dapat menetapkanstartingOffsetskeearliestuntuk mulai membaca data dari offset terawal milik Kafka.