Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Azure Databricks Kafka bağlayıcısı, Kafka'ya bağlanmak için birden çok kimlik doğrulama yöntemini destekler. Bu makale, Databricks'te en yaygın kimlik doğrulama yöntemlerinden bazılarını kapsar. Desteklenen kimlik doğrulama yöntemlerinin tam listesi Kafka belgelerinde bulunabilir.
Hizmet sorumlusuyla Azure Event Hubs bağlan
Azure Databricks, Event Hubs hizmetleriyle Spark işlerinin kimlik doğrulamasını destekler. Bu kimlik doğrulaması, Microsoft Entra ID ile OAuth aracılığıyla yapılır.
Unity Kataloğu hizmeti kimlik bilgileriyle bağlanma
Databricks Runtime 16.1 sürümünden bu yana Azure Databricks, Azure Event Hubs kimlik doğrulaması için Unity Kataloğu hizmeti kimlik bilgilerini destekler. Databricks, özellikle paylaşılan kümelerde veya sunucusuz işlemde Kafka akışı çalıştırırken bu yaklaşımı önerir.
Kimlik doğrulaması için Unity Kataloğu hizmeti kimlik bilgilerini kullanmak için aşağıdaki adımları gerçekleştirin:
- Yeni bir Unity Kataloğu hizmeti kimlik bilgisi oluşturun. Bu işlemi bilmiyorsanız, oluşturma yönergeleri için bkz. Hizmet kimlik bilgileri oluşturma .
- Hizmet kimlik bilgilerinize bağlı erişim bağlayıcısının Azure Event Hubs bağlanmak için gerekli izinlere sahip olduğundan emin olun.
- Kafka yapılandırmanızda kaynak olarak Unity Kataloğu hizmeti kimlik bilgilerinizin adını belirtin. seçeneğini
databricks.serviceCredentialhizmet kimlik bilgilerinizin adı olarak ayarlayın.
Aşağıdaki örnek, hizmet kimlik bilgilerini kullanarak Kafka'nın kaynak olarak yapılandırılmasını sağlar:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Not: Kafka'ya bağlanmak için Unity Kataloğu hizmeti kimlik bilgilerini kullandığınızda, artık aşağıdaki seçenekler gerekmez:
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.classkafka.sasl.oauthbearer.token.endpoint.url
İstemci kimliği ve gizli anahtar ile bağlanın
Azure Databricks, aşağıdaki işlem ortamlarında istemci kimliği ve gizli dizi ile Microsoft Entra ID kimlik doğrulamasını destekler:
- Ayrılmış erişim modu (eski adıyla tek kullanıcı erişim modu) ile yapılandırılmış hesaplamada Databricks Runtime 12.2 LTS ve üstü.
- Databricks Runtime 14.3 LTS ve üzeri, standart erişim modu (eski adıyla paylaşılan erişim modu) ile yapılandırılmış hesaplama platformu üzerinde.
- Unity Kataloğu olmadan yapılandırılan Lakeflow Spark Deklaratif Boruhatları.
Azure Databricks, herhangi bir işlem ortamında veya Unity Kataloğu ile yapılandırılmış Lakeflow Spark Bildirimli İşlem Hatlarında bir sertifikayla Microsoft Entra ID kimlik doğrulamasını desteklemez.
Bu kimlik doğrulaması, standart erişim moduyla işlem üzerinde veya Unity Catalog Lakeflow Spark Bildirimli İşlem Hatlarında çalışmaz.
Microsoft Entra ID ile kimlik doğrulaması gerçekleştirmek için aşağıdaki değerlere sahip olmanız gerekir:
Kiracı kimliği. Bunu Microsoft Entra ID hizmetleri sekmesinde bulabilirsiniz.
Bir clientID (Uygulama Kimliği olarak da bilinir).
İstemci sırrı. Bunu aldıktan sonra Databricks Çalışma Alanınıza gizli olarak eklemeniz gerekir. Bu sırrı eklemek için bkz Sır yönetimi.
Bir EventHubs başlığı. Konu listesini, belirli bir Event Hubs Ad Alanı sayfasındaki Varlıklar bölümünün altındaki Event Hubs bölümünde bulabilirsiniz. Birden çok konu başlığıyla çalışmak için IAM rolünü Event Hubs düzeyinde ayarlayabilirsiniz.
EventHubs sunucusu. Bunu, belirli Event Hubs ad alanınızın genel bakış sayfasında bulabilirsiniz:
Ayrıca, Entra ID kullanmak için Kafka'ya OAuth SASL mekanizmasını kullanmasını söylememiz gerekir (SASL genel bir protokoldür ve OAuth bir SASL "mekanizması" türüdür):
-
kafka.security.protocolOlmalıdırSASL_SSL -
kafka.sasl.mechanismOlmalıdırOAUTHBEARER -
kafka.sasl.login.callback.handler.class, gölgeli Kafka sınıfımızın oturum açma geri çağırma işleyicisinekafkashadeddeğeriyle Java sınıfının tam adı olmalıdır. Tam sınıf için aşağıdaki örne bakın.
Aşağıdaki örnekte Kafka, istemci kimliği ve gizli dizi ile Microsoft Entra ID kimlik doğrulaması kullanarak Azure Event Hubs bağlanacak şekilde yapılandırılır:
Python
# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
"kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
SQL
CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<event-hubs-server>:9093',
subscribe => '<event-hubs-topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'OAUTHBEARER',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
`kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
`kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);
Kimlik doğrulaması için SASL/PLAIN kullanma
SASL/PLAIN (kullanıcı adı ve parola) kimlik doğrulamasını kullanarak Kafka'ya bağlanmak için aşağıdaki seçenekleri yapılandırın. Gölgeli PlainLoginModule sınıf adını kullanın:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);
Azure Databricks parolanızı doğrudan kodunuzla birlikte saklamak yerine gizli dizi olarak depolamanızı önerir. Daha fazla bilgi için bkz Gizlilik yönetimi.
Kimlik doğrulaması için SASL/SCRAM kullanma
SASL/SCRAM (SCRAM-SHA-256 veya SCRAM-SHA-512) kullanarak Kafka'ya bağlanmak için aşağıdaki seçenekleri yapılandırın. Gölgeli ScramLoginModule sınıf adını kullanın:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'SCRAM-SHA-512',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);
Uyarı
Kafka kümeniz SCRAM-SHA-256 kullanacak şekilde yapılandırılmışsa, SCRAM-SHA-512 öğesini SCRAM-SHA-256 ile değiştirin.
Azure Databricks parolanızı doğrudan kodunuzla birlikte saklamak yerine gizli dizi olarak depolamanızı önerir. Daha fazla bilgi için bkz Gizlilik yönetimi.
Azure Databricks Kafka'ya bağlanmak için SSL kullanma
Kafka'ya SSL/TLS bağlantılarını etkinleştirmek için kafka.security.protocol'yi SSL olarak ayarlayın ve kafka. ile ön eklenmiş güven deposu ve anahtar deposu yapılandırma seçeneklerini sağlayın. Yalnızca sunucu kimlik doğrulaması (tek yönlü TLS) gerektiren SSL bağlantıları için bir güven deposuna ihtiyacınız vardır. Kafka aracısının istemcinin kimliğini de doğruladığı karşılıklı TLS (mTLS) için hem güven deposuna hem de anahtar deposuna ihtiyacınız vardır.
Aşağıdaki SSL/TLS seçenekleri kullanılabilir. SSL özelliklerinin tam listesi için Confluent belgelerindeki Apache Kafka SSL yapılandırma belgelerineve SSL ile Şifreleme ve Kimlik Doğrulaması'na bakın.
| Seçenek | Açıklama |
|---|---|
kafka.security.protocol |
SSL TLS şifrelemesini etkinleştirmek için olarak ayarlayın. |
kafka.ssl.truststore.location |
Güvenilen CA sertifikalarını içeren güven deposu dosyasının yolu. |
kafka.ssl.truststore.password |
Güven deposu dosyasının parolası. |
kafka.ssl.truststore.type |
Güven deposu dosya biçimi (varsayılan: JKS). |
kafka.ssl.keystore.location |
İstemci sertifikasını ve özel anahtarı içeren anahtar deposu dosyasının yolu (mTLS için gereklidir). |
kafka.ssl.keystore.password |
Anahtar deposu dosyasının parolası. |
kafka.ssl.key.password |
Anahtar deposundaki özel anahtarın parolası. |
kafka.ssl.endpoint.identification.algorithm |
Ana bilgisayar adı doğrulama algoritması. Varsayılan olarak https değerini alır. Devre dışı bırakmak için boş bir dizeye ayarlayın. |
SSL kullanıyorsanız Databricks şunları yapmanızı önerir:
- Sertifikalarınızı bir Unity Kataloğu biriminde depolayın. Birimden okuma erişimi olan kullanıcılar Kafka sertifikalarınızı kullanabilir. Daha fazla bilgi için Unity Catalog birimleri nedir? bölümüne bakın.
- Sertifika parolalarınızı gizli alan kapsamında depolayın. Daha fazla bilgi için bkz. Gizli dizin kapsamlarını yönetme.
Aşağıdaki örnek, SSL bağlantısını etkinleştirmek için nesne depolama konumlarını ve Databricks gizli dizilerini kullanır:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
HDInsight üzerinde Kafka'yı Azure Databricks bağlama
HDInsight Kafka kümesi oluşturun.
Yönergeler için bkz.
> aracılığıyla HDIns Azure Virtual Network Kafka aracılarını doğru adresi tanıtacak şekilde yapılandırın.
Kafka'yı IP reklamları için yapılandırma başlığındaki yönergeleri izleyin. Kafka'yı Azure Virtual Machines kendiniz yönetiyorsanız, aracıların
advertised.listenersyapılandırmasının konakların iç IP'sine ayarlandığından emin olun.bir Azure Databricks kümesi oluşturun.
Kafka kümesini Azure Databricks kümesiyle eşleyin.
Eş sanal ağlar başlığındaki yönergeleri izleyin.
Databricks gölgeli Kafka sınıf adlarını kullanma
Azure Databricks Kafka istemci kitaplıklarının özel, gölgeli sürümlerini paketler. Kimlik doğrulama yapılandırma seçeneklerinde başvurduğunuz tüm Kafka istemci sınıfı adları standart açık kaynak sınıf adı yerine gölgeli sınıf adı ön ekini kullanmalıdır. Bu, , kafka.sasl.jaas.configve kafka.sasl.login.callback.handler.classgibi kafka.sasl.client.callback.handler.classseçeneklerde başvuruda bulunan tüm sınıflar için geçerlidir.
Gölgesiz sınıf adlarının kullanılması hataya RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED neden olur. Diğer ayrıntılar için SSS bölümüne bakın.
Olası hataları işleme
Yeni bir
KafkaAdminClientoluşturulamadıAşağıdaki kimlik doğrulama seçeneklerinden herhangi biri yanlışsa bu iç Kafka hatası oluşur:
- İstemci Kimliği (Uygulama Kimliği olarak da bilinir)
- Kiracı Kimliği
- Event Hubs sunucusu
Hatayı çözmek için bu seçenekler için değerlerin doğru olduğunu doğrulayın. Ayrıca, örnekte varsayılan olarak sağlanan yapılandırma seçeneklerini değiştirirseniz (örneğin
kafka.security.protocol) bu hatayı görebilirsiniz.Kayıt bulunamadı
DataFrame'inizi görüntülemeye veya işlemeye çalışıyor ancak sonuç almıyorsanız, kullanıcı arabiriminde aşağıdakileri görürsünüz.
Bu ileti, kimlik doğrulamasının başarılı olduğu ancak EventHubs'ın veri döndürmediğini gösterir. Bazı olası nedenler (hiçbir şekilde kapsamlı olmasa da) şunlardır:
- Yanlış EventHubs konusunu belirttiniz.
-
startingOffsetsiçin varsayılan Kafka yapılandırma seçeneğilatestşeklindedir ve şu anda konudan herhangi bir veri almıyorsunuz. Verileri Kafka'nın en eski ofsetlerinden okumaya başlamak içinstartingOffsetsöğesiniearliestolarak ayarlayabilirsiniz.