Sdílet prostřednictvím


Autentizace

Konektor Azure Databricks Kafka podporuje více metod ověřování pro připojení k Kafka. Tento článek se zabývá některými nejběžnějšími metodami ověřování v Databricks. Úplný seznam podporovaných metod ověřování najdete v dokumentaci kafka.

Ověřování principálu služby pomocí ID Microsoft Entra a Azure Event Hubs

Azure Databricks podporuje ověřování úloh Sparku pomocí služeb Event Hubs. Toto ověřování se provádí prostřednictvím OAuth s ID Microsoft Entra.

Diagram ověřování AAD

Připojení pomocí přihlašovacích údajů služby Katalogu Unity

Od vydání verze Databricks Runtime 16.1 Azure Databricks podporuje přihlašovací údaje služby Unity Catalog pro ověřování přístupu k AWS Managed Streaming pro Azure Event Hubs. Databricks doporučuje tento přístup, zejména při spouštění streamování Kafka na sdílených clusterech nebo bezserverových výpočetních prostředcích.

Pokud chcete pro ověřování použít přihlašovací údaje služby Katalogu Unity, proveďte následující kroky:

  • Vytvořte nové přihlašovací údaje služby Katalogu Unity. Pokud tento proces neznáte, přečtěte si téma Vytvoření přihlašovacích údajů služby a pokyny k jeho vytvoření.
    • Ujistěte se, že přístupový konektor připojený k přihlašovacím údajům služby má potřebná oprávnění pro připojení ke službě Azure Event Hubs.
  • Jako zdrojovou možnost v konfiguraci Kafka zadejte název přihlašovacích údajů služby Katalogu Unity. Nastavte možnost databricks.serviceCredential na název přihlašovacích údajů vaší služby.

Následující příklad nakonfiguruje Kafka jako zdroj pomocí přihlašovacích údajů služby:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
  "subscribe": "<topic>",
  "databricks.serviceCredential": "<service-credential-name>",
  # Optional: set this only if Databricks can't infer the scope for your Kafka service.
  # "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
  "subscribe" -> "<topic>",
  "databricks.serviceCredential" -> "<service-credential-name>",
  // Optional: set this only if Databricks can't infer the scope for your Kafka service.
  // "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-hostname>:9092',
  subscribe => '<topic>',
  serviceCredential => '<service-credential-name>'
);

Poznámka: Při použití přihlašovacích údajů služby Katalogu Unity pro připojení k Systému Kafka už nejsou potřeba následující možnosti:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

Připojení s ID klienta a tajným kódem

Azure Databricks podporuje ověřování Microsoft Entra ID s ID klienta a tajným kódem v následujících výpočetních prostředích:

  • Databricks Runtime 12.2 LTS a vyšší na výpočetních prostředcích nakonfigurovaných s vyhrazeným režimem přístupu (dříve režim přístupu jednoho uživatele).
  • Databricks Runtime 14.3 LTS a vyšší na výpočetních prostředcích nakonfigurovaných pomocí standardního režimu přístupu (dříve režimu sdíleného přístupu).
  • Deklarativní kanály Sparku Lakeflow nakonfigurované bez katalogu Unity

Azure Databricks nepodporuje ověřování MICROSOFT Entra ID pomocí certifikátu v žádném výpočetním prostředí ani v deklarativních kanálech Sparku Lakeflow nakonfigurovaných pomocí katalogu Unity.

Toto ověřování nefunguje na výpočetních prostředcích se standardním režimem přístupu ani v deklarativních kanálech Sparku v Unity Catalog Lakeflow.

Pokud chcete provést ověřování s ID Microsoft Entra, musíte mít následující hodnoty:

  • Identifikátor nájemce. Najdete to na kartě služeb Microsoft Entra ID.

  • ID klienta (označované také jako ID aplikace).

  • Tajemství klienta. Jakmile to budete mít, měli byste ho přidat jako tajný kód do pracovního prostoru Databricks. Chcete-li přidat toto tajemství, podívejte se na Správa tajemství.

  • Téma EventHubs Seznam témat najdete v části Event Hubs pod částí Entity na konkrétní stránce Oboru názvů Event Hubs. Pokud chcete pracovat s několika tématy, můžete nastavit roli IAM na úrovni služby Event Hubs.

  • Server služby EventHubs. Najdete ho na stránce přehledu vašeho konkrétního oboru názvů služby Event Hubs:

    Obor názvů služby Event Hubs

Kromě toho, abychom mohli použít Id Entra, musíme kafka říct, aby používal mechanismus SASL OAuth (SASL je obecný protokol a OAuth je typ SASL "mechanismus"):

  • kafka.security.protocol by měla být SASL_SSL
  • kafka.sasl.mechanism by měla být OAUTHBEARER
  • kafka.sasl.login.callback.handler.class by měl být plně kvalifikovaný název třídy Java s hodnotou kafkashaded obslužné rutiny zpětného volání pro přihlášení naší stínované třídy Kafka. Přesné třídy najdete v následujícím příkladu.

Následující příklad konfiguruje platformu Kafka pro připojení ke službě Azure Event Hubs pomocí ověřování Microsoft Entra ID s ID klienta a tajemstvím.

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
    "kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
    "subscribe": event_hubs_topic,

    # You should not need to modify these
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
  "kafka.sasl.jaas.config" -> saslConfig,
  "kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
  "subscribe" -> eventHubsTopic,

  // You should not need to modify these
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "OAUTHBEARER",
  "kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

SQL

CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<event-hubs-server>:9093',
  subscribe => '<event-hubs-topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'OAUTHBEARER',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
  `kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
  `kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);

Připojení Azure Databricks k Kafka pomocí PROTOKOLU SSL

Pokud chcete povolit připojení SSL k systému Kafka, postupujte podle pokynů v dokumentaci ke Confluentu Šifrování a ověřování pomocí SSL. Můžete zadat konfigurace, které jsou zde popsány, s předponou kafka., jako možnosti. Například umístění úložiště důvěryhodnosti by bylo určeno vlastností kafka.ssl.truststore.location.

Pokud budete používat PROTOKOL SSL, Databricks doporučuje:

Následující příklad používá umístění úložiště objektů a tajné kódy Databricks k povolení připojení SSL:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SSL',
  `kafka.ssl.truststore.location` => '<truststore-location>',
  `kafka.ssl.keystore.location` => '<keystore-location>',
  `kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
  `kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);

Připojení Kafka ve službě HDInsight k Azure Databricks

  1. Vytvořte cluster HDInsight Kafka.

    Pokyny najdete v tématu Připojení k Platformě Kafka ve službě HDInsight prostřednictvím služby Azure Virtual Network .

  2. Nakonfigurujte zprostředkovatele Kafka tak, aby inzerovali správnou adresu.

    Postupujte podle pokynů v Konfigurujte Kafka pro označení IP. Pokud spravujete Kafka sami na virtuálních počítačích Azure, ujistěte se, že konfigurace advertised.listeners brokerů je nastavena na interní IP adresu hostitelů.

  3. Vytvořte cluster Azure Databricks.

  4. Propojit cluster Kafka s clusterm Azure Databricks.

    Postupujte podle pokynů v partnerských virtuálních sítích.

Zpracování potenciálních chyb

  • Vytvoření nového se nezdařilo. KafkaAdminClient

    Tato vnitřní chyba Kafka se vyvolá, pokud některá z následujících možností ověřování není správná:

    • ID klienta (označované také jako ID aplikace)
    • Identifikátor nájemce
    • Server Event Hubs

    Pokud chcete chybu vyřešit, ověřte správnost hodnot pro tyto možnosti. Kromě toho se tato chyba může zobrazit, pokud upravíte možnosti konfigurace, které jsou ve výchozím nastavení k dispozici v příkladu (například kafka.security.protocol).

  • Nebyly vráceny žádné záznamy.

    Pokud se pokoušíte datový rámec zobrazit nebo zpracovat, ale nezobrazují se vám výsledky, zobrazí se v uživatelském rozhraní následující kód.

    Žádná zpráva o výsledcích

    Tato zpráva znamená, že ověřování proběhlo úspěšně, ale Služba EventHubs nevrátila žádná data. Některé možné (i když bez vyčerpávajícího) důvodu jsou:

    • Zadali jste nesprávné téma EventHubs .
    • Výchozí možnost konfigurace Kafka pro startingOffsets je latest a momentálně nedostáváte žádná data prostřednictvím topicu. Můžete nastavit startingOffsets, aby se earliest začalo číst data od nejstarších offsetů Kafky.