Kimlik doğrulama

Azure Databricks Kafka bağlayıcısı, Kafka'ya bağlanmak için birden çok kimlik doğrulama yöntemini destekler. Bu makale, Databricks'te en yaygın kimlik doğrulama yöntemlerinden bazılarını kapsar. Desteklenen kimlik doğrulama yöntemlerinin tam listesi Kafka belgelerinde bulunabilir.

Hizmet sorumlusuyla Azure Event Hubs bağlan

Azure Databricks, Event Hubs hizmetleriyle Spark işlerinin kimlik doğrulamasını destekler. Bu kimlik doğrulaması, Microsoft Entra ID ile OAuth aracılığıyla yapılır.

AAD Kimlik Doğrulama diyagramı

Unity Kataloğu hizmeti kimlik bilgileriyle bağlanma

Databricks Runtime 16.1 sürümünden bu yana Azure Databricks, Azure Event Hubs kimlik doğrulaması için Unity Kataloğu hizmeti kimlik bilgilerini destekler. Databricks, özellikle paylaşılan kümelerde veya sunucusuz işlemde Kafka akışı çalıştırırken bu yaklaşımı önerir.

Kimlik doğrulaması için Unity Kataloğu hizmeti kimlik bilgilerini kullanmak için aşağıdaki adımları gerçekleştirin:

  • Yeni bir Unity Kataloğu hizmeti kimlik bilgisi oluşturun. Bu işlemi bilmiyorsanız, oluşturma yönergeleri için bkz. Hizmet kimlik bilgileri oluşturma .
    • Hizmet kimlik bilgilerinize bağlı erişim bağlayıcısının Azure Event Hubs bağlanmak için gerekli izinlere sahip olduğundan emin olun.
  • Kafka yapılandırmanızda kaynak olarak Unity Kataloğu hizmeti kimlik bilgilerinizin adını belirtin. seçeneğini databricks.serviceCredential hizmet kimlik bilgilerinizin adı olarak ayarlayın.

Aşağıdaki örnek, hizmet kimlik bilgilerini kullanarak Kafka'nın kaynak olarak yapılandırılmasını sağlar:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
  "subscribe": "<topic>",
  "databricks.serviceCredential": "<service-credential-name>",
  # Optional: set this only if Databricks can't infer the scope for your Kafka service.
  # "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
  "subscribe" -> "<topic>",
  "databricks.serviceCredential" -> "<service-credential-name>",
  // Optional: set this only if Databricks can't infer the scope for your Kafka service.
  // "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-hostname>:9092',
  subscribe => '<topic>',
  serviceCredential => '<service-credential-name>'
);

Not: Kafka'ya bağlanmak için Unity Kataloğu hizmeti kimlik bilgilerini kullandığınızda, artık aşağıdaki seçenekler gerekmez:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

İstemci kimliği ve gizli anahtar ile bağlanın

Azure Databricks, aşağıdaki işlem ortamlarında istemci kimliği ve gizli dizi ile Microsoft Entra ID kimlik doğrulamasını destekler:

  • Ayrılmış erişim modu (eski adıyla tek kullanıcı erişim modu) ile yapılandırılmış hesaplamada Databricks Runtime 12.2 LTS ve üstü.
  • Databricks Runtime 14.3 LTS ve üzeri, standart erişim modu (eski adıyla paylaşılan erişim modu) ile yapılandırılmış hesaplama platformu üzerinde.
  • Unity Kataloğu olmadan yapılandırılan Lakeflow Spark Deklaratif Boruhatları.

Azure Databricks, herhangi bir işlem ortamında veya Unity Kataloğu ile yapılandırılmış Lakeflow Spark Bildirimli İşlem Hatlarında bir sertifikayla Microsoft Entra ID kimlik doğrulamasını desteklemez.

Bu kimlik doğrulaması, standart erişim moduyla işlem üzerinde veya Unity Catalog Lakeflow Spark Bildirimli İşlem Hatlarında çalışmaz.

Microsoft Entra ID ile kimlik doğrulaması gerçekleştirmek için aşağıdaki değerlere sahip olmanız gerekir:

  • Kiracı kimliği. Bunu Microsoft Entra ID hizmetleri sekmesinde bulabilirsiniz.

  • Bir clientID (Uygulama Kimliği olarak da bilinir).

  • İstemci sırrı. Bunu aldıktan sonra Databricks Çalışma Alanınıza gizli olarak eklemeniz gerekir. Bu sırrı eklemek için bkz Sır yönetimi.

  • Bir EventHubs başlığı. Konu listesini, belirli bir Event Hubs Ad Alanı sayfasındaki Varlıklar bölümünün altındaki Event Hubs bölümünde bulabilirsiniz. Birden çok konu başlığıyla çalışmak için IAM rolünü Event Hubs düzeyinde ayarlayabilirsiniz.

  • EventHubs sunucusu. Bunu, belirli Event Hubs ad alanınızın genel bakış sayfasında bulabilirsiniz:

    Event Hubs ad alanı

Ayrıca, Entra ID kullanmak için Kafka'ya OAuth SASL mekanizmasını kullanmasını söylememiz gerekir (SASL genel bir protokoldür ve OAuth bir SASL "mekanizması" türüdür):

  • kafka.security.protocol Olmalıdır SASL_SSL
  • kafka.sasl.mechanism Olmalıdır OAUTHBEARER
  • kafka.sasl.login.callback.handler.class, gölgeli Kafka sınıfımızın oturum açma geri çağırma işleyicisine kafkashaded değeriyle Java sınıfının tam adı olmalıdır. Tam sınıf için aşağıdaki örne bakın.

Aşağıdaki örnekte Kafka, istemci kimliği ve gizli dizi ile Microsoft Entra ID kimlik doğrulaması kullanarak Azure Event Hubs bağlanacak şekilde yapılandırılır:

Python

# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
    "kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
    "subscribe": event_hubs_topic,

    # You should not need to modify these
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
  "kafka.sasl.jaas.config" -> saslConfig,
  "kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
  "subscribe" -> eventHubsTopic,

  // You should not need to modify these
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "OAUTHBEARER",
  "kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

SQL

CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<event-hubs-server>:9093',
  subscribe => '<event-hubs-topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'OAUTHBEARER',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
  `kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
  `kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);

Kimlik doğrulaması için SASL/PLAIN kullanma

SASL/PLAIN (kullanıcı adı ve parola) kimlik doğrulamasını kullanarak Kafka'ya bağlanmak için aşağıdaki seçenekleri yapılandırın. Gölgeli PlainLoginModule sınıf adını kullanın:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9093",
  "subscribe": "<topic>",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "PLAIN",
  "kafka.sasl.jaas.config":
    'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
  "subscribe" -> "<topic>",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "PLAIN",
  "kafka.sasl.jaas.config" ->
    """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'PLAIN',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);

Azure Databricks parolanızı doğrudan kodunuzla birlikte saklamak yerine gizli dizi olarak depolamanızı önerir. Daha fazla bilgi için bkz Gizlilik yönetimi.

Kimlik doğrulaması için SASL/SCRAM kullanma

SASL/SCRAM (SCRAM-SHA-256 veya SCRAM-SHA-512) kullanarak Kafka'ya bağlanmak için aşağıdaki seçenekleri yapılandırın. Gölgeli ScramLoginModule sınıf adını kullanın:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9093",
  "subscribe": "<topic>",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "SCRAM-SHA-512",
  "kafka.sasl.jaas.config":
    'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
  "subscribe" -> "<topic>",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "SCRAM-SHA-512",
  "kafka.sasl.jaas.config" ->
    """kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'SCRAM-SHA-512',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);

Uyarı

Kafka kümeniz SCRAM-SHA-256 kullanacak şekilde yapılandırılmışsa, SCRAM-SHA-512 öğesini SCRAM-SHA-256 ile değiştirin.

Azure Databricks parolanızı doğrudan kodunuzla birlikte saklamak yerine gizli dizi olarak depolamanızı önerir. Daha fazla bilgi için bkz Gizlilik yönetimi.

Azure Databricks Kafka'ya bağlanmak için SSL kullanma

Kafka'ya SSL/TLS bağlantılarını etkinleştirmek için kafka.security.protocol'yi SSL olarak ayarlayın ve kafka. ile ön eklenmiş güven deposu ve anahtar deposu yapılandırma seçeneklerini sağlayın. Yalnızca sunucu kimlik doğrulaması (tek yönlü TLS) gerektiren SSL bağlantıları için bir güven deposuna ihtiyacınız vardır. Kafka aracısının istemcinin kimliğini de doğruladığı karşılıklı TLS (mTLS) için hem güven deposuna hem de anahtar deposuna ihtiyacınız vardır.

Aşağıdaki SSL/TLS seçenekleri kullanılabilir. SSL özelliklerinin tam listesi için Confluent belgelerindeki Apache Kafka SSL yapılandırma belgelerineve SSL ile Şifreleme ve Kimlik Doğrulaması'na bakın.

Seçenek Açıklama
kafka.security.protocol SSL TLS şifrelemesini etkinleştirmek için olarak ayarlayın.
kafka.ssl.truststore.location Güvenilen CA sertifikalarını içeren güven deposu dosyasının yolu.
kafka.ssl.truststore.password Güven deposu dosyasının parolası.
kafka.ssl.truststore.type Güven deposu dosya biçimi (varsayılan: JKS).
kafka.ssl.keystore.location İstemci sertifikasını ve özel anahtarı içeren anahtar deposu dosyasının yolu (mTLS için gereklidir).
kafka.ssl.keystore.password Anahtar deposu dosyasının parolası.
kafka.ssl.key.password Anahtar deposundaki özel anahtarın parolası.
kafka.ssl.endpoint.identification.algorithm Ana bilgisayar adı doğrulama algoritması. Varsayılan olarak https değerini alır. Devre dışı bırakmak için boş bir dizeye ayarlayın.

SSL kullanıyorsanız Databricks şunları yapmanızı önerir:

  • Sertifikalarınızı bir Unity Kataloğu biriminde depolayın. Birimden okuma erişimi olan kullanıcılar Kafka sertifikalarınızı kullanabilir. Daha fazla bilgi için Unity Catalog birimleri nedir? bölümüne bakın.
  • Sertifika parolalarınızı gizli alan kapsamında depolayın. Daha fazla bilgi için bkz. Gizli dizin kapsamlarını yönetme.

Aşağıdaki örnek, SSL bağlantısını etkinleştirmek için nesne depolama konumlarını ve Databricks gizli dizilerini kullanır:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SSL',
  `kafka.ssl.truststore.location` => '<truststore-location>',
  `kafka.ssl.keystore.location` => '<keystore-location>',
  `kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
  `kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);

HDInsight üzerinde Kafka'yı Azure Databricks bağlama

  1. HDInsight Kafka kümesi oluşturun.

    Yönergeler için bkz. > aracılığıyla HDIns Azure Virtual Network

  2. Kafka aracılarını doğru adresi tanıtacak şekilde yapılandırın.

    Kafka'yı IP reklamları için yapılandırma başlığındaki yönergeleri izleyin. Kafka'yı Azure Virtual Machines kendiniz yönetiyorsanız, aracıların advertised.listeners yapılandırmasının konakların iç IP'sine ayarlandığından emin olun.

  3. bir Azure Databricks kümesi oluşturun.

  4. Kafka kümesini Azure Databricks kümesiyle eşleyin.

    Eş sanal ağlar başlığındaki yönergeleri izleyin.

Databricks gölgeli Kafka sınıf adlarını kullanma

Azure Databricks Kafka istemci kitaplıklarının özel, gölgeli sürümlerini paketler. Kimlik doğrulama yapılandırma seçeneklerinde başvurduğunuz tüm Kafka istemci sınıfı adları standart açık kaynak sınıf adı yerine gölgeli sınıf adı ön ekini kullanmalıdır. Bu, , kafka.sasl.jaas.configve kafka.sasl.login.callback.handler.classgibi kafka.sasl.client.callback.handler.classseçeneklerde başvuruda bulunan tüm sınıflar için geçerlidir.

Gölgesiz sınıf adlarının kullanılması hataya RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED neden olur. Diğer ayrıntılar için SSS bölümüne bakın.

Olası hataları işleme

  • Yeni bir KafkaAdminClient oluşturulamadı

    Aşağıdaki kimlik doğrulama seçeneklerinden herhangi biri yanlışsa bu iç Kafka hatası oluşur:

    • İstemci Kimliği (Uygulama Kimliği olarak da bilinir)
    • Kiracı Kimliği
    • Event Hubs sunucusu

    Hatayı çözmek için bu seçenekler için değerlerin doğru olduğunu doğrulayın. Ayrıca, örnekte varsayılan olarak sağlanan yapılandırma seçeneklerini değiştirirseniz (örneğin kafka.security.protocol) bu hatayı görebilirsiniz.

  • Kayıt bulunamadı

    DataFrame'inizi görüntülemeye veya işlemeye çalışıyor ancak sonuç almıyorsanız, kullanıcı arabiriminde aşağıdakileri görürsünüz.

    Sonuç iletisi yok

    Bu ileti, kimlik doğrulamasının başarılı olduğu ancak EventHubs'ın veri döndürmediğini gösterir. Bazı olası nedenler (hiçbir şekilde kapsamlı olmasa da) şunlardır:

    • Yanlış EventHubs konusunu belirttiniz.
    • startingOffsets için varsayılan Kafka yapılandırma seçeneği latest şeklindedir ve şu anda konudan herhangi bir veri almıyorsunuz. Verileri Kafka'nın en eski ofsetlerinden okumaya başlamak için startingOffsets öğesini earliest olarak ayarlayabilirsiniz.