Compartilhar via


Autenticação

O conector kafka do Azure Databricks dá suporte a vários métodos de autenticação para se conectar ao Kafka. Este artigo aborda alguns dos métodos de autenticação mais comuns no Databricks. A lista completa de métodos de autenticação com suporte pode ser encontrada na documentação do Kafka.

Autenticação do principal de serviço com Microsoft Entra ID e Hubs de Eventos do Azure

O Azure Databricks dá suporte à autenticação de trabalhos do Spark com serviços de Hubs de Eventos. Essa autenticação é feita por meio do OAuth com o Microsoft Entra ID.

Diagrama de autenticação do AAD

Conectar-se com as credenciais de serviço do Catálogo do Unity

Desde o lançamento do Databricks Runtime 16.1, o Azure Databricks dá suporte às credenciais de serviço do Catálogo do Unity para autenticar o acesso ao Streaming Gerenciado do AWS para Hubs de Eventos do Azure. O Databricks recomenda essa abordagem, especialmente ao executar o streaming kafka em clusters compartilhados ou computação sem servidor.

Para usar uma credencial de serviço do Catálogo do Unity para autenticação, execute as seguintes etapas:

  • Crie uma nova credencial de serviço do Unity Catalog. Se você não estiver familiarizado com esse processo, consulte Criar credenciais de serviço para obter instruções sobre como criar um.
    • Verifique se o conector de acesso anexado à credencial de serviço tem as permissões necessárias para se conectar aos Hubs de Eventos do Azure.
  • Forneça o nome da credencial de serviço do Catálogo do Unity como uma opção de origem na configuração do Kafka. Defina a opção databricks.serviceCredential como o nome da credencial de serviço.

O exemplo a seguir configura o Kafka como uma origem usando uma credencial de serviço:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
  "subscribe": "<topic>",
  "databricks.serviceCredential": "<service-credential-name>",
  # Optional: set this only if Databricks can't infer the scope for your Kafka service.
  # "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
  "subscribe" -> "<topic>",
  "databricks.serviceCredential" -> "<service-credential-name>",
  // Optional: set this only if Databricks can't infer the scope for your Kafka service.
  // "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-hostname>:9092',
  subscribe => '<topic>',
  serviceCredential => '<service-credential-name>'
);

Observação: quando você usa uma credencial de serviço do Catálogo do Unity para se conectar ao Kafka, as seguintes opções não são mais necessárias:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

Conectar-se com um ID de cliente e uma chave secreta

O Azure Databricks dá suporte à autenticação do Microsoft Entra ID com uma ID do cliente e um segredo nos seguintes ambientes de computação:

  • Databricks Runtime 12.2 LTS e superior em computação configurada com o modo de acesso dedicado (anteriormente modo de acesso de usuário único).
  • Databricks Runtime 14.3 LTS e versões superiores em computação configurada com o modo de acesso padrão (anteriormente modo de acesso compartilhado).
  • Pipelines Declarativos do Lakeflow Spark configurados sem o Unity Catalog.

O Azure Databricks não dá suporte à autenticação do Microsoft Entra ID com um certificado em qualquer ambiente de computação, ou em pipelines declarativos do Lakeflow Spark configurados com o Unity Catalog.

Essa autenticação não funciona na computação com o modo de acesso padrão ou nos Pipelines Declarativos do Spark do Unity Catalog Lakeflow.

Para executar a autenticação com a ID do Microsoft Entra, você deve ter os seguintes valores:

  • Uma ID de locatário. Você pode encontrá-la na guia de serviço do Microsoft Entra ID.

  • Uma clientID (também conhecida como ID do aplicativo).

  • Um segredo do cliente. Quando tiver isso, você deverá adicioná-lo como um segredo ao seu Databricks Workspace. Para adicionar esse segredo, consulte Gerenciamento de segredo.

  • Um tópico do Hubs de Eventos. Você pode encontrar uma lista de tópicos na seção Hubs de Eventos na seção Entidades em uma página específica do Namespace do Hubs de Eventos. Para trabalhar com vários tópicos, você pode definir a função IAM no nível dos Hubs de Eventos.

  • Um servidor do Hubs de Eventos. Você pode encontrar isso na página de visão geral do seu namespace dos Hubs de Eventos específico:

    Namespace do Hubs de Eventos

Além disso, para usar o Entra ID, precisamos informar ao Kafka para usar o mecanismo SASL do OAuth (SASL é um protocolo genérico e o OAuth é um tipo de “mecanismo” SASL):

  • kafka.security.protocol deve ser SASL_SSL
  • kafka.sasl.mechanism deve ser OAUTHBEARER
  • kafka.sasl.login.callback.handler.class deve ser um nome totalmente qualificado da classe Java com um valor de kafkashaded para o manipulador de retorno de chamada de logon da nossa classe Kafka sombreada. Veja o exemplo a seguir para a classe exata.

O exemplo a seguir configura o Kafka para se conectar aos Azure Event Hubs usando a autenticação com ID de Microsoft Entra com uma ID e segredo do cliente.

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
    "kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
    "subscribe": event_hubs_topic,

    # You should not need to modify these
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
  "kafka.sasl.jaas.config" -> saslConfig,
  "kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
  "subscribe" -> eventHubsTopic,

  // You should not need to modify these
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "OAUTHBEARER",
  "kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

SQL

CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<event-hubs-server>:9093',
  subscribe => '<event-hubs-topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'OAUTHBEARER',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
  `kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
  `kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);

Usar SSL para conectar o Azure Databricks ao Kafka

Para habilitar as conexões SSL com o Kafka, siga as instruções na documentação confluente Autenticação e Criptografia com o SSL. Você pode fornecer as configurações descritas, prefixadas com kafka., como opções. Por exemplo, o local do trust store seria especificado com a propriedade kafka.ssl.truststore.location.

Se você usar o SSL, o Databricks recomenda que você:

O exemplo a seguir usa locais de armazenamento de objetos e segredos do Databricks para habilitar uma conexão SSL:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SSL',
  `kafka.ssl.truststore.location` => '<truststore-location>',
  `kafka.ssl.keystore.location` => '<keystore-location>',
  `kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
  `kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);

Conexão Kafka no HDInsight para Azure Databricks

  1. Crie um cluster do Kafka do HDInsight.

    Consulte Conectar ao Kafka no HDInsight por meio de uma Rede Virtual do Azure para instruções.

  2. Configure os agentes do Kafka para anunciar o endereço correto.

    Siga as instruções em Configurar Kafka para publicidade de IP. Se você gerenciar o Kafka sozinho em máquinas virtuais do Azure, certifique-se de que a configuração advertised.listeners dos agentes esteja definida para o IP interno dos hosts.

  3. Criar um cluster do Azure Databricks.

  4. Emparelhar o cluster Kafka com o cluster Azure Databricks.

    Siga as instruções em Redes virtuais emparelhadas.

Lidando com possíveis erros

  • Não foi possível criar um novo KafkaAdminClient

    Esse erro kafka interno será gerado se qualquer uma das seguintes opções de autenticação estiver incorreta:

    • Uma clientID (também conhecida como ID do aplicativo)
    • ID do locatário
    • Servidor de Hubs de Eventos

    Para resolver o erro, verifique se os valores estão corretos para essas opções. Além disso, você poderá ver esse erro se modificar as opções de configuração fornecidas por padrão no exemplo (como kafka.security.protocol).

  • Nenhum registro retornado

    Se você estiver tentando exibir ou processar seu DataFrame, mas não estiver obtendo resultados, verá o seguinte na interface do usuário.

    Nenhuma mensagem de resultados

    Essa mensagem significa que a autenticação foi bem-sucedida, mas o EventHubs não retornou nenhum dado. Alguns motivos possíveis (embora não sejam exaustivos) são:

    • Você especificou o tópico incorreto do Hubs de Eventos.
    • A opção de configuração padrão do Kafka para startingOffsets é latest e você ainda não está recebendo dados por meio do tópico. Você pode definir startingOffsets para earliest a partir dos primeiros deslocamentos do Kafka para ler dados.