Compartilhar via


Processamento de fluxo com o Apache Kafka e o Azure Databricks

Este artigo descreve como você pode usar o Apache Kafka como uma origem ou um coletor ao executar cargas de trabalho de Fluxo Estruturado no Azure Databricks.

Para mais informações sobre o Kafka, confira a documentação do Kafka.

Ler dados do Kafka

O exemplo a seguir é uma leitura de fluxo do Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

O Azure Databricks também dá suporte à semântica de leitura em lote para fontes de dados do Kafka, conforme mostrado no exemplo a seguir:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Para um carregamento em lote incremental, o Databricks recomenda usar o Kafka com Trigger.AvailableNow. Confira Configuração do processamento em lote incremental.

No Databricks Runtime 13.3 LTS e versões superiores, o Azure Databricks fornece uma função SQL para ler dados Kafka. O streaming com SQL tem suporte apenas no Lakeflow Declarative Pipelines ou com tabelas de streaming no Databricks SQL. Consulte TVF read_kafka.

Configurar leitor de Fluxo Estruturado do Kafka

O Azure Databricks fornece a palavra-chave do kafka como um formato de dados para configurar conexões com o Kafka 0.10+.

Veja a seguir as configurações do Kafka mais comuns:

Há várias maneiras de especificar quais tópicos assinar. Você deve fornecer apenas um destes parâmetros:

Opção Valor Descrição
assinar Uma lista separada por vírgulas de tópicos. A lista de tópicos para assinatura.
subscribePattern Cadeia de caracteres regex Java. O padrão usado para assinar tópico(s).
atribuir Cadeia de caracteres JSON {"topicA":[0,1],"topic":[2,4]}. Especificar as partições de tópicos a serem consumidas.

Outras configurações notáveis:

Opção Valor Valor Padrão Descrição
kafka.bootstrap.servers Uma lista de separada por vírgulas de host:port. vazio [Obrigatório] A configuração bootstrap.servers do Kafka. Se você achar que não há dados de Kafka, verifique primeiro a lista de endereços do agente. Se a lista de endereços do agente estiver incorreta, talvez não haja erros. Isso ocorre porque o cliente Kafka assume que os brokers ficarão disponíveis eventualmente e, em caso de erros de rede, repete para sempre.
failOnDataLoss true ou false. true [Opcional] Se a consulta deve falhar quando é possível que os dados foram perdidos. As consultas podem falhar permanentemente ao ler dados do Kafka devido a muitos cenários, como tópicos excluídos, truncamento de tópico antes do processamento e assim por diante. Tentamos estimar de forma conservadora se os dados possivelmente foram perdidos ou não. Às vezes, isso pode causar alarmes falsos. Defina essa opção como false se ela não funcionar conforme o esperado ou se você quiser que a consulta continue o processamento apesar da perda de dados.
minPartitions Inteiro >= 0, 0 = desabilitado. 0 (desabilitado) [Opcional] Número mínimo de partições a serem lidas do Kafka. Você pode configurar o Spark para usar um mínimo de partições arbitrário para ler a partir do Kafka usando a opção minPartitions. Normalmente, o Spark tem um mapeamento individual de topicPartitions do Kafka para partições do Spark consumindo do Kafka. Se você definir a opção minPartitions como um valor maior que o Kafka topicPartitions, o Spark dividirá grandes partições de Kafka para partes menores. Essa opção pode ser definida em horas de picos de carregamentos, distorção de dados e à medida que seu fluxo está atrasado para aumentar a taxa de processamento. Ele vem a um custo de inicialização de consumidores de Kafka em cada gatilho, o que pode afetar o desempenho se você usar SSL ao se conectar ao Kafka.
kafka.group.id Uma ID de grupo de consumidores do Kafka. não definido [Opcional] Adicional ID do grupo a ser usada durante a leitura de Kafka. Use esse modo com cuidado. Por padrão, cada consulta gera uma ID de grupo exclusiva para leitura de dados. Isso garante que cada consulta tenha seu próprio grupo de consumidores que não enfrente interferência de nenhum outro consumidor e, portanto, possa ler todas as partições de seus tópicos assinados. Em alguns cenários (por exemplo, autorização baseada em grupo Kafka), talvez você queira usar IDs de grupo autorizadas específicas para ler os dados. Opcionalmente, você pode definir a ID do grupo. No entanto, faça isso com muito cuidado, pois isso pode causar um comportamento inesperado.
  • As consultas em execução simultânea (em lote e streaming) com a mesma ID de grupo provavelmente interferem entre si, fazendo com que cada consulta leia apenas parte dos dados.
  • Isso também pode ocorrer quando as consultas são iniciadas/reiniciadas em rápida sucessão. Para minimizar esses problemas, defina a configuração session.timeout.ms do consumidor Kafka como muito pequena.
startingOffsets mais antigo, mais recente mais recente [Opcional] O ponto de partida quando uma consulta é iniciada, "mais antigo", que é a partir dos deslocamentos mais antigos, ou uma cadeia de caracteres JSON que especifica um deslocamento inicial para cada TopicPartition. No json, -2 como deslocamento pode ser usado para se referir ao mais antigo, -1 ao mais recente. Observação: para as consultas em lote, a opção “mais recente” (implicitamente ou usando -1 em JSON) não é permitida. Para as consultas de streaming, isso só se aplica quando uma nova consulta é iniciada, e essa retomada sempre começará no ponto em que a consulta parou. As partições recém-descobertas durante uma consulta começarão no mais antigo.

Consulte Guia de integração Kafka de streaming estruturado para ver configurações opcionais.

Esquema para registros do Kafka

O esquema de registros do Kafka é:

Coluna Tipo
chave binário
valor binário
tópico cadeia de caracteres
partition int
deslocamento long
carimbo de data/hora long
timestampType int

O key e o value são sempre desserializados como matrizes de bytes com o ByteArrayDeserializer. Use as operações de DataFrame (por exemplo, cast("string")) para desserializar explicitamente as chaves e valores.

Gravar dados no Kafka

O exemplo a seguir é para uma gravação de fluxo no Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

O Azure Databricks também dá suporte à semântica de gravação em lote para coletor de dados do Kafka, conforme mostrado no exemplo a seguir:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Configurar o gravador de Streaming Estruturado do Kafka

Importante

O Databricks Runtime 13.3 LTS e versões superiores inclui uma versão mais recente da biblioteca kafka-clients que permite gravações idempotentes por padrão. Se um coletor Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitado, a gravação falhará com a mensagem de erro org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.

Resolva esse erro atualizando para o Kafka versão 2.8.0 ou superior ou definindo .option(“kafka.enable.idempotence”, “false”) ao configurar seu gravador de streaming estruturado.

O esquema fornecido ao DataStreamWriter interage com o coletor Kafka. Você pode usar o seguinte método:

Nome da coluna Obrigatório ou opcional Tipo
key opcional STRING ou BINARY
value obrigatório STRING ou BINARY
headers opcional ARRAY
topic opcional (ignorado se topic for definido como opção de gravador) STRING
partition opcional INT

Veja a seguir opções comuns definidas durante a gravação no Kafka:

Opção Valor Valor padrão Descrição
kafka.boostrap.servers Uma lista de <host:port> separada por vírgulas nenhum [Obrigatório] A configuração bootstrap.servers do Kafka.
topic STRING não definido [Opcional] Define o tópico para todas as linhas a serem gravadas. Essa opção substitui todas as colunas de tópico que existem nos dados.
includeHeaders BOOLEAN false [Opcional] Se os cabeçalhos do Kafka devem ser incluídos na linha.

Consulte Guia de integração Kafka de streaming estruturado para ver configurações opcionais.

Recuperar métricas do Kafka

Você pode obter a média, o mínimo e o máximo do número de deslocamentos que a consulta de fluxo está atrás do deslocamento mais recente disponível entre todos os tópicos inscritos com as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest. Confira Métricas de Leitura Interativa.

Observação

Disponível no Databricks Runtime 9.1 e superior.

Obtenha o número total estimado de bytes que o processo de consulta não consumiu dos tópicos inscritos examinando o valor de estimatedTotalBytesBehindLatest. Essa estimativa se baseia nos lotes que foram processados nos últimos 300 segundos. O período de tempo em que a estimativa se baseia pode ser alterado definindo a opção bytesEstimateWindowLength como um valor diferente. Por exemplo, para configurá-lo para 10 minutos:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Se você estiver executando o fluxo em um notebook, poderá ver essas métricas na guia Dados Brutos no painel de controle do progresso da consulta de fluxo:

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

Usar SSL para conectar o Azure Databricks ao Kafka

Para habilitar as conexões SSL com o Kafka, siga as instruções na documentação confluente Autenticação e Criptografia com o SSL. Você pode fornecer as configurações descritas, prefixadas com kafka., como opções. Por exemplo, especifique a localização do repositório confiável na propriedade kafka.ssl.truststore.location.

O Databricks recomenda que você:

O exemplo a seguir usa locais de armazenamento de objetos e segredos do Databricks para habilitar uma conexão SSL:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Conexão Kafka no HDInsight para Azure Databricks

  1. Crie um cluster do Kafka do HDInsight.

    Consulte Conectar ao Kafka no HDInsight por meio de uma Rede Virtual do Azure para instruções.

  2. Configure os agentes do Kafka para anunciar o endereço correto.

    Siga as instruções em Configurar Kafka para publicidade de IP. Se você gerenciar o Kafka sozinho em máquinas virtuais do Azure, certifique-se de que a configuração advertised.listeners dos agentes esteja definida para o IP interno dos hosts.

  3. Criar um cluster do Azure Databricks.

  4. Emparelhar o cluster Kafka com o cluster Azure Databricks.

    Siga as instruções em Redes virtuais emparelhadas.

Autenticação da Entidade de Serviço com o Microsoft Entra ID e os Hubs de Eventos do Azure

O Azure Databricks dá suporte à autenticação de trabalhos do Spark com serviços de Hubs de Eventos. Essa autenticação é feita por meio do OAuth com o Microsoft Entra ID.

Diagrama de autenticação do AAD

O Azure Databricks dá suporte à autenticação do Microsoft Entra ID com uma ID do cliente e um segredo nos seguintes ambientes de computação:

  • Databricks Runtime 12.2 LTS e superior em computação configurada com o modo de acesso dedicado (anteriormente modo de acesso de usuário único).
  • Databricks Runtime 14.3 LTS e versões superiores em computação configurada com o modo de acesso padrão (anteriormente modo de acesso compartilhado).
  • Pipelines Declarativos do Lakeflow configurados sem o Catálogo do Unity.

O Azure Databricks não dá suporte à autenticação com o Microsoft Entra ID com um certificado em qualquer ambiente de computação, ou em pipelines declarativas do Lakeflow configurados com o Catálogo do Unity.

Essa autenticação não funciona em computação com modo de acesso padrão, nem em pipelines declarativos do Lakeflow do Catálogo do Unity.

Suporte para credenciais de serviço do Unity Catalog para AWS MSK e Azure Event Hubs

Desde o lançamento do DBR 16.1, o Azure Databricks dá suporte às credenciais de serviço do Catálogo do Unity para autenticar o acesso ao AWS Managed Streaming para MSK (Apache Kafka) e aos Hubs de Eventos do Azure. O Azure Databricks recomenda essa abordagem para executar o streaming do Kafka em clusters compartilhados e ao usar a computação sem servidor.

Para usar uma credencial de serviço do Catálogo do Unity para autenticação, execute as seguintes etapas:

  • Crie uma nova credencial de serviço do Unity Catalog. Se você não estiver familiarizado com esse processo, consulte Criar credenciais de serviço para obter instruções sobre como criar um.
  • Forneça o nome da credencial de serviço do Catálogo do Unity como uma opção de origem na configuração do Kafka. Defina a opção databricks.serviceCredential como o nome da credencial de serviço.

Observação: ao fornecer uma credencial de serviço do Catálogo do Unity ao Kafka, não especifique estas opções, pois elas não são mais necessárias:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

Configurando o conector Kafka de Streaming estruturado

Para executar a autenticação com a ID do Microsoft Entra, você deve ter os seguintes valores:

  • Uma ID de locatário. Você pode encontrá-la na guia de serviço do Microsoft Entra ID.

  • Uma clientID (também conhecida como ID do aplicativo).

  • Um segredo do cliente. Quando tiver isso, você deverá adicioná-lo como um segredo ao seu Databricks Workspace. Para adicionar esse segredo, consulte Gerenciamento de segredo.

  • Um tópico do Hubs de Eventos. Você pode encontrar uma lista de tópicos na seção Hubs de Eventos na seção Entidades em uma página específica do Namespace do Hubs de Eventos. Para trabalhar com vários tópicos, você pode definir a função IAM no nível dos Hubs de Eventos.

  • Um servidor do Hubs de Eventos. Você pode encontrar isso na página de visão geral do seu namespace dos Hubs de Eventos específico:

    Namespace do Hubs de Eventos

Além disso, para usar o Entra ID, precisamos informar ao Kafka para usar o mecanismo SASL do OAuth (SASL é um protocolo genérico e o OAuth é um tipo de “mecanismo” SASL):

  • kafka.security.protocol deve ser SASL_SSL
  • kafka.sasl.mechanism deve ser OAUTHBEARER
  • kafka.sasl.login.callback.handler.class deve ser um nome totalmente qualificado da classe Java com um valor de kafkashaded para o manipulador de retorno de chamada de logon da nossa classe Kafka sombreada. Veja o exemplo a seguir para a classe exata.

Exemplo

Em seguida, vamos examinar um exemplo em execução:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala (linguagem de programação)

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Lidando com possíveis erros

  • Não há suporte para opções de streaming.

    Se você tentar usar esse mecanismo de autenticação no Lakeflow Declarative Pipelines configurado com o Catálogo do Unity, poderá receber o seguinte erro:

    Erro de streaming sem suporte

    Para resolver esse erro, use uma configuração de computação com suporte. Confira a seção Autenticação da entidade de serviço com o Microsoft Entra ID e os Hubs de Eventos do Azure.

  • Falha ao criar um novo KafkaAdminClient.

    Esse é um erro interno que o Kafka gera se qualquer uma das seguintes opções de autenticação estiver incorreta:

    • Uma clientID (também conhecida como ID do aplicativo)
    • ID do locatário
    • Um servidor do Hubs de Eventos

    Para resolver o erro, verifique se os valores estão corretos para essas opções.

    Além disso, você poderá ver esse erro se modificar as opções de configuração fornecidas por padrão no exemplo (que você foi solicitado a não modificar), como kafka.security.protocol.

  • Não há registros sendo retornados

    Se você estiver tentando exibir ou processar seu DataFrame, mas não estiver obtendo resultados, verá o seguinte na interface do usuário.

    Nenhuma mensagem de resultados

    Essa mensagem significa que a autenticação foi bem-sucedida, mas o EventHubs não retornou nenhum dado. Alguns motivos possíveis (embora não sejam exaustivos) são:

    • Você especificou o tópico incorreto do Hubs de Eventos.
    • A opção de configuração padrão do Kafka para startingOffsets é latest e você ainda não está recebendo dados por meio do tópico. Você pode definir startingOffsets para earliest a partir dos primeiros deslocamentos do Kafka para ler dados.