Processamento de fluxo com o Apache Kafka e o Azure Databricks

Este artigo descreve como você pode usar o Apache Kafka como uma origem ou um coletor ao executar cargas de trabalho de Fluxo Estruturado no Azure Databricks.

Para mais informações sobre o Kafka, confira a documentação do Kafka.

Ler dados do Kafka

Veja a seguir um exemplo para uma leitura de fluxo do Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

O Azure Databricks também dá suporte à semântica de leitura em lote para fontes de dados do Kafka, conforme mostrado no exemplo a seguir:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Para um carregamento em lote incremental, o Databricks recomenda usar o Kafka com Trigger.AvailableNow. Confira Configuração do processamento em lote incremental.

No Databricks Runtime 13.3 LTS e versões superiores, o Azure Databricks fornece uma função SQL para ler dados Kafka. O streaming com SQL tem suporte apenas no Delta Live Tables ou com tabelas de streaming no Databricks SQL. Consulte função read_kafka table-valued.

Configurar leitor de Fluxo Estruturado do Kafka

O Azure Databricks fornece a palavra-chave do kafka como um formato de dados para configurar conexões com o Kafka 0.10+.

Veja a seguir as configurações do Kafka mais comuns:

Há várias maneiras de especificar quais tópicos assinar. Você deve fornecer apenas um destes parâmetros:

Opção Valor Descrição
assinar Uma lista separada por vírgulas de tópicos. A lista de tópicos para assinatura.
subscribePattern Cadeia de caracteres Regex Java. O padrão usado para assinar tópico(s).
assign Cadeia de caracteres JSON {"topicA":[0,1],"topic":[2,4]}. TopicPartitions específico a consumir.

Outras configurações notáveis:

Opção Valor Valor Padrão Descrição
kafka.bootstrap.servers Uma lista de separada por vírgulas de host:port. empty [Necessário] A configuração do Kafka bootstrap.servers. Se você achar que não há dados de Kafka, verifique primeiro a lista de endereços do agente. Se a lista de endereços do agente estiver incorreta, talvez não haja erros. Isso ocorre porque o cliente Kafka assume que os agentes se tornarão disponíveis eventualmente e, em caso de erros de rede, tente para sempre.
failOnDataLoss true ou false. true [Opcional] Se a consulta será reprovada quando for possível que os dados sejam perdidos. As consultas podem falhar permanentemente ao ler dados do Kafka devido a muitos cenários, como tópicos excluídos, truncamento de tópico antes do processamento e assim por diante. Tentamos estimar de forma conservadora se os dados possivelmente foram perdidos ou não. Às vezes, isso pode causar alarmes falsos. Defina essa opção como false se ela não funcionar conforme o esperado ou se você quiser que a consulta continue o processamento apesar da perda de dados.
minPartitions Inteiro > = 0, 0 = desabilitado. 0 (desabilitado) [Opcional] Número mínimo de partições a serem lidas de Kafka. Você pode configurar o Spark para usar um mínimo de partições arbitrário para ler a partir do Kafka usando a opção minPartitions. Normalmente, o Spark tem um mapeamento individual de Kafka topicPartitions para as partições do Spark que consomem do Kafka. Se você definir a opção minPartitions como um valor maior que o Kafka topicPartitions, o Spark dividirá grandes partições de Kafka para partes menores. Essa opção pode ser definida em horas de picos de carregamentos, distorção de dados e à medida que seu fluxo está atrasado para aumentar a taxa de processamento. Ele vem a um custo de inicialização de consumidores de Kafka em cada gatilho, o que pode afetar o desempenho se você usar SSL ao se conectar ao Kafka.
kafka.group.id Uma ID do grupo de consumidores do Kafka. não definido [Opcional] Adicional ID do grupo a ser usada durante a leitura de Kafka. Use esse modo com cuidado. Por padrão, cada consulta gera uma ID de grupo exclusiva para leitura de dados. Isso garante que cada consulta tenha seu próprio grupo de consumidores que não enfrenta interferência de nenhum outro consumidor e, portanto, pode ler todas as partições de seus tópicos assinados. Em alguns cenários (por exemplo, autorização baseada em grupo Kafka), talvez você queira usar IDs de grupo autorizadas específicas para ler os dados. Opcionalmente, você pode definir a ID do grupo. No entanto, faça isso com muito cuidado, pois isso pode causar um comportamento inesperado.

* A execução simultânea de consultas (ambas, lote e streaming) com a mesma ID de grupo provavelmente interfere entre si, fazendo com que cada consulta Leia apenas parte dos dados.
* Isso também pode ocorrer quando as consultas são iniciadas/reiniciadas em uma sucessão rápida. Para minimizar esses problemas, defina a configuração session.timeout.ms do consumidor Kafka como muito pequena.
startingOffsets mais antigo, mais recente mais recente [Opcional] O ponto de partida quando uma consulta é iniciada, “mais antigo”, que é dos deslocamentos mais antigos, ou uma cadeia de caracteres JSON especificando um deslocamento inicial para cada TopicPartition. No JSON, -2 como um deslocamento pode ser usado para se referir ao mais antigo, e -1 ao mais recente. Observação: para as consultas em lote, a opção “mais recente” (implicitamente ou usando -1 em JSON) não é permitida. Para as consultas de streaming, isso só se aplica quando uma nova consulta é iniciada, e essa retomada sempre começará no ponto em que a consulta parou. As partições recém-descobertas durante uma consulta serão iniciadas na opção “mais antigo”.

Consulte Guia de integração Kafka de Fluxo Estruturado para outras configurações opcionais.

Esquema para registros do Kafka

O esquema de registros do Kafka é:

Coluna Type
chave binary
value binary
topic string
partition INT
deslocamento long
timestamp long
timestampType INT

O key e o value são sempre desserializados como matrizes de bytes com o ByteArrayDeserializer. Use as operações de DataFrame (por exemplo, cast("string")) para desserializar explicitamente as chaves e valores.

Gravar dados no Kafka

Veja a seguir um exemplo para uma gravação de fluxo do Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

O Azure Databricks também dá suporte à semântica de gravação em lote para coletor de dados do Kafka, conforme mostrado no exemplo a seguir:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Configurar gravador de Fluxo Estruturado do Kafka

Importante

O Databricks Runtime 13.3 LTS e versões superiores inclui uma versão mais recente da biblioteca kafka-clients que permite gravações idempotentes por padrão. Se um coletor Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitado, a gravação falhará com a mensagem de erro org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.

Resolva esse erro atualizando para o Kafka versão 2.8.0 ou superior ou definindo .option(“kafka.enable.idempotence”, “false”) ao configurar seu gravador de streaming estruturado.

O esquema fornecido ao DataStreamWriter interage com o coletor Kafka. Você pode usar o seguinte método:

Nome da coluna Obrigatório ou opcional Type
key opcional STRING ou BINARY
value obrigatório STRING ou BINARY
headers opcional ARRAY
topic opcional (ignorado se topic for definido como opção de gravador) STRING
partition opcional INT

Veja a seguir opções comuns definidas durante a gravação no Kafka:

Opção Valor Valor padrão Descrição
kafka.boostrap.servers Uma lista de <host:port> separada por vírgulas none [Necessário] A configuração do Kafka bootstrap.servers.
topic STRING não definido [Opcional] Define o tópico para todas as linhas a serem gravadas. Essa opção substitui todas as colunas de tópico que existem nos dados.
includeHeaders BOOLEAN false [Opcional] Se os cabeçalhos Kafka devem ser incluídos na linha.

Consulte Guia de integração Kafka de Fluxo Estruturado para outras configurações opcionais.

Recuperar métricas do Kafka

Você pode obter a média, mín e máx do número de deslocamentos que a consulta de streaming está atrás do deslocamento disponível mais recente entre todos os tópicos assinados com as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest. Consulte lendo métricas de forma interativa.

Observação

Disponível no Databricks Runtime 9.1 e superior.

Obtenha o número total estimado de bytes que o processo de consulta não consumiu dos tópicos inscritos examinando o valor de estimatedTotalBytesBehindLatest. Essa estimativa se baseia nos lotes que foram processados nos últimos 300 segundos. O período de tempo em que a estimativa se baseia pode ser alterado definindo a opção bytesEstimateWindowLength como um valor diferente. Por exemplo, para configurá-lo para 10 minutos:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Se estiver executando o fluxo em um notebook, você poderá ver essas métricas na guia Dados Brutos no painel de controle do progresso da consulta de fluxo:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

Usar SSL para conectar o Azure Databricks ao Kafka

Para habilitar as conexões SSL com o Kafka, siga as instruções na documentação confluente Autenticação e Criptografia com o SSL. Você pode fornecer as configurações descritas, prefixadas com kafka., como opções. Por exemplo, você especifica o local do repositório de confiança na propriedade kafka.ssl.truststore.location.

O Databricks recomenda que você:

O exemplo a seguir usa locais de armazenamento de objetos e segredos do Databricks para habilitar uma conexão SSL:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Conexão Kafka no HDInsight para Azure Databricks

  1. Crie um cluster HDInsight Kafka.

    Consulte Conectar ao Kafka no HDInsight por meio de uma Rede Virtual do Azure para instruções.

  2. Configure os agentes Kafka para anunciar o endereço correto.

    Siga as instruções em Configurar Kafka para publicidade de IP. Se você gerenciar o Kafka sozinho em máquinas virtuais do Azure, certifique-se de que a configuração advertised.listeners dos agentes esteja definida para o IP interno dos hosts.

  3. Criar um cluster do Azure Databricks.

  4. Emparelhar o cluster Kafka com o cluster Azure Databricks.

    Siga as instruções em Redes virtuais emparelhadas.

Autenticação da Entidade de Serviço com o Microsoft Entra ID (antigo Azure Active Directory) e hubs de eventos do Azure

O Azure Databricks dá suporte à autenticação de trabalhos do Spark com serviços de Hubs de Eventos. Essa autenticação é feita por meio do OAuth com o Microsoft Entra ID (antigo Azure Active Directory).

Diagrama de autenticação do AAD

O Azure Databricks dá suporte à autenticação do Microsoft Entra ID com uma ID do cliente e um segredo nos seguintes ambientes de computação:

  • Databricks Runtime 12.2 LTS e superior na computação configurada com o modo de acesso de usuário único.
  • Databricks Runtime 14.3 LTS e superior na computação configurada com o modo de acesso compartilhado.
  • Pipelines do Delta Live Tables configurados sem o Catálogo do Unity.

O Azure Databricks não dá suporte à autenticação do Microsoft Entra ID com um certificado em qualquer ambiente de computação ou em pipelines do Delta Live Tables configurados com o Catálogo do Unity.

Essa autenticação não funciona em clusters compartilhados ou em Tabelas Dinâmicas Delta do Catálogo do Unity.

Configurando o conector Kafka de Streaming estruturado

Para executar a autenticação com o Microsoft Entra ID, você precisará dos seguintes valores:

  • Uma ID de locatário. Você pode encontrá-la na guia de serviço do Microsoft Entra ID.

  • Uma clientID (também conhecida como ID do aplicativo).

  • Um segredo do cliente. Quando tiver isso, você deverá adicioná-lo como um segredo ao seu Databricks Workspace. Para adicionar esse segredo, consulte Gerenciamento de segredo.

  • Um tópico do Hubs de Eventos. Você pode encontrar uma lista de tópicos na seção Hubs de Eventos na seção Entidades em uma página específica do Namespace do Hubs de Eventos. Para trabalhar com vários tópicos, você pode definir a função IAM no nível do Hubs de Eventos.

  • Um servidor do Hubs de Eventos. Você pode encontrá-lo na página de visão geral do Namespace do Hubs de Eventos específico:

    Namespace do Hubs de Eventos

Além disso, para usar o Entra ID, precisamos informar ao Kafka para usar o mecanismo SASL do OAuth (SASL é um protocolo genérico e o OAuth é um tipo de “mecanismo” SASL):

  • kafka.security.protocol deve ser SASL_SSL
  • kafka.sasl.mechanism deve ser OAUTHBEARER
  • kafka.sasl.login.callback.handler.class deve ser um nome totalmente qualificado da classe Java com um valor de kafkashaded para o manipulador de retorno de chamada de logon da nossa classe Kafka sombreada. Veja o exemplo a seguir para a classe exata.

Exemplo

A seguir, vamos dar uma olhada em um exemplo de execução:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Lidando com possíveis erros

  • Não há suporte para opções de streaming.

    Se você tentar usar esse mecanismo de autenticação em um pipeline do Delta Live Tables configurado com o Catálogo do Unity, poderá receber o seguinte erro:

    Erro de streaming sem suporte

    Para resolver esse erro, use uma configuração de computação com suporte. Consulte Autenticação da Entidade de Serviço com o Microsoft Entra ID (antigo Azure Active Directory) e Hubs de Eventos do Azure.

  • Falha ao criar um novo KafkaAdminClient.

    Esse é um erro interno que o Kafka gera se qualquer uma das seguintes opções de autenticação estiver incorreta:

    • Uma clientID (também conhecida como ID do aplicativo)
    • ID do locatário
    • Um servidor do Hubs de Eventos

    Para resolver o erro, verifique se os valores estão corretos para essas opções.

    Além disso, você poderá ver esse erro se modificar as opções de configuração fornecidas por padrão no exemplo (que você foi solicitado a não modificar), como kafka.security.protocol.

  • Não há registros sendo retornados

    Se você estiver tentando exibir ou processar seu DataFrame, mas não estiver obtendo resultados, você verá o seguinte na interface do usuário.

    Nenhuma mensagem de resultados

    Essa mensagem significa que a autenticação foi bem-sucedida, mas o Hubs de Eventos não retornou nenhum dado. Alguns motivos possíveis (embora não sejam exaustivos) são:

    • Você especificou o tópico incorreto do Hubs de Eventos.
    • A opção de configuração padrão do Kafka para startingOffsets é latest e você ainda não está recebendo dados por meio do tópico. Você pode definir startingOffsetstoearliest para começar a ler dados a partir dos primeiros deslocamentos do Kafka.