Compartilhar via


Otimização de desempenho para clusters do Apache Kafka HDInsight

Este artigo fornece algumas sugestões para otimizar o desempenho de suas cargas de trabalho do Apache Kafka no HDInsight. O foco é ajustar a configuração do produtor, do agente e do consumidor. Às vezes, talvez também seja necessário ajustar as configurações do sistema operacional para adaptar o desempenho à carga de trabalho pesada. Há diferentes maneiras de medir o desempenho, e as otimizações que você aplica dependem de suas necessidades comerciais.

Visão geral da arquitetura

Os tópicos do Kafka são usados para organizar registros. Os produtores produzem registros e os consumidores os consomem. Os produtores enviam registros para os agentes Kafka, que armazenam os dados. Cada nó de trabalho no cluster HDInsight é um agente do Kafka.

Registros de partição de tópicos entre agentes. Ao consumir registros, você pode usar um consumidor por partição a fim de alcançar o processamento paralelo dos dados.

A replicação é usada para duplicar partições entre nós. Essa partição protege contra interrupções de nó (agente). Uma única partição entre o grupo de réplicas é designada como líder de partição. O produtor tráfego é roteado para o preenchimento de cada nó, usando o estado gerenciado pelo ZooKeeper.

Identificar seu cenário

O desempenho do Apache Kafka tem dois aspectos principais – taxa de transferência e latência. A taxa de transferência máxima em que os dados podem ser processados. Uma taxa de transferência mais alta é melhor. Latência é o tempo que leva para que os dados sejam armazenados ou recuperados. Uma latência mais baixa é melhor. Encontrar o equilíbrio certo entre a taxa de transferência, a latência e o custo da infraestrutura do aplicativo pode ser desafiador. Seus requisitos de desempenho devem corresponder a uma das três situações comuns a seguir, com base em se você precisa de alta taxa de transferência, baixa latência ou ambas:

  • Alta taxa de transferência, baixa latência. Esse cenário requer alta taxa de transferência e baixa latência (~100 milissegundos). Um exemplo desse tipo de aplicativo é o monitoramento de disponibilidade do serviço.
  • Alta taxa de transferência, alta latência. Esse cenário requer alta taxa de transferência (~1,5 GBps), mas pode tolerar latência mais alta (< 250 ms). Um exemplo desse tipo de aplicativo é a ingestão de dados de telemetria para processos quase em tempo real, como aplicativos de detecção de intrusão e segurança.
  • Baixa taxa de transferência, baixa latência. Esse cenário requer baixa latência (< 10 ms) para processamento em tempo real, mas pode tolerar uma taxa de transferência menor. Um exemplo desse tipo de aplicativo são as verificações de ortografia e gramática online.

Configuração do produtor

As seções a seguir destacam algumas das propriedades de configuração genérica mais importantes para otimizar o desempenho dos produtores do Kafka. Para uma explicação detalhada de todas as propriedades de configuração, consulte Documentação do Apache Kafka sobre configurações de produtor.

Tamanho do lote

Os produtores do Apache Kafka reúnem grupos de mensagens (chamados de lotes) que são enviados como uma unidade a ser armazenada em uma única partição de armazenamento. Tamanho do lote significa o número de bytes que devem estar presentes antes que esse grupo seja transmitido. Aumentar o parâmetro batch.size pode aumentar a taxa de transferência, pois reduz a sobrecarga de processamento de solicitações de E/S e rede. Sob carga leve, o aumento do tamanho do lote pode aumentar a latência de envio do Kafka à medida que o produtor aguarda que um lote esteja pronto. Sob carga pesada, é recomendável aumentar o tamanho do lote para melhorar a taxa de transferência e a latência.

Confirmações necessárias do produtor

A configuração acks exigida pelo produtor determina o número de confirmações exigidas pelo líder de partição antes que uma solicitação de gravação seja considerada concluída. Essa configuração afeta a confiabilidade dos dados e aceita valores 0, 1 ou -1. O valor de -1 significa que uma confirmação deve ser recebida de todas as réplicas antes que a gravação seja concluída. A acks = -1 configuração fornece garantias mais fortes contra perda de dados, mas também resulta em maior latência e menor taxa de transferência. Se os requisitos do aplicativo exigirem maior taxa de transferência, tente configurar acks = 0 ou acks = 1. Tenha em mente que não confirmar todas as réplicas pode reduzir a confiabilidade dos dados.

Compactação

Um produtor do Kafka pode ser configurado para compactar mensagens antes de enviá-las aos agentes. A configuração compression.type especifica o codec de compactação a ser usado. Os codecs de compactação com suporte são "gzip", "snappy" e "lz4". A compactação é benéfica e deve ser considerada se houver uma limitação na capacidade do disco.

Entre os dois codecs de compactação comumente usados gzip e snappy, O gzip tem uma taxa maior de compactação, o que resulta em menor uso do disco no custo de maior carga da CPU. O codec snappy fornece menos compactação com menos sobrecarga de CPU. Você pode decidir qual codec usar com base nas limitações de disco do agente ou da CPU do produtor. O gzip pode compactar dados a uma taxa cinco vezes maior que o snappy.

A compactação de dados aumenta o número de registros que podem ser armazenados em um disco. Ele também pode aumentar a sobrecarga da CPU em casos de incompatibilidade entre os formatos de compactação usados pelo produtor e o agente. pois os dados devem ser compactados antes do envio e depois descompactados antes do processamento.

Configurações do agente

As seções a seguir destacam algumas das configurações mais importantes para otimizar o desempenho de seus agentes do Kafka. Para uma explicação detalhada de todas as configurações do agente, consulte Documentação do Apache Kafka sobre configurações de agente.

Número de discos

Os discos de armazenamento têm IOPS limitado (operações de entrada/saída por segundo) e bytes de leitura/gravação por segundo. Ao criar novas partições, o Kafka armazena cada nova partição no disco com menos partições existentes para balanceá-las nos discos disponíveis. Apesar da estratégia de armazenamento, ao processar centenas de réplicas de partição em cada disco, o Kafka pode, facilmente, saturar a taxa de transferência de disco disponível. A compensação nesse caso é entre a taxa de transferência e o custo. Se o aplicativo exigir maior taxa de transferência, crie um cluster com mais discos gerenciados por agente. Atualmente, o HDInsight não dá suporte à adição de discos gerenciados a um cluster em execução. Para obter mais informações sobre como configurar o número de discos gerenciados, consulte Configurar o armazenamento e a escalabilidade do Apache Kafka no HDInsight. Entenda as implicações de custo do aumento do espaço de armazenamento para os nós no cluster.

Número de tópicos e partições por tópico

Os produtores do Kafka gravam para tópicos. Os consumidores do Kafka leem a partir de tópicos. Um tópico é associado a um log, que é uma estrutura de dados em disco. O Kafka acrescenta registros de um produtor ao final de um log de tópico. Um log de tópico consiste em várias partições que são distribuídas por vários arquivos. Esses arquivos são, por sua vez, espalhados por vários nós de cluster Kafka. Os consumidores leem os tópicos do Kafka em sua cadência e podem escolher sua posição (deslocamento) no log do tópico.

Cada partição do Kafka é um arquivo de log no sistema e os threads do produtor podem gravar em vários logs simultaneamente. Da mesma forma, como cada thread de consumidor lê mensagens de uma partição, o consumo de várias partições também é tratado em paralelo.

Aumentar a densidade da partição (o número de partições por agente) adiciona uma sobrecarga relacionada às operações de metadados e a solicitação/resposta de partição entre o líder de partição e seus seguidores. Mesmo na ausência de fluxo de dados, as réplicas de partição ainda buscam dados de líderes, o que resulta em processamento extra para solicitações de envio e recebimento pela rede.

Para clusters do Apache Kafka 2.1 e 2.4 e, conforme observado anteriormente no HDInsight, recomendamos que você tenha no máximo 2.000 partições por agente, incluindo réplicas. O aumento do número de partições por agente diminui a taxa de transferência e também pode causar indisponibilidade de tópico. Para obter mais informações sobre o suporte de partição do Kafka, consulte a postagem oficial no blog do Apache Kafka sobre o aumento no número de partições com suporte na versão 1.1.0. Para obter detalhes sobre como modificar tópicos, consulte os Apache Kafka: modificar tópicos.

Número de réplicas

Um fator de replicação mais alto resulta em solicitações adicionais entre o líder de partição e os seguidores. Consequentemente, um fator de replicação maior consome mais disco e CPU para processar solicitações adicionais, aumentando a latência de gravação e diminuindo a taxa de transferência.

Recomendamos o uso da replicação pelo menos 3x para o Kafka no Azure HDInsight. A maioria das regiões do Azure tem três domínios de falha, mas em regiões com apenas dois domínios de falha, os usuários devem usar a replicação de 4x.

Para obter mais informações sobre replicação, consulte Apache Kafka: replicação e Apache Kafka: aumento do fator de replicação.

Configurações do consumidor

A seção a seguir destaca algumas configurações genéricas importantes para otimizar o desempenho dos consumidores do Kafka. Para obter uma explicação detalhada de todas as configurações, confira a documentação do Apache Kafka sobre configurações do consumidor.

Número de consumidores

O ideal é ter o número de partições igual ao número de consumidores. Se o número de consumidores for menor que o número de partições, alguns consumidores lerão de várias partições, aumentando a latência do consumidor.

Se o número de consumidores for maior que o número de partições, você estará desperdiçando os recursos do consumidor, pois esses consumidores estão ociosos.

Evitar o rebalanceamento frequente do consumidor

O rebalanceamento do consumidor é disparado pela alteração de propriedade da partição (ou seja, há uma escalada horizontal ou uma redução vertical dos consumidores), por uma falha do agente (já que os agentes são coordenadores de grupos de consumidores), por uma falha do consumidor, pela adição de um novo tópico ou pela adição de novas partições. Durante o rebalanceamento, os consumidores não podem consumir, aumentando a latência.

Os consumidores são considerados ativos quando podem enviar uma pulsação para um agente em session.timeout.ms. Caso contrário, o consumidor será considerado inativo ou com falha. Esse atraso leva a um reequilíbrio do consumidor. Quanto mais baixo o consumidor session.timeout.ms, mais rápido podemos detectar essas falhas.

Se session.timeout.ms for muito baixo, um consumidor poderá experimentar rebalanceamentos desnecessários repetidos causado por cenários como: quando um lote de mensagens leva mais tempo para ser processado ou quando uma pausa de GC de JVM leva muito tempo. Se você tiver um consumidor que gasta muito tempo processando mensagens, poderá resolver isso aumentando o limite superior de tempo que um consumidor pode ficar ocioso antes de efetuar fetch de mais registros com max.poll.interval.ms ou reduzindo o tamanho máximo de lotes retornados com o parâmetro de configuração max.poll.records.

Separação em lotes

Podemos adicionar envio em lote para consumidores, assim como fazemos com os produtores. A quantidade de dados que os consumidores podem obter em cada solicitação de fetch pode ser configurada alterando a configuração fetch.min.bytes. Esse parâmetro define os bytes mínimos esperados de uma resposta de fetch de um consumidor. Aumentar esse valor reduz o número de solicitações de busca feitas ao agente, reduzindo, portanto, a sobrecarga extra. Por padrão, esse valor é 1. Da mesma forma, há outra configuração fetch.max.wait.ms. Se uma solicitação de busca não tiver mensagens suficientes de acordo com o tamanho de fetch.min.bytes, ela aguardará até a expiração do tempo de espera com base nessa configuração fetch.max.wait.ms.

Observação

Em alguns cenários, os consumidores poderão parecer lentos quando não conseguirem processar a mensagem. Se você não estiver fazendo commit do deslocamento após uma exceção, o consumidor ficará preso em um loop infinito de um deslocamento específico e não avançará, aumentando o retardo no lado do consumidor como resultado.

Ajuste do sistema operacional Linux com carga de trabalho pesada

Mapas de memória

vm.max_map_count define o número máximo de mmap que um processo pode ter. Por padrão, na VM linux de cluster HDInsight Apache Kafka, o valor é 65535.

No Apache Kafka, cada segmento de log requer um par de arquivos index/timeindex e cada um desses arquivos consome um mmap. Em outras palavras, cada segmento de log usa dois mmap. Portanto, se cada partição hospedar um único segmento de log, ela exigirá, no mínimo, dois mmap. O número de segmentos de log por partição varia conforme o tamanho do segmento, a intensidade da carga, a política de retenção, o período sem interrupção e, geralmente, tende a ser maior que um. Mmap value = 2*((partition size)/(segment size))*(partitions)

Se o valor exigido por mmap exceder vm.max_map_count, o agente gerará a exceção "Map failed".

Para evitar essa exceção, use os comandos abaixo para verificar o tamanho do mmap na vm e aumentá-lo, se necessário, em cada nó de trabalho.

# command to find number of index files:
find . -name '*index' | wc -l

# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l

# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>

# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p

Observação

Procure não definir esse tamanho muito alto, pois ocupa memória na VM. A quantidade de memória que pode ser usada pela JVM nos mapas de memória é determinada pela configuração MaxDirectMemory. O valor padrão é 64 MB. É possível que ele seja alcançado. Você pode aumentar esse valor adicionando -XX:MaxDirectMemorySize=amount of memory used às configurações de JVM por meio do Ambari. Verifique a quantidade de memória usada no nó e se há RAM disponível para apoiá-la.

Próximas etapas