Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Esta página mostra os métodos de autenticação mais comuns para o conector kafka no Azure Databricks.
A lista completa de métodos de autenticação com suporte pode ser encontrada na documentação do Kafka.
Conectar ao Hubs de Eventos do Azure com um principal de serviço
O Azure Databricks oferece suporte à autenticação de trabalhos do Apache Spark com serviços do Event Hubs usando OAuth com o Microsoft Entra ID.
Conectar-se com as credenciais de serviço do Catálogo do Unity
No Databricks Runtime 16.1 e versões posteriores, o Azure Databricks oferece suporte a credenciais de serviço do Unity Catalog para autenticação no Hubs de Eventos do Azure. O Databricks recomenda essa abordagem se você executar o streaming do Kafka em clusters compartilhados ou computação sem servidor.
Para usar uma credencial de serviço do Catálogo do Unity para autenticação, faça o seguinte:
- Crie uma nova credencial de serviço do Unity Catalog. Consulte Criar credenciais de serviço.
- Confirme se o conector de acesso anexado à credencial de serviço tem as permissões corretas para se conectar ao Hubs de Eventos do Azure.
- Defina a opção de origem
databricks.serviceCredentialcomo o nome da credencial de serviço.
O exemplo a seguir configura o Kafka como uma origem usando uma credencial de serviço:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Observação
Quando você usa uma credencial de serviço do Catálogo do Unity para se conectar ao Kafka, não use as seguintes opções:
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.classkafka.sasl.oauthbearer.token.endpoint.url
Conectar-se com um ID de cliente e uma chave secreta
Azure Databricks dá suporte à autenticação Microsoft Entra ID com uma ID do cliente e um segredo nos seguintes ambientes de computação:
- Databricks Runtime 12.2 LTS e versões posteriores em recursos computacionais configurados com modo de acesso dedicado.
- Databricks Runtime 14.3 LTS ou superior em ambientes computacionais configurados com o modo de acesso padrão.
- Pipelines Declarativos do Lakeflow Spark configurados sem o Unity Catalog.
Azure Databricks não dá suporte à autenticação Microsoft Entra ID com um certificado em qualquer ambiente de computação ou no Lakeflow Spark Declarative Pipelines configurado com Unity Catalog.
Essa autenticação não funciona na computação com o modo de acesso padrão ou nos Pipelines Declarativos do Spark do Unity Catalog Lakeflow.
Para executar a autenticação com Microsoft Entra ID, você deve ter os seguintes valores:
Uma ID de locatário. Você pode encontrá-lo na guia serviços Microsoft Entra ID.
Uma clientID, também conhecida como ID do aplicativo.
Um segredo do cliente. Adicione isso como um segredo ao workspace do Databricks. Confira Gerenciamento de segredos.
Um tópico do Hubs de Eventos. Você pode encontrar uma lista de tópicos na seção Hubs de Eventos na seção Entidades em uma página específica do Namespace do Hubs de Eventos. Para trabalhar com vários tópicos, você pode definir a função IAM no nível dos Hubs de Eventos.
Um servidor do Hubs de Eventos. Você pode encontrar isso na página de visão geral do seu namespace dos Hubs de Eventos específico:
Para usar Entra ID, você deve configurar o Kafka para usar a SASL do OAuth:
- Defina
kafka.security.protocolcomoSASL_SSL - Defina
kafka.sasl.mechanismcomoOAUTHBEARER - Defina
kafka.sasl.login.callback.handler.classcomo um nome totalmente qualificado da classe Java. O nome qualificado ékafkashadede o manipulador de callback de login da classe Kafka sombreada do Databricks. Veja o exemplo a seguir para a classe exata.
SASL é um protocolo de autenticação genérico e o OAuth é um mecanismo SASL.
O exemplo a seguir configura o Kafka para se conectar ao Hubs de Eventos do Azure usando a autenticação do Microsoft Entra ID com um ID de cliente e um segredo:
Python
# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
"kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
SQL
CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<event-hubs-server>:9093',
subscribe => '<event-hubs-topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'OAUTHBEARER',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
`kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
`kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);
Usar SASL/PLAIN para autenticar
Para se conectar ao Kafka usando a autenticação SASL/PLAIN (nome de usuário e senha), configure as opções a seguir. Use o nome da classe sombreada PlainLoginModule :
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);
Azure Databricks recomenda que você armazene sua senha como um segredo em vez de incluí-la diretamente em seu código. Para obter mais informações, consulte Gerenciamento de segredos.
Usar SASL/SCRAM para autenticar
Para se conectar ao Kafka usando SASL/SCRAM (SCRAM-SHA-256 ou SCRAM-SHA-512), configure as opções a seguir. Use o nome da classe sombreada ScramLoginModule :
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'SCRAM-SHA-512',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);
Observação
SCRAM-SHA-512 Substitua SCRAM-SHA-256 se o cluster Kafka estiver configurado para usar SCRAM-SHA-256.
Azure Databricks recomenda que você armazene sua senha como um segredo em vez de incluí-la diretamente em seu código. Para obter mais informações, consulte Gerenciamento de segredos.
Use SSL para conectar Azure Databricks ao Kafka
Para habilitar conexões SSL/TLS com o Kafka, defina kafka.security.protocolSSL e forneça as opções de configuração do repositório de confiança e do repositório de chaves prefixadas com kafka.. Para conexões SSL que exigem apenas autenticação de servidor (TLS unidirecional), você deve usar um repositório de confiança. Para TLS mútuo (mTLS), em que o broker do Kafka também autentica o cliente, você deve usar um truststore e um keystore.
As seguintes opções de SSL/TLS estão disponíveis. Para obter a lista completa de propriedades SSL, consulte a documentação de configuração do SSL do Apache Kafkae Criptografia e Autenticação com SSL na documentação do Confluent.
| Opção | Descrição |
|---|---|
kafka.security.protocol |
Defina SSL para habilitar a criptografia TLS. |
kafka.ssl.truststore.location |
Caminho para o arquivo de repositório de confiança que contém certificados de AC confiáveis. |
kafka.ssl.truststore.password |
Senha para o arquivo de armazenamento de confiança. |
kafka.ssl.truststore.type |
Formato de arquivo do repositório de confiança (padrão: JKS). |
kafka.ssl.keystore.location |
Caminho para o arquivo de repositório de chaves que contém o certificado do cliente e a chave privada (necessário para mTLS). |
kafka.ssl.keystore.password |
Senha do arquivo do repositório de chaves. |
kafka.ssl.key.password |
Senha para a chave privada no repositório de chaves. |
kafka.ssl.endpoint.identification.algorithm |
Algoritmo de verificação de nome do host. Usa https como padrão. Defina como uma cadeia de caracteres vazia para desabilitar. |
Se você usar o SSL, o Databricks recomenda que você:
- Armazene seus certificados em um volume do Catálogo do Unity. Os usuários que têm acesso à leitura do volume podem usar seus certificados Kafka. Para obter mais informações, consulte O que são os volumes do Unity Catalog?.
- Armazene suas senhas de certificado como segredos em um escopo secreto. Para obter mais informações, consulte Gerenciar escopos secretos.
O exemplo a seguir usa locais de armazenamento de objetos e segredos do Databricks para habilitar uma conexão SSL:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
Conectar o Kafka no HDInsight ao Azure Databricks
Crie um cluster do Kafka do HDInsight.
Consulte Conectar ao Kafka no HDInsight através de uma Rede Virtual do Azure para obter instruções.
Configure os agentes do Kafka para anunciar o endereço correto.
Siga as instruções em Configurar Kafka para publicidade de IP. Se você gerenciar o Kafka você mesmo em Máquinas Virtuais do Azure, verifique se a configuração
advertised.listenersdos brokers está definida como o IP interno dos hosts.Crie um cluster no Azure Databricks.
Emparelhe o cluster Kafka ao cluster Azure Databricks.
Siga as instruções em Redes virtuais emparelhadas.
Usar nomes de classe Kafka sombreados do Databricks
Azure Databricks agrupa versões proprietárias e sombreadas das bibliotecas de clientes do Kafka. Todos os nomes de classe de cliente kafka referenciados nas opções de configuração de autenticação devem usar o prefixo de nome de classe sombreado em vez do nome da classe de software livre padrão. Isso se aplica a qualquer classe referenciada em opções como kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.classe kafka.sasl.client.callback.handler.class.
Se você usar nomes de classe não sombreados, seu código gerará um erro RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED. Consulte as perguntas frequentes para obter mais detalhes.
Lidando com possíveis erros
Não foi possível criar um novo
KafkaAdminClientEsse erro kafka interno será gerado se qualquer uma das seguintes opções de autenticação estiver incorreta:
- Uma clientID (também conhecida como ID do aplicativo)
- ID do locatário
- Servidor de Hubs de Eventos
Para resolver o erro, verifique se os valores estão corretos para essas opções. Além disso, você poderá ver esse erro se modificar as opções de configuração fornecidas por padrão no exemplo (como
kafka.security.protocol).Nenhum registro retornado
Se você estiver tentando exibir ou processar seu DataFrame, mas não estiver obtendo resultados, verá o seguinte na interface do usuário.
Essa mensagem significa que a autenticação foi bem-sucedida, mas o EventHubs não retornou nenhum dado. Alguns motivos possíveis (embora não sejam exaustivos) são:
- Você especificou o tópico incorreto do Hubs de Eventos.
- A opção de configuração padrão do Kafka para
startingOffsetsélateste você ainda não está recebendo dados por meio do tópico. Você pode definirstartingOffsetsparaearliesta partir dos primeiros deslocamentos do Kafka para ler dados.