Orientações de otimização do desempenho para o Storm no HDInsight e Azure Data Lake Storage Gen1

Compreenda os fatores que devem ser considerados quando otimiza o desempenho de uma topologia do Azure Storm. Por exemplo, é importante compreender as características do trabalho realizado pelos spouts e pelos bolts (se o trabalho é de E/S ou de memória intensiva). Este artigo abrange uma série de diretrizes de otimização do desempenho, incluindo a resolução de problemas comuns.

Pré-requisitos

Otimizar o paralelismo da topologia

Poderá conseguir melhorar o desempenho ao aumentar a simultaneidade da E/S de e para Data Lake Storage Gen1. Uma topologia storm tem um conjunto de configurações que determinam o paralelismo:

  • Número de processos de trabalho (os trabalhadores são distribuídos uniformemente pelas VMs).
  • Número de instâncias do executor spout.
  • Número de instâncias do executor de bolt.
  • Número de tarefas de spout.
  • Número de tarefas bolt.

Por exemplo, num cluster com 4 VMs e 4 processos de trabalho, 32 executores de spout e 32 tarefas de spout e 256 executores de bolt e 512 tarefas de bolt, considere o seguinte:

Cada supervisor, que é um nó de trabalho, tem um processo de máquina virtual Java (JVM) de trabalho único. Este processo JVM gere 4 threads spout e 64 threads de bolt. Dentro de cada thread, as tarefas são executadas sequencialmente. Com a configuração anterior, cada thread spout tem uma tarefa e cada thread de bolt tem duas tarefas.

No Storm, eis os vários componentes envolvidos e como afetam o nível de paralelismo que tem:

  • O nó principal (denominado Nimbus no Storm) é utilizado para submeter e gerir tarefas. Estes nós não têm qualquer impacto no grau de paralelismo.
  • Os nós de supervisor. No HDInsight, isto corresponde a uma VM do Azure do nó de trabalho.
  • As tarefas de trabalho são processos storm em execução nas VMs. Cada tarefa de trabalho corresponde a uma instância JVM. O Storm distribui o número de processos de trabalho que especificar aos nós de trabalho da forma mais uniforme possível.
  • Instâncias do executor Spout e bolt. Cada instância do executor corresponde a um thread em execução nos trabalhos (JVMs).
  • Tarefas storm. Estas são tarefas lógicas que cada um destes threads executa. Isto não altera o nível de paralelismo, pelo que deve avaliar se precisa ou não de várias tarefas por executor.

Obter o melhor desempenho do Data Lake Storage Gen1

Ao trabalhar com Data Lake Storage Gen1, obtém o melhor desempenho se fizer o seguinte:

  • Anexe os seus pequenos acréscimos em tamanhos maiores (idealmente 4 MB).
  • Faça o máximo de pedidos simultâneos que puder. Uma vez que cada thread de bolt está a fazer leituras de bloqueio, quer ter algures no intervalo de 8 a 12 threads por núcleo. Isto mantém a NIC e a CPU bem utilizadas. Uma VM maior permite pedidos mais simultâneos.

Topologia de exemplo

Vamos supor que tem um cluster de oito nós de trabalho com uma VM do Azure D13v2. Esta VM tem oito núcleos, pelo que, entre os oito nós de trabalho, tem 64 núcleos totais.

Digamos que fazemos oito threads por núcleo. Tendo em conta 64 núcleos, isso significa que queremos 512 instâncias totais do executor de bolt (ou seja, threads). Neste caso, digamos que começamos com um JVM por VM e utilizamos principalmente a simultaneidade do thread no JVM para alcançar a simultaneidade. Isto significa que precisamos de oito tarefas de trabalho (uma por VM do Azure) e 512 executores de bolt. Tendo em conta esta configuração, o Storm tenta distribuir os trabalhadores uniformemente pelos nós de trabalho (também conhecidos como nós de supervisor), dando a cada nó de trabalho um JVM. Agora, dentro dos supervisores, o Storm tenta distribuir os executores uniformemente entre supervisores, dando a cada supervisor (ou seja, JVM) oito threads cada.

Otimizar parâmetros adicionais

Depois de ter a topologia básica, pode considerar se pretende ajustar qualquer um dos parâmetros:

  • Número de JVMs por nó de trabalho. Se tiver uma estrutura de dados grande (por exemplo, uma tabela de referência) que aloja na memória, cada JVM necessita de uma cópia separada. Em alternativa, pode utilizar a estrutura de dados em muitos threads se tiver menos JVMs. Para a E/S do bolt, o número de JVMs não faz tanta diferença como o número de threads adicionados nessas JVMs. Para simplificar, é uma boa ideia ter um JVM por trabalho. Consoante o que o bolt está a fazer ou o processamento da aplicação de que necessita, poderá ter de alterar este número.
  • Número de executores spout. Uma vez que o exemplo anterior utiliza bolts para escrever no Data Lake Storage Gen1, o número de spouts não é diretamente relevante para o desempenho do bolt. No entanto, dependendo da quantidade de processamento ou E/S que ocorre no spout, é boa ideia otimizar os spouts para obter o melhor desempenho. Certifique-se de que tem spouts suficientes para conseguir manter os parafusos ocupados. As taxas de saída dos spouts devem corresponder ao débito dos bolts. A configuração real depende do spout.
  • Número de tarefas. Cada bolt é executado como um único thread. As tarefas adicionais por bolt não fornecem nenhuma simultaneidade adicional. A única altura em que são benéficos é se o processo de reconhecimento da cadeia de identificação demorar uma grande parte do tempo de execução do bolt. É uma boa ideia agrupar muitas cadeias de identificação num acréscimo maior antes de enviar uma confirmação a partir do bolt. Assim, na maioria dos casos, várias tarefas não proporcionam qualquer benefício adicional.
  • Agrupamento local ou aleatório. Quando esta definição está ativada, as cadeias de identificação são enviadas para os parafusos no mesmo processo de trabalho. Isto reduz a comunicação entre processos e chamadas de rede. Isto é recomendado para a maioria das topologias.

Este cenário básico é um bom ponto de partida. Teste com os seus próprios dados para ajustar os parâmetros anteriores para obter um desempenho ideal.

Otimizar o spout

Pode modificar as seguintes definições para otimizar o spout.

  • Tempo limite da cadeia de identificação: topology.message.timeout.secs. Esta definição determina a quantidade de tempo que uma mensagem demora a concluir e recebe a confirmação antes de ser considerada uma falha.

  • Memória máxima por processo de trabalho: worker.childopts. Esta definição permite-lhe especificar parâmetros de linha de comandos adicionais para os trabalhadores java. A definição mais utilizada aqui é XmX, que determina a memória máxima alocada à área de trabalho de uma JVM.

  • Máximo de spout pendente: topology.max.spout.pending. Esta definição determina o número de cadeias de identificação que podem ser piloto (ainda não reconhecidas em todos os nós na topologia) por thread spout em qualquer altura.

    Um bom cálculo a fazer é estimar o tamanho de cada uma das suas cadeias de identificação. Em seguida, descubra a quantidade de memória que um thread spout tem. A memória total alocada a um thread, dividida por este valor, deve dar-lhe o limite superior para o parâmetro máximo de spout pendente.

Ajustar o parafuso

Quando estiver a escrever no Data Lake Storage Gen1, defina uma política de sincronização de tamanho (memória intermédia do lado do cliente) para 4 MB. Uma descarga ou hsync() só é efetuada quando o tamanho da memória intermédia está neste valor. O controlador Data Lake Storage Gen1 na VM de trabalho faz automaticamente esta memória intermédia, a menos que execute explicitamente um hsync().

O Data Lake Storage Gen1 storm bolt predefinido tem um parâmetro de política de sincronização de tamanho (fileBufferSize) que pode ser utilizado para otimizar este parâmetro.

Nas topologias de E/S intensivas, recomendamos que cada thread de bolt escreva no seu próprio ficheiro e defina uma política de rotação de ficheiros (fileRotationSize). Quando o ficheiro atinge um determinado tamanho, o fluxo é automaticamente descarregado e é escrito um novo ficheiro. O tamanho de ficheiro recomendado para rotação é de 1 GB.

Processar dados de cadeia de identificação

No Storm, um spout mantém-se numa cadeia de identificação até ser explicitamente reconhecido pelo parafuso. Se uma cadeia de identificação tiver sido lida pelo bolt mas ainda não tiver sido reconhecida, o spout poderá não ter persistido na Data Lake Storage Gen1 back-end. Depois de uma cadeia de identificação ser reconhecida, o spout pode ser garantido persistência pelo bolt e, em seguida, pode eliminar os dados de origem de qualquer origem a partir da qual esteja a ler.

Para obter o melhor desempenho em Data Lake Storage Gen1, tenha a memória intermédia de 4 MB de dados de cadeia de identificação. Em seguida, escreva no back-end Data Lake Storage Gen1 como uma escrita de 4 MB. Depois de os dados terem sido escritos com êxito no arquivo (ao chamar hflush()), o bolt pode reconhecer os dados de volta ao spout. Isto é o que o bolt de exemplo fornecido aqui faz. Também é aceitável manter um maior número de cadeias de identificação antes da chamada hflush() ser feita e as cadeias de identificação reconhecidas. No entanto, isto aumenta o número de cadeias de identificação no voo que o spout precisa de manter e, portanto, aumenta a quantidade de memória necessária por JVM.

Nota

As aplicações podem ter um requisito para reconhecer as cadeias de identificação com mais frequência (com tamanhos de dados inferiores a 4 MB) por outros motivos de não desempenho. No entanto, isso pode afetar o débito de E/S no back-end de armazenamento. Pondere cuidadosamente esta contrapartida em relação ao desempenho de E/S do bolt.

Se a taxa de entrada de cadeias de identificação não for elevada, por isso a memória intermédia de 4 MB demora muito tempo a preencher, considere mitigar esta situação ao:

  • Reduzir o número de parafusos, para que haja menos memórias intermédias a preencher.
  • Ter uma política baseada no tempo ou baseada em contagem, em que um hflush() é acionado a cada x flushes ou cada milissegundos de y, e as cadeias de identificação acumuladas até agora são reconhecidas novamente.

O débito neste caso é menor, mas com uma taxa lenta de eventos, o débito máximo não é o maior objetivo. Estas mitigações ajudam-no a reduzir o tempo total necessário para que uma cadeia de identificação flua para o arquivo. Isto pode importar se quiser um pipeline em tempo real mesmo com uma taxa de eventos baixa. Tenha também em atenção que, se a sua taxa de cadeia de identificação de entrada for baixa, deve ajustar o parâmetro topology.message.timeout_secs, para que as cadeias de identificação não excedam o tempo limite enquanto estão a ser processadas ou em memória intermédia.

Monitorizar a topologia no Storm

Enquanto a topologia estiver em execução, pode monitorizá-la na interface de utilizador do Storm. Eis os principais parâmetros a analisar:

  • Latência total da execução do processo. Este é o tempo médio que uma cadeia de identificação demora a ser emitida pelo spout, processada pelo bolt e reconhecida.

  • Latência total do processo de bolt. Este é o tempo médio gasto pela cadeia de identificação no bolt até receber uma confirmação.

  • Latência total da execução do bolt. Este é o tempo médio gasto pelo bolt no método de execução.

  • Número de falhas. Isto refere-se ao número de cadeias de identificação que não foram totalmente processadas antes de excederem o tempo limite.

  • Capacidade. Esta é uma medida do quão ocupado está o seu sistema. Se este número for 1, os seus parafusos estão a funcionar o mais rápido possível. Se for inferior a 1, aumente o paralelismo. Se for maior que 1, reduza o paralelismo.

Resolver problemas comuns

Eis alguns cenários comuns de resolução de problemas.

  • Muitas cadeias de identificação estão a exceder o tempo limite. Observe cada nó na topologia para determinar onde está o estrangulamento. A razão mais comum para tal é que os bolts não conseguem acompanhar os spouts. Isto leva a que as cadeias de identificação entusiasmem as memórias intermédias internas enquanto aguardam para serem processadas. Considere aumentar o valor de tempo limite ou diminuir o spout máximo pendente.

  • Existe uma latência de execução total elevada do processo, mas uma latência de processo de bolt baixa. Neste caso, é possível que as cadeias de identificação não estejam a ser reconhecidas suficientemente depressa. Verifique se existe um número suficiente de reconhecedores. Outra possibilidade é que estejam à espera na fila demasiado tempo antes de os bolts começarem a processá-los. Diminua o spout máximo pendente.

  • Existe uma latência de execução de parafusos elevados. Isto significa que o método execute() do seu bolt está a demorar demasiado tempo. Otimize o código ou observe os tamanhos de escrita e o comportamento de descarga.

limitação de Data Lake Storage Gen1

Se atingir os limites de largura de banda fornecidos pelo Data Lake Storage Gen1, poderá ver falhas nas tarefas. Verifique se existem erros de limitação nos registos de tarefas. Pode diminuir o paralelismo ao aumentar o tamanho do contentor.

Para verificar se está a ser limitado, ative o registo de depuração no lado do cliente:

  1. No Ambari>Storm>Config>Advanced storm-worker-log4j, altere <root level="info"> para <root level="debug">. Reinicie todos os nós/serviço para que a configuração entre em vigor.
  2. Monitorize os registos de topologia storm em nós de trabalho (em /var/log/storm/worker-artifacts/<TopologyName>/<port>/worker.log) para Data Lake Storage Gen1 exceções de limitação.

Passos seguintes

Neste blogue, pode referenciar uma otimização de desempenho adicional para o Storm.

Para obter um exemplo adicional para executar, veja este no GitHub.