Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Konektor Azure Databricks Kafka podporuje více metod ověřování pro připojení k Kafka. Tento článek se zabývá některými nejběžnějšími metodami ověřování v Databricks. Úplný seznam podporovaných metod ověřování najdete v dokumentaci kafka.
Ověřování principálu služby pomocí ID Microsoft Entra a Azure Event Hubs
Azure Databricks podporuje ověřování úloh Sparku pomocí služeb Event Hubs. Toto ověřování se provádí prostřednictvím OAuth s ID Microsoft Entra.
Připojení pomocí přihlašovacích údajů služby Katalogu Unity
Od vydání verze Databricks Runtime 16.1 Azure Databricks podporuje přihlašovací údaje služby Unity Catalog pro ověřování přístupu k AWS Managed Streaming pro Azure Event Hubs. Databricks doporučuje tento přístup, zejména při spouštění streamování Kafka na sdílených clusterech nebo bezserverových výpočetních prostředcích.
Pokud chcete pro ověřování použít přihlašovací údaje služby Katalogu Unity, proveďte následující kroky:
- Vytvořte nové přihlašovací údaje služby Katalogu Unity. Pokud tento proces neznáte, přečtěte si téma Vytvoření přihlašovacích údajů služby a pokyny k jeho vytvoření.
- Ujistěte se, že přístupový konektor připojený k přihlašovacím údajům služby má potřebná oprávnění pro připojení ke službě Azure Event Hubs.
- Jako zdrojovou možnost v konfiguraci Kafka zadejte název přihlašovacích údajů služby Katalogu Unity. Nastavte možnost
databricks.serviceCredentialna název přihlašovacích údajů vaší služby.
Následující příklad nakonfiguruje Kafka jako zdroj pomocí přihlašovacích údajů služby:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Poznámka: Při použití přihlašovacích údajů služby Katalogu Unity pro připojení k Systému Kafka už nejsou potřeba následující možnosti:
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.classkafka.sasl.oauthbearer.token.endpoint.url
Připojení s ID klienta a tajným kódem
Azure Databricks podporuje ověřování Microsoft Entra ID s ID klienta a tajným kódem v následujících výpočetních prostředích:
- Databricks Runtime 12.2 LTS a vyšší na výpočetních prostředcích nakonfigurovaných s vyhrazeným režimem přístupu (dříve režim přístupu jednoho uživatele).
- Databricks Runtime 14.3 LTS a vyšší na výpočetních prostředcích nakonfigurovaných pomocí standardního režimu přístupu (dříve režimu sdíleného přístupu).
- Deklarativní kanály Sparku Lakeflow nakonfigurované bez katalogu Unity
Azure Databricks nepodporuje ověřování MICROSOFT Entra ID pomocí certifikátu v žádném výpočetním prostředí ani v deklarativních kanálech Sparku Lakeflow nakonfigurovaných pomocí katalogu Unity.
Toto ověřování nefunguje na výpočetních prostředcích se standardním režimem přístupu ani v deklarativních kanálech Sparku v Unity Catalog Lakeflow.
Pokud chcete provést ověřování s ID Microsoft Entra, musíte mít následující hodnoty:
Identifikátor nájemce. Najdete to na kartě služeb Microsoft Entra ID.
ID klienta (označované také jako ID aplikace).
Tajemství klienta. Jakmile to budete mít, měli byste ho přidat jako tajný kód do pracovního prostoru Databricks. Chcete-li přidat toto tajemství, podívejte se na Správa tajemství.
Téma EventHubs Seznam témat najdete v části Event Hubs pod částí Entity na konkrétní stránce Oboru názvů Event Hubs. Pokud chcete pracovat s několika tématy, můžete nastavit roli IAM na úrovni služby Event Hubs.
Server služby EventHubs. Najdete ho na stránce přehledu vašeho konkrétního oboru názvů služby Event Hubs:
Kromě toho, abychom mohli použít Id Entra, musíme kafka říct, aby používal mechanismus SASL OAuth (SASL je obecný protokol a OAuth je typ SASL "mechanismus"):
-
kafka.security.protocolby měla býtSASL_SSL -
kafka.sasl.mechanismby měla býtOAUTHBEARER -
kafka.sasl.login.callback.handler.classby měl být plně kvalifikovaný název třídy Java s hodnotoukafkashadedobslužné rutiny zpětného volání pro přihlášení naší stínované třídy Kafka. Přesné třídy najdete v následujícím příkladu.
Následující příklad konfiguruje platformu Kafka pro připojení ke službě Azure Event Hubs pomocí ověřování Microsoft Entra ID s ID klienta a tajemstvím.
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
"kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
SQL
CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<event-hubs-server>:9093',
subscribe => '<event-hubs-topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'OAUTHBEARER',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
`kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
`kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);
Připojení Azure Databricks k Kafka pomocí PROTOKOLU SSL
Pokud chcete povolit připojení SSL k systému Kafka, postupujte podle pokynů v dokumentaci ke Confluentu Šifrování a ověřování pomocí SSL. Můžete zadat konfigurace, které jsou zde popsány, s předponou kafka., jako možnosti. Například umístění úložiště důvěryhodnosti by bylo určeno vlastností kafka.ssl.truststore.location.
Pokud budete používat PROTOKOL SSL, Databricks doporučuje:
- Uložte certifikáty do objemu Unity Catalogu. Uživatelé, kteří mají přístup ke čtení ze svazku, budou moct používat vaše certifikáty Kafka.
- Uložte hesla certifikátů jako tajemství v tajné oblasti.
Následující příklad používá umístění úložiště objektů a tajné kódy Databricks k povolení připojení SSL:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
Připojení Kafka ve službě HDInsight k Azure Databricks
Vytvořte cluster HDInsight Kafka.
Pokyny najdete v tématu Připojení k Platformě Kafka ve službě HDInsight prostřednictvím služby Azure Virtual Network .
Nakonfigurujte zprostředkovatele Kafka tak, aby inzerovali správnou adresu.
Postupujte podle pokynů v Konfigurujte Kafka pro označení IP. Pokud spravujete Kafka sami na virtuálních počítačích Azure, ujistěte se, že konfigurace
advertised.listenersbrokerů je nastavena na interní IP adresu hostitelů.Vytvořte cluster Azure Databricks.
Propojit cluster Kafka s clusterm Azure Databricks.
Postupujte podle pokynů v partnerských virtuálních sítích.
Zpracování potenciálních chyb
Vytvoření nového se nezdařilo.
KafkaAdminClientTato vnitřní chyba Kafka se vyvolá, pokud některá z následujících možností ověřování není správná:
- ID klienta (označované také jako ID aplikace)
- Identifikátor nájemce
- Server Event Hubs
Pokud chcete chybu vyřešit, ověřte správnost hodnot pro tyto možnosti. Kromě toho se tato chyba může zobrazit, pokud upravíte možnosti konfigurace, které jsou ve výchozím nastavení k dispozici v příkladu (například
kafka.security.protocol).Nebyly vráceny žádné záznamy.
Pokud se pokoušíte datový rámec zobrazit nebo zpracovat, ale nezobrazují se vám výsledky, zobrazí se v uživatelském rozhraní následující kód.
Tato zpráva znamená, že ověřování proběhlo úspěšně, ale Služba EventHubs nevrátila žádná data. Některé možné (i když bez vyčerpávajícího) důvodu jsou:
- Zadali jste nesprávné téma EventHubs .
- Výchozí možnost konfigurace Kafka pro
startingOffsetsjelatesta momentálně nedostáváte žádná data prostřednictvím topicu. Můžete nastavitstartingOffsets, aby seearliestzačalo číst data od nejstarších offsetů Kafky.