Processamento de fluxos com o Azure Stream Analytics

Cosmos DB
Hubs de Eventos
Monitor
Stream Analytics

Esta arquitetura de referência mostra um gasoduto de processamento de fluxo de ponta a ponta. O gasoduto ingere dados de duas fontes, correlaciona os registos nos dois fluxos, e calcula uma média de rolamento através de uma janela de tempo. Os resultados são armazenados para análise posterior.

Logotipo do GitHub Uma implementação de referência para esta arquitetura está disponível no GitHub.

Arquitetura

Diagrama mostrando arquitetura de referência para criar um pipeline de processamento de fluxo com Azure Stream Analytics.

Transfira um ficheiro do Visio desta arquitetura.

Fluxo de trabalho

A arquitetura é composta pelos seguintes componentes:

Origens de dados. Nesta arquitetura, existem duas fontes de dados que geram fluxos de dados em tempo real. O primeiro fluxo contém informações sobre o passeio, e o segundo contém informações sobre tarifas. A arquitetura de referência inclui um gerador de dados simulado que lê a partir de um conjunto de ficheiros estáticos e empurra os dados para os Centros de Eventos. Numa aplicação real, as fontes de dados seriam dispositivos instalados nos táxis.

Hubs de Eventos do Azure. O Event Hubs é um serviço de ingestão de eventos. Esta arquitetura usa dois casos de centro de eventos, um para cada fonte de dados. Cada fonte de dados envia um fluxo de dados para o centro de eventos associado.

Azure Stream Analytics. Stream Analytics é um motor de processamento de eventos. Um trabalho do Stream Analytics lê os fluxos de dados dos dois centros de eventos e executa o processamento de fluxo.

Azure Cosmos DB. A saída do trabalho stream Analytics é uma série de registos, que são escritos como documentos JSON para uma base de dados de documentos Azure Cosmos DB.

Microsoft Power BI. Power BI é um conjunto de ferramentas de análise de negócios para analisar dados para insights de negócios. Nesta arquitetura, carrega os dados da Azure Cosmos DB. Isto permite que os utilizadores analisem o conjunto completo de dados históricos que foram recolhidos. Também pode transmitir os resultados diretamente do Stream Analytics para Power BI para uma visão em tempo real dos dados. Para obter mais informações, veja Transmissão em tempo real no Power BI.

Azure Monitor. O Azure Monitor recolhe métricas de desempenho sobre os serviços Azure implantados na solução. Ao visualizar estes num dashboard, você pode obter insights sobre a saúde da solução.

Detalhes do cenário

Cenário: Uma empresa de táxis recolhe dados sobre cada viagem de táxi. Para este cenário, assumimos que existem dois dispositivos separados que enviam dados. O táxi tem um medidor que envia informações sobre cada passeio - a duração, distância e locais de recolha e entrega. Um dispositivo separado aceita pagamentos de clientes e envia dados sobre tarifas. A empresa de táxis quer calcular a gorjeta média por milha percorrido, em tempo real, de modo a detetar tendências.

Potenciais casos de utilização

Esta solução está otimizada para o cenário de retalho.

Ingestão de dados

Para simular uma fonte de dados, esta arquitetura de referência utiliza o conjunto de dados de táxi da cidade de Nova Iorque[1]. Este conjunto de dados contém dados sobre viagens de táxi em Nova Iorque durante um período de quatro anos (2010-2013). Contém dois tipos de registo: dados de passeio e dados de tarifas. Os dados do passeio incluem a duração da viagem, a distância da viagem e o local de recolha e entrega. Os dados das tarifas incluem tarifas, impostos e valores de gorjeta. Os campos comuns em ambos os tipos de registo incluem o número de medalhão, a licença de hack e a identificação do fornecedor. Juntos, estes três campos identificam exclusivamente um táxi mais um motorista. Os dados são armazenados em formato CSV.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Universidade de Illinois em Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

O gerador de dados é uma aplicação .NET Core que lê os registos e os envia para Hubs de Eventos do Azure. O gerador envia dados de viagem no formato JSON e dados de tarifas em formato CSV.

O Event Hubs utiliza divisórias para segmentar os dados. As divisórias permitem ao consumidor ler cada divisória em paralelo. Quando envia dados para Os Centros de Eventos, pode especificar explicitamente a chave de partição. Caso contrário, os registos são atribuídos a divisórias em forma de rodapé.

Neste cenário específico, os dados do passeio e os dados das tarifas devem acabar com a mesma identificação de partição para um determinado táxi. Isto permite que o Stream Analytics aplique um certo paralelismo quando correlaciona os dois fluxos. Um recorde na partição n dos dados do passeio corresponderá a um recorde na partição n dos dados da tarifa.

Diagrama de processamento de fluxo com Azure Stream Analytics e Centros de Eventos

No gerador de dados, o modelo comum de dados para ambos os tipos de registos tem uma PartitionKey propriedade que é a concatenação de Medallion, HackLicensee VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Esta propriedade é usada para fornecer uma chave de partição explícita ao enviar para Os Centros de Eventos:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Processamento de fluxos

O trabalho de processamento de fluxo é definido usando uma consulta SQL com vários passos distintos. Os dois primeiros passos simplesmente selecionam registos dos dois fluxos de entrada.

WITH
Step1 AS (
    SELECT PartitionId,
           TRY_CAST(Medallion AS nvarchar(max)) AS Medallion,
           TRY_CAST(HackLicense AS nvarchar(max)) AS HackLicense,
           VendorId,
           TRY_CAST(PickupTime AS datetime) AS PickupTime,
           TripDistanceInMiles
    FROM [TaxiRide] PARTITION BY PartitionId
),
Step2 AS (
    SELECT PartitionId,
           medallion AS Medallion,
           hack_license AS HackLicense,
           vendor_id AS VendorId,
           TRY_CAST(pickup_datetime AS datetime) AS PickupTime,
           tip_amount AS TipAmount
    FROM [TaxiFare] PARTITION BY PartitionId
),

O passo seguinte junta-se aos dois fluxos de entrada para selecionar registos correspondentes de cada fluxo.

Step3 AS (
  SELECT tr.TripDistanceInMiles,
         tf.TipAmount
    FROM [Step1] tr
    PARTITION BY PartitionId
    JOIN [Step2] tf PARTITION BY PartitionId
      ON tr.PartitionId = tf.PartitionId
     AND tr.PickupTime = tf.PickupTime
     AND DATEDIFF(minute, tr, tf) BETWEEN 0 AND 15
)

Esta consulta junta-se a registos num conjunto de campos que identificam exclusivamente registos correspondentes (PartitionId e PickupTime).

Nota

Queremos que os TaxiRide e TaxiFare riachos se juntem à combinação única deMedallion, HackLicenseVendorId e PickupTime. Neste caso, HackLicense as PartitionId coberturas e VendorId os Medallioncampos, mas isso não deve ser tomado como o caso em geral.

No Stream Analytics, as juntas são temporais, o que significa que os registos são unidos dentro de uma determinada janela de tempo. Caso contrário, o trabalho pode ter de esperar indefinidamente por uma correspondência. A função DATEDIFF especifica até que ponto dois registos correspondentes podem ser separados a tempo de uma partida.

O último passo no trabalho calcula a inclinação média por milha, agrupada por uma janela de 5 minutos.

SELECT System.Timestamp AS WindowTime,
       SUM(tr.TipAmount) / SUM(tr.TripDistanceInMiles) AS AverageTipPerMile
  INTO [TaxiDrain]
  FROM [Step3] tr
  GROUP BY HoppingWindow(Duration(minute, 5), Hop(minute, 1))

Stream Analytics fornece várias funções de janela. Uma janela de salto avança no tempo por um período fixo, neste caso 1 minuto por salto. O resultado é calcular uma média móvel nos últimos 5 minutos.

Na arquitetura aqui mostrada, apenas os resultados do trabalho stream Analytics são guardados para Azure Cosmos DB. Para um grande cenário de dados, considere também usar o Event Hubs Capture para guardar os dados brutos do evento no armazenamento de Azure Blob. A manutenção dos dados brutos permitir-lhe-á executar consultas sobre os seus dados históricos mais tarde, de forma a obter novos conhecimentos a partir dos dados.

Considerações

Estas considerações implementam os pilares do Quadro Azure Well-Architected, que é um conjunto de princípios orientadores que podem ser utilizados para melhorar a qualidade de uma carga de trabalho. Para mais informações, consulte o Microsoft Azure Well-Architected Framework.

Escalabilidade

Hubs de Eventos

A capacidade de produção dos Centros de Eventos é medida em unidades de produção. Pode autoescalar um hub de eventos ativando a auto-insuflada, que escala automaticamente as unidades de produção com base no tráfego, até um máximo configurado.

Stream Analytics

Para o Stream Analytics, os recursos de computação atribuídos a um trabalho são medidos em Unidades de Streaming. Os trabalhos stream Analytics são melhores se o trabalho puder ser paralelo. Dessa forma, o Stream Analytics pode distribuir o trabalho por vários nós de computação.

Para a entrada de Centros de Eventos, utilize a PARTITION BY palavra-chave para dividir o trabalho Stream Analytics. Os dados serão divididos em subconjuntos com base nas divisórias Do Event Hubs.

As funções de janela e as juntas temporais requerem SU adicional. Quando possível, utilize PARTITION BY de modo a que cada divisória seja processada separadamente. Para obter mais informações, consulte Compreender e ajustar Unidades de Streaming.

Se não for possível paralelizar todo o trabalho stream Analytics, tente dividir o trabalho em vários passos, começando com um ou mais passos paralelos. Assim, os primeiros passos podem correr em paralelo. Por exemplo, nesta arquitetura de referência:

  • Os passos 1 e 2 são declarações simples SELECT que selecionam registos dentro de uma única partição.
  • O passo 3 executa uma junção dividida através de dois fluxos de entrada. Este passo tira partido do facto de os registos correspondentes partilharem a mesma chave de partição, pelo que é garantido ter o mesmo ID de partição em cada fluxo de entrada.
  • Passo 4 agrega-se em todas as divisórias. Este passo não pode ser paralelo.

Utilize o diagrama de trabalho stream Analytics para ver quantas divisórias são atribuídas a cada passo no trabalho. O diagrama a seguir mostra o diagrama de trabalho desta arquitetura de referência:

Diagrama mostrando trabalhos de Stream Analytics.

Azure Cosmos DB

A capacidade de produção para Azure Cosmos DB é medida em Unidades de Pedido (RU). Para escalar um recipiente DB Azure Cosmos passado 10.000 RU, deve especificar uma chave de partição quando criar o recipiente e incluir a chave de partição em cada documento.

Nesta arquitetura de referência, novos documentos são criados apenas uma vez por minuto (o intervalo da janela de salto), pelo que os requisitos de produção são bastante baixos. Por isso, não há necessidade de atribuir uma chave de partição neste cenário.

Monitorização

Com qualquer solução de processamento de fluxo, é importante monitorizar o desempenho e a saúde do sistema. O Azure Monitor recolhe métricas e registos de diagnóstico para os serviços Azure utilizados na arquitetura. O Azure Monitor está integrado na plataforma Azure e não requer nenhum código adicional na sua aplicação.

Qualquer um dos seguintes sinais de aviso indica que deve reduzir o recurso Azure relevante:

  • O Event Hubs acelera os pedidos ou está próximo da quota de mensagem diária.
  • O trabalho stream Analytics utiliza consistentemente mais de 80% das Unidades de Streaming (SU) atribuídas.
  • Azure Cosmos DB começa a acelerar os pedidos.

A arquitetura de referência inclui um dashboard personalizado, que é implantado para o portal do Azure. Depois de implementar a arquitetura, pode ver o painel de instrumentos abrindo o portal do Azure e selecionando a TaxiRidesDashboard partir da lista de dashboards. Para obter mais informações sobre a criação e implementação de dashboards personalizados no portal do Azure, consulte Programmaticamente criar Dashboards Azure.

A imagem que se segue mostra o painel de instrumentos depois do trabalho do Stream Analytics ter funcionado durante cerca de uma hora.

Screenshot do painel de táxi rides

O painel na parte inferior esquerda mostra que o consumo de SU para o trabalho stream Analytics sobe durante os primeiros 15 minutos e, em seguida, níveis fora. Este é um padrão típico à medida que o trabalho atinge um estado estável.

Note que os Centros de Eventos estão a estrangular os pedidos, mostrados no painel superior direito. Um pedido ocasional de aceleração não é um problema, porque o cliente do Event Hubs SDK automaticamente recauchutado quando recebe um erro de estrangulamento. No entanto, se vir erros de estrangulamento consistentes, significa que o centro de eventos precisa de mais unidades de produção. O gráfico seguinte mostra um teste executado utilizando a função de insuflação automática do Event Hubs, que escala automaticamente as unidades de produção conforme necessário.

Screenshot do Evento Hubs autoscaling.

A auto-insuflado foi ativada por volta das 06:35. Pode ver a queda p em pedidos acelerados, uma vez que os Event Hubs aumentaram automaticamente até 3 unidades de produção.

Curiosamente, isto teve o efeito colateral de aumentar a utilização de SU no trabalho stream Analytics. Ao acelerar, o Event Hubs estava a reduzir artificialmente a taxa de ingestão para o trabalho de Stream Analytics. É comum que a resolução de um estrangulamento de desempenho revele outra. Neste caso, a atribuição de SU adicional para o trabalho stream Analytics resolveu a questão.

Otimização de custos

A otimização de custos tem a ver com formas de reduzir despesas desnecessárias e melhorar a eficiência operacional. Para mais informações, consulte a visão geral do pilar de otimização de custos.

Utilize a calculadora de preços do Azure para prever os custos. Aqui estão algumas considerações para os serviços utilizados nesta arquitetura de referência.

Azure Stream Analytics

O Azure Stream Analytics tem o preço do número de unidades de streaming ($0,11/hora) necessárias para processar os dados no serviço.

O Stream Analytics pode ser caro se não estiver a processar os dados em tempo real ou em pequenas quantidades de dados. Para esses casos de utilização, considere usar Funções do Azure ou Aplicações Lógicas para mover dados de Hubs de Eventos do Azure para uma loja de dados.

DB Hubs de Eventos do Azure e Azure Cosmos

Para considerações de custos sobre Hubs de Eventos do Azure e Azure Cosmos DB, consulte considerações de custo ver o processamento do Stream com a arquitetura de referência Azure Databricks.

DevOps

  • Criar grupos de recursos separados para ambientes de produção, desenvolvimento e teste. A utilização de grupos de recursos separados torna mais fácil gerir as implementações, eliminar as implementações de teste e atribuir direitos de acesso.

  • Utilize o modelo Resource Manager Azure para implantar os recursos Azure seguindo a infraestrutura como Processo código (IaC). Com modelos, automatizar implementações usando serviços Azure DevOps ou outras soluções ci/CD é mais fácil.

  • Coloque cada carga de trabalho num modelo de implantação separado e guarde os recursos em sistemas de controlo de fontes. Pode implementar os modelos em conjunto ou individualmente como parte de um processo de CI/CD, facilitando o processo de automatização.

    Nesta arquitetura, Hubs de Eventos do Azure, Log Analytics e Azure Cosmos DB são identificados como uma única carga de trabalho. Estes recursos estão incluídos num único modelo ARM.

  • Considere encenar as suas cargas de trabalho. Desloque-se para várias fases e efetue verificações de validação em cada fase antes de passar para a fase seguinte. Desta forma, pode impulsionar as atualizações para os seus ambientes de produção de forma altamente controlada e minimizar problemas de implantação imprevistos.

  • Considere utilizar o Azure Monitor para analisar o desempenho do seu pipeline de processamento de fluxo. Para obter mais informações, consulte Monitoring Azure Databricks.

Para mais informações, consulte o pilar de excelência operacional no Quadro de Well-Architected Microsoft Azure.

Implementar este cenário

Para implementar e executar a implementação de referência, siga os passos na leitura gitHub.

Pode querer rever os seguintes cenários de exemplo Azure que demonstram soluções específicas utilizando algumas das mesmas tecnologias: