Padrão de Pipes e Filtros

Armazenamento do Blobs do Azure
Funções do Azure
Armazenamento de Filas do Azure

Decompor uma tarefa que executa processamento complexo em uma série de elementos separados que podem ser reutilizados. Isso pode melhorar o desempenho, escalabilidade e reutilização, permitindo que os elementos de tarefa que executam o processamento sejam implantados e escalados de forma independente.

Contexto e problema

Você tem um pipeline de tarefas sequenciais que precisa processar. Uma abordagem simples, mas inflexível, para implementar esse aplicativo é realizar esse processamento em um módulo monolítico. No entanto, essa abordagem provavelmente reduzirá as oportunidades para refatorar o código, otimizá-lo ou reutilizá-lo se partes do mesmo processamento forem necessárias em outros lugares no aplicativo.

O diagrama a seguir ilustra um dos problemas com o processamento de dados usando uma abordagem monolítica, a incapacidade de reutilizar códigos em vários pipelines. Neste exemplo, um aplicativo recebe e processa dados de duas fontes. Um módulo separado processa os dados de cada fonte realizando uma série de tarefas para transformar os dados antes de passar o resultado para a lógica de negócios do aplicativo.

Diagrama que mostra uma solução implementada com módulos monolíticos.

Algumas das tarefas que os módulos monolíticos realizam são funcionalmente semelhantes, mas o código tem que ser repetido em ambos os módulos e provavelmente está rigorosamente integrado ao seu módulo. Além da incapacidade de reutilizar a lógica, essa abordagem introduz um risco quando os requisitos mudam. Lembre-se de atualizar o código em ambos os locais.

Há outros desafios com uma implementação monolítica não relacionada a vários pipelines ou à reutilização. Com um monólito, você não pode realizar tarefas específicas em diferentes ambientes nem dimensioná-las de forma independente. Algumas tarefas podem exigir muita computação e se beneficiariam da execução em hardware potente ou da execução de várias instâncias em paralelo. Outras tarefas podem não ter os mesmos requisitos. Além disso, com os monólitos, é desafiador reordenar tarefas ou injetar novas tarefas no pipeline. Essas alterações exigem novos testes de todo o pipeline.

Solução

Dividir o processamento necessário para cada stream em um conjunto de componentes separados (ou filtros), cada um executando uma única tarefa. Os filtros são compostos em pipelines, conectando os filtros com pipes. Os filtros recebem mensagens de um pipe de entrada e publicam mensagens em outro pipe de saída. Os pipes não executam roteamento nem qualquer outra lógica. Eles apenas conectam filtros, passando a mensagem de saída de um filtro como a entrada para o próximo.

Os filtros agem de forma independente e desconhecem outros filtros. Eles só conhecem seus esquemas de entrada e saída. Como tal, os filtros podem ser organizados em qualquer ordem, desde que o esquema de entrada de qualquer filtro corresponda ao esquema de saída do filtro anterior. O uso de um esquema padronizado para todos os filtros aumenta a capacidade de reordenar filtros.

O acoplamento solto de filtros facilita:

  • Criar novos pipelines compostos por filtros existentes
  • Atualizar ou substituir a lógica em filtros individuais
  • Reordenar filtros, quando necessário
  • Executar filtros em hardwares diferentes, quando necessário
  • Executar filtros em paralelo

Este diagrama mostra uma solução implementada com pipes e filtros:

O diagrama que mostra uma solução implementada com pipes e filtros.

O tempo necessário para processar uma solicitação única depende da velocidade dos filtros lentos no pipeline. Um ou mais filtros podem ser gargalos, especialmente se um número alto de solicitações aparecer em um stream de uma determinada fonte de dados. A capacidade de executar instâncias paralelas de filtros lentos permite que o sistema espalhe a carga e melhore a taxa de transferência.

A capacidade de executar filtros em diferentes instâncias de computação permite que eles sejam escalados de forma independente e aproveitem a elasticidade que muitos ambientes de nuvem fornecem. Um filtro que é computacionalmente intensivo pode ser executado em hardware de alto desempenho, enquanto outros filtros menos exigentes podem ser hospedados em hardware de mercadoria menos caro. Como os filtros não precisam estar no mesmo datacenter ou localização geográfica, habilitar cada elemento em um pipeline para ser executado em um ambiente próximo dos recursos necessários. Este diagrama mostra um exemplo aplicado ao pipeline para os dados da Fonte 1:

Diagrama que mostra um exemplo aplicado ao pipeline para os dados da Fonte 1.

Se a entrada e a saída de um filtro estiverem estruturadas como um stream, será possível executar o processamento para cada filtro em paralelo. O primeiro filtro no pipeline pode iniciar seu trabalho e emitir seus resultados, que são passados diretamente para o próximo filtro na sequência antes do primeiro filtro ter concluído seu trabalho.

O uso do padrão de Pipes e Filtros em conjunto com o padrão de Transação de Compensação é uma abordagem alternativa para implementar transações distribuídas. Você pode dividir uma transação distribuída em tarefas compensáveis e separáveis, cada uma das quais pode ser implementada por meio de um filtro que também implementa o padrão de Transação de Compensação. Você pode implementar os filtros em um pipeline como tarefas hospedadas separadas que são executadas próximas aos dados que eles mantêm.

Problemas e considerações

Considere os seguintes pontos ao decidir como implementar esse padrão:

  • Complexidade. A flexibilidade aumentada que esse padrão fornece também pode introduzir complexidade, especialmente se os filtros em um pipeline estiverem distribuídos em diferentes servidores.

  • Confiabilidade. Use uma infraestrutura que garanta que os dados que fluem entre filtros em um pipe não sejam perdidos.

  • Idempotência . Se um filtro em um pipeline falhar após receber uma mensagem e o trabalho for reagendado para outra instância do filtro, parte do trabalho talvez já tenha sido concluída. Se o trabalho atualizar algum aspecto do estado global (como informações armazenadas em um banco de dados), uma única atualização pode ser repetida. Um problema semelhante pode ocorrer quando um filtro falha após a postagem dos resultados para o próximo filtro, mas antes de indicar que seu trabalho foi concluído com êxito. Nesses casos, outra instância do filtro pode repetir esse trabalho, fazendo com que os mesmos resultados sejam postados duas vezes. Esse cenário pode resultar em filtros subsequentes no pipeline que processam os mesmos dados duas vezes. Portanto, os filtros em um pipeline devem ser projetados para serem idempotentes. Para mais informações, confira Padrões de Idempotência no blog de Jonathan Oliver.

  • Mensagens repetidas. Se um filtro em um pipeline falhar após a postagem de uma mensagem para o próximo estágio do pipeline, outra instância do filtro poderá ser executada, e ela postará uma cópia da mesma mensagem para o pipeline. Esse cenário pode fazer com que duas instâncias da mesma mensagem sejam passadas para o próximo filtro. Para evitar esse problema, o pipeline deverá detectar e eliminar mensagens duplicadas.

    Observação

    Se você estiver implementando o pipeline utilizando filas de mensagens (como as filas do Barramento de Serviço do Azure), a infraestrutura de enfileiramento de mensagens poderá fornecer detecção e remoção automática de mensagens duplicadas.

  • Contexto e estado. Em um pipeline, cada filtro essencialmente é executado em isolamento e não deve fazer qualquer suposição sobre como foi invocado. Portanto, cada filtro deve ter um contexto suficiente para realizar seu trabalho. Esse contexto pode incluir uma quantidade significativa de informações de estado. Se os filtros usarem estado externo, como dados em um banco de dados ou armazenamento externo, você deverá considerar o impacto no desempenho. Cada filtro precisa carregar, operar e manter esse estado, o que aumenta a sobrecarga nas soluções que carregam o estado externo uma única vez.

  • Tolerância de mensagens. Os filtros devem ser tolerantes aos dados na mensagem de entrada contra a quais não operam. Eles operam nos dados pertinentes a eles, ignoram outros dados e os transmitem inalterados na mensagem de saída.

  • Tratamento de erros: cada filtro deve determinar o que fazer em caso de erro não contínuo. O filtro deve determinar se falha no pipeline ou propaga a exceção.

Quando usar esse padrão

Use esse padrão quando:

  • O processamento exigido por um aplicativo pode ser facilmente dividido em um conjunto de etapas independentes.

  • As etapas de processamento executadas por um aplicativo têm requisitos de escalabilidade diferentes.

    Observação

    É possível agrupar filtros que devem escalar no mesmo processo. Para obter mais informações, consulte o padrão de Consolidação de Recursos de Computação.

  • Você precisa de flexibilidade para permitir a reordenação das etapas de processamento seguidas pelo aplicativo ou para permitir a capacidade de adicionar e remover etapas.

  • O sistema pode se beneficiar de distribuir o processamento para etapas em diferentes servidores.

  • É necessária uma solução confiável que minimize os efeitos da falha em uma etapa, enquanto os dados estão sendo processados.

Esse padrão pode não ser útil quando:

  • O aplicativo segue um padrão de solicitação-resposta.

  • O processamento da tarefa deve ser concluído como parte de uma solicitação inicial, como um cenário de solicitação/resposta.

  • As etapas de processamento executadas por um aplicativo não são independentes ou devem ser realizadas juntas como parte de uma única transação.

  • A quantidade de contexto ou informações de estado exigidos por uma etapa torna essa abordagem ineficiente. Você poderá persistir informações de estado em um banco de dados, mas não utilize essa estratégia se a carga extra no banco de dados causar contenção excessiva.

Design de carga de trabalho

Um arquiteto deve avaliar como o padrão de Pipes e filtros pode ser usado na criação de suas cargas de trabalho para abordar os objetivos e princípios contidos nos pilares da estrutura bem arquitetada do Azure. Por exemplo:

Pilar Como esse padrão apoia os objetivos do pilar
As decisões de design de confiabilidade ajudam sua carga de trabalho a se tornar resiliente ao mau funcionamento e a garantir que ela se recupere para um estado totalmente funcional após a ocorrência de uma falha. A responsabilidade única de cada estágio permite uma atenção focada e evita a distração do processamento de dados misturados.

- RE:01 Simplicidade
- RE:07 Trabalhos em segundo plano

Tal como acontece com qualquer decisão de design, considere quaisquer compensações em relação aos objetivos dos outros pilares que possam ser introduzidos com este padrão.

Exemplo

Você pode utilizar uma sequência de filas de mensagens para fornecer a infraestrutura necessária para implementar um pipeline. Uma fila de mensagens inicial recebe mensagens não processadas que se tornam o início da implementação do padrão de pipes e filtros. Um componente implementado como uma tarefa de filtro escuta uma mensagem nessa fila, executa seu trabalho e, em seguida, posta uma mensagem nova ou transformada para a próxima fila na sequência. Outra tarefa de filtro pode escutar mensagens nessa fila, processá-las, postar os resultados em outra fila e assim por diante, até a etapa final que encerra o processo de pipes e filtros. Este diagrama ilustra um pipeline que usa filas de mensagens:

Diagrama mostrando um pipeline que usa filas de mensagens.

Um pipeline de processamento de imagem poderia ser implementado usando esse padrão. Se sua carga de trabalho capturar uma imagem, ela poderá passar por uma série de filtros amplamente independentes e reordenáveis para executar ações como:

  • Moderação de conteúdo
  • redimensionando
  • marcas-d'água
  • reorientação
  • Remoção de metadados Exif
  • Publicação da rede de distribuição de conteúdo (CDN)

Nesse exemplo, os filtros podem ser implementados como funções do Azure implantados individualmente ou até mesmo como um único aplicativo de funções do Azure que contém cada filtro como uma implantação isolada. A utilização de gatilhos, associações de entrada e associações de saída de funções do Azure pode simplificar o código do filtro e funcionar automaticamente com um tubo baseado em fila utilizando uma verificação de declaração para a imagem a ser processada.

Diagrama mostrando um pipeline de processamento de imagem que usa o Armazenamento de Filas do Azure entre uma série de Funções do Azure.

Veja um exemplo da aparência de um filtro, implementado como uma função do Azure, acionado a partir de um pipe de armazenamento de fila com uma verificação de declaração na imagem e a gravação de uma nova verificação de declaração em outro pipe de armazenamento de fila. Substituímos a implementação por pseudocódigo nos comentários por questões de brevidade. Mais códigos como esse podem ser encontrados na demonstração do padrão de Pipes e Filtros disponível no GitHub.

// This is the "Resize" filter. It handles claim checks from input pipe, performs the
// resize work, and places a claim check in the next pipe for anther filter to handle.
[Function(nameof(ResizeFilter))]
[QueueOutput("pipe-fjur", Connection = "pipe")]  // Destination pipe claim check
public async Task<string> RunAsync(
  [QueueTrigger("pipe-xfty", Connection = "pipe")] string imageFilePath,  // Source pipe claim check
  [BlobInput("{QueueTrigger}", Connection = "pipe")] BlockBlobClient imageBlob)  // Image to process
{
  _logger.LogInformation("Processing image {uri} for resizing.", imageBlob.Uri);

  // Idempotency checks
  // ...

  // Download image based on claim check in queue message body
  // ...
  
  // Resize the image
  // ...

  // Write resized image back to storage
  // ...

  // Create claim check for image and place in the next pipe
  // ...
  
  _logger.LogInformation("Image resizing done or not needed. Adding image {filePath} into the next pipe.", imageFilePath);
  return imageFilePath;
}

Observação

A Estrutura de Integração do Spring possui uma implementação do padrão de pipes e filtros.

Próximas etapas

Você pode achar os seguintes recursos úteis ao implementar esse padrão:

Os seguintes padrões também serão relevantes durante a implementação desse padrão:

  • Padrão de Verificação de Declarações. Um pipeline implementado usando uma fila talvez não contenha o item real que está sendo enviado através dos filtros, mas sim um ponteiro para os dados que precisam ser processados. O exemplo utiliza uma verificação de declaração no Armazenamento de Filas do Azure para imagens armazenadas no Armazenamento do Blobs do Azure.
  • Padrão de consumidores concorrentes. Um pipeline pode conter várias instâncias de um ou mais filtros. Essa abordagem é útil para executar instâncias paralelas de filtros lentos. Ela permite que o sistema distribua a carga e melhore o rendimento. Cada instância de um filtro competirá por entrada com as outras instâncias, mas duas instâncias de um filtro não poderão processar os mesmos dados. Este artigo explica a abordagem.
  • Padrão de consolidação de recursos de computação. Pode ser possível agrupar filtros que devem ser escalados em conjunto em um único processo. Este artigo fornece mais informações sobre os benefícios e compensações dessa estratégia.
  • Padrão de Transação de Compensação. Você pode implementar um filtro como uma operação que pode ser revertida ou que possui uma operação de compensação que restaura o estado para uma versão anterior se houver uma falha. Este artigo explica como você pode implementar esse padrão para manter ou alcançar consistência eventual.
  • Pipes e filtros: padrões de Enterprise Integration.