Editar

Processamento de fluxos com o Azure Databricks

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

Esta arquitetura de referência mostra um pipeline de processamento de fluxo de ponta a ponta. Esse tipo de pipeline tem quatro estágios: ingerir, processar, armazenar e analisar e gerar relatórios. Para essa arquitetura de referência, o pipeline ingere dados de duas fontes, executa uma junção em registros relacionados de cada fluxo, enriquece o resultado e calcula uma média em tempo real. Os resultados são armazenados para análise posterior.

GitHub logo Uma implementação de referência para essa arquitetura está disponível no GitHub.

Arquitetura

Diagram showing a reference architecture for stream processing with Azure Databricks.

Baixe um arquivo Visio desta arquitetura.

Fluxo de trabalho

A arquitetura é composta pelos seguintes componentes:

Origens de dados. Nessa arquitetura, existem duas fontes de dados que geram fluxos de dados em tempo real. O primeiro fluxo contém informações de viagem e o segundo contém informações sobre tarifas. A arquitetura de referência inclui um gerador de dados simulado que lê de um conjunto de arquivos estáticos e envia os dados para Hubs de Eventos. As fontes de dados em uma aplicação real seriam dispositivos instalados nos táxis.

Hubs de Eventos do Azure. Os Hubs de Eventos são um serviço de ingestão de eventos. Essa arquitetura usa duas instâncias de hub de eventos, uma para cada fonte de dados. Cada fonte de dados envia um fluxo de dados para o hub de eventos associado.

Azure Databricks. O Databricks é uma plataforma de análise baseada no Apache Spark otimizada para a plataforma de serviços de nuvem do Microsoft Azure. O Databricks é usado para correlacionar os dados de corrida de táxi e tarifa, e também para enriquecer os dados correlacionados com os dados de vizinhança armazenados no sistema de arquivos Databricks.

Azure Cosmos DB. A saída de um trabalho do Azure Databricks é uma série de registros, que são gravados no Azure Cosmos DB para Apache Cassandra. O Azure Cosmos DB para Apache Cassandra é usado porque dá suporte à modelagem de dados de séries temporais.

Azure Log Analytics. Os dados de log do aplicativo coletados pelo Azure Monitor são armazenados em um espaço de trabalho do Log Analytics. As consultas do Log Analytics podem ser usadas para analisar e visualizar métricas e inspecionar mensagens de log para identificar problemas no aplicativo.

Alternativas

  • O Synapse Link é a solução preferida da Microsoft para análises sobre os dados do Azure Cosmos DB.

Detalhes do cenário

Cenário: Uma empresa de táxis recolhe dados sobre cada viagem de táxi. Para esse cenário, assumimos que há dois dispositivos separados enviando dados. O táxi tem um medidor que envia informações sobre cada viagem - a duração, distância e locais de embarque e desembarque. Um dispositivo separado aceita pagamentos de clientes e envia dados sobre tarifas. Para identificar as tendências de passageiros, a empresa de táxis quer calcular a gorjeta média por quilômetro rodado, em tempo real, para cada bairro.

Potenciais casos de utilização

Esta solução é otimizada para o setor de varejo.

Ingestão de dados

Para simular uma fonte de dados, essa arquitetura de referência usa o conjunto de dados do New York City Taxi Data[1]. Este conjunto de dados contém dados sobre viagens de táxi na cidade de Nova Iorque durante um período de quatro anos (2010 – 2013). Ele contém dois tipos de registro: dados de viagem e dados de tarifa. Os dados da viagem incluem a duração da viagem, a distância da viagem e o local de embarque e desembarque. Os dados da tarifa incluem valores de tarifas, impostos e gorjetas. Os campos comuns em ambos os tipos de registro incluem número de medalhão, licença de hack e ID do fornecedor. Juntos, estes três campos identificam exclusivamente um táxi mais um motorista. Os dados são armazenados em formato CSV.

[1] Donovan, Brian; Work, Dan (2016): Dados da viagem de táxi de Nova Iorque (2010-2013). Universidade de Illinois em Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

O gerador de dados é um aplicativo .NET Core que lê os registros e os envia para os Hubs de Eventos do Azure. O gerador envia dados de viagem em formato JSON e dados de tarifa em formato CSV.

Os Hubs de Eventos usam partições para segmentar os dados. As partições permitem que um consumidor leia cada partição em paralelo. Ao enviar dados para Hubs de Eventos, você pode especificar a chave de partição explicitamente. Caso contrário, os registros são atribuídos a partições de forma round-robin.

Nesse cenário, os dados de viagem e os dados de tarifa devem acabar com o mesmo ID de partição para um determinado táxi. Isso permite que o Databricks aplique um grau de paralelismo quando correlaciona os dois fluxos. Um registro na partição n dos dados da viagem corresponderá a um registro na partição n dos dados da tarifa.

Diagram of stream processing with Azure Databricks and Event Hubs.

Baixe um arquivo Visio desta arquitetura.

No gerador de dados, o modelo de dados comum para ambos os tipos de registro tem uma PartitionKey propriedade que é a concatenação de Medallion, HackLicensee VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Esta propriedade é usada para fornecer uma chave de partição explícita ao enviar para Hubs de Eventos:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

A capacidade de taxa de transferência dos Hubs de Eventos é medida em unidades de taxa de transferência. Você pode dimensionar automaticamente um hub de eventos habilitando a auto-inflação, que dimensiona automaticamente as unidades de taxa de transferência com base no tráfego, até um máximo configurado.

Processamento de fluxos

No Azure Databricks, o processamento de dados é executado por um trabalho. O trabalho é atribuído e executado em um cluster. O trabalho pode ser um código personalizado escrito em Java ou um bloco de anotações do Spark.

Nesta arquitetura de referência, o trabalho é um arquivo Java com classes escritas em Java e Scala. Ao especificar o arquivo Java para um trabalho Databricks, a classe é especificada para execução pelo cluster Databricks. Aqui, o método principal da classe com.microsoft.pnp.TaxiCabReader contém a lógica de processamento de dados.

Lendo o fluxo das duas instâncias do hub de eventos

A lógica de processamento de dados usa o streaming estruturado do Spark para ler as duas instâncias do hub de eventos do Azure:

val rideEventHubOptions = EventHubsConf(rideEventHubConnectionString)
      .setConsumerGroup(conf.taxiRideConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val rideEvents = spark.readStream
      .format("eventhubs")
      .options(rideEventHubOptions.toMap)
      .load

    val fareEventHubOptions = EventHubsConf(fareEventHubConnectionString)
      .setConsumerGroup(conf.taxiFareConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val fareEvents = spark.readStream
      .format("eventhubs")
      .options(fareEventHubOptions.toMap)
      .load

Enriquecendo os dados com as informações do bairro

Os dados da viagem incluem as coordenadas de latitude e longitude dos locais de embarque e desembarque. Embora essas coordenadas sejam úteis, elas não são facilmente consumidas para análise. Portanto, esses dados são enriquecidos com dados de vizinhança que são lidos de um shapefile.

O formato shapefile é binário e não é facilmente analisado, mas a biblioteca GeoTools fornece ferramentas para dados geoespaciais que usam o formato shapefile. Essa biblioteca é usada na classe com.microsoft.pnp.GeoFinder para determinar o nome do bairro com base nas coordenadas de embarque e desembarque.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Juntando-se aos dados de viagem e tarifa

Primeiro, os dados de viagem e tarifa são transformados:

    val rides = transformedRides
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedRides.add(1)
          false
        }
      })
      .select(
        $"ride.*",
        to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
          .as("pickupNeighborhood"),
        to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
          .as("dropoffNeighborhood")
      )
      .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

    val fares = transformedFares
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedFares.add(1)
          false
        }
      })
      .select(
        $"fare.*",
        $"pickupTime"
      )
      .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

E, em seguida, os dados da viagem são unidos aos dados da tarifa:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Processando os dados e inserindo no Azure Cosmos DB

O valor médio da tarifa para cada bairro é calculado para um determinado intervalo de tempo:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Que é então inserido no Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Considerações

Essas considerações implementam os pilares do Azure Well-Architected Framework, que é um conjunto de princípios orientadores que podem ser usados para melhorar a qualidade de uma carga de trabalho. Para obter mais informações, consulte Microsoft Azure Well-Architected Framework.

Segurança

A segurança oferece garantias contra ataques deliberados e o abuso de seus valiosos dados e sistemas. Para obter mais informações, consulte Visão geral do pilar de segurança.

O acesso ao espaço de trabalho do Azure Databricks é controlado usando o console do administrador. O console do administrador inclui funcionalidade para adicionar usuários, gerenciar permissões de usuário e configurar o logon único. O controle de acesso para espaços de trabalho, clusters, trabalhos e tabelas também pode ser definido por meio do console do administrador.

Gestão de segredos

O Azure Databricks inclui um repositório secreto que é usado para armazenar segredos, incluindo cadeias de conexão, chaves de acesso, nomes de usuário e senhas. Os segredos dentro do repositório secreto do Azure Databricks são particionados por escopos:

databricks secrets create-scope --scope "azure-databricks-job"

Os segredos são adicionados no nível do escopo:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Nota

Um escopo apoiado pelo Azure Key Vault pode ser usado em vez do escopo nativo do Azure Databricks. Para saber mais, consulte Escopos apoiados pelo Azure Key Vault.

No código, os segredos são acessados por meio dos utilitários de segredos do Azure Databricks.

Monitorização

O Azure Databricks é baseado no Apache Spark e ambos usam log4j como a biblioteca padrão para log. Além do log padrão fornecido pelo Apache Spark, você pode implementar o log no Azure Log Analytics seguindo o artigo Monitorando o Azure Databricks.

Como a classe com.microsoft.pnp.TaxiCabReader processa mensagens de viagem e tarifa, é possível que qualquer uma delas esteja malformada e, portanto, não seja válida. Em um ambiente de produção, é importante analisar essas mensagens malformadas para identificar um problema com as fontes de dados para que ele possa ser corrigido rapidamente para evitar a perda de dados. A classe com.microsoft.pnp.TaxiCabReader registra um Apache Spark Accumulator que controla o número de registros de tarifas e viagens malformadas:

    @transient val appMetrics = new AppMetrics(spark.sparkContext)
    appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
    appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
    SparkEnv.get.metricsSystem.registerSource(appMetrics)

O Apache Spark usa a biblioteca do Dropwizard para enviar métricas, e alguns dos campos de métricas nativos do Dropwizard são incompatíveis com o Azure Log Analytics. Portanto, essa arquitetura de referência inclui um coletor e um repórter personalizados do Dropwizard. Ele formata as métricas no formato esperado pelo Azure Log Analytics. Quando o Apache Spark relata métricas, as métricas personalizadas para os dados de viagem e tarifa malformados também são enviadas.

A seguir estão exemplos de consultas que você pode usar em seu espaço de trabalho do Azure Log Analytics para monitorar a execução do trabalho de streaming. O argumento ago(1d) em cada consulta retornará todos os registros que foram gerados no último dia e pode ser ajustado para exibir um período de tempo diferente.

Exceções registradas durante a execução da consulta de fluxo

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Acumulação de dados malformados sobre tarifas e viagens

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Execução de trabalhos ao longo do tempo

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Para obter mais informações, consulte Monitorando o Azure Databricks.

DevOps

  • Crie grupos de recursos separados para ambientes de produção, desenvolvimento e teste. A utilização de grupos de recursos separados torna mais fácil gerir as implementações, eliminar as implementações de teste e atribuir direitos de acesso.

  • Use o modelo do Azure Resource Manager para implantar os recursos do Azure seguindo a infraestrutura como Processo de Código (IaC). Com modelos, automatizar implantações usando os Serviços de DevOps do Azure ou outras soluções de CI/CD é mais fácil.

  • Coloque cada carga de trabalho em um modelo de implantação separado e armazene os recursos em sistemas de controle do código-fonte. Você pode implantar os modelos juntos ou individualmente como parte de um processo de CI/CD, facilitando o processo de automação.

    Nessa arquitetura, os Hubs de Eventos do Azure, o Log Analytics e o Azure Cosmos DB são identificados como uma única carga de trabalho. Esses recursos estão incluídos em um único modelo ARM.

  • Considere preparar suas cargas de trabalho. Implante em vários estágios e execute verificações de validação em cada estágio antes de passar para o próximo estágio. Dessa forma, você pode enviar atualizações para seus ambientes de produção de forma altamente controlada e minimizar problemas de implantação imprevistos.

    Nessa arquitetura, há vários estágios de implantação. Considere criar um Pipeline de DevOps do Azure e adicionar esses estágios. Aqui estão alguns exemplos de etapas que você pode automatizar:

    • Iniciar um cluster Databricks
    • Configurar a CLI do Databricks
    • Instalar o Scala Tools
    • Adicionar os segredos do Databricks

    Além disso, considere escrever testes de integração automatizados para melhorar a qualidade e a confiabilidade do código Databricks e seu ciclo de vida.

  • Considere usar o Azure Monitor para analisar o desempenho do seu pipeline de processamento de fluxo. Para obter mais informações, consulte Monitorando o Azure Databricks.

Para obter mais informações, consulte a seção DevOps no Microsoft Azure Well-Architected Framework.

Otimização de custos

A otimização de custos consiste em procurar formas de reduzir despesas desnecessárias e melhorar a eficiência operacional. Para obter mais informações, consulte Visão geral do pilar de otimização de custos.

Utilize a calculadora de preços do Azure para prever os custos. Aqui estão algumas considerações para serviços usados nessa arquitetura de referência.

Event Hubs

Essa arquitetura de referência implanta Hubs de Eventos na camada Padrão . O modelo de preços é baseado em unidades de taxa de transferência, eventos de entrada e eventos de captura. Um evento de entrada é uma unidade de dados com 64 KB ou menos. As mensagens maiores são cobradas em múltiplos de 64 KB. Você especifica unidades de taxa de transferência por meio do portal do Azure ou das APIs de gerenciamento de Hubs de Eventos.

Se precisar de mais dias de retenção, considere a camada Dedicada . Essa camada oferece implantações de locatário único com os requisitos mais exigentes. Essa oferta cria um cluster com base em unidades de capacidade () que não está vinculado por unidades de taxa de transferência.

A camada Standard também é cobrada com base em eventos de entrada e unidades de taxa de transferência.

Para obter informações sobre os preços dos Hubs de Eventos, consulte os preços dos Hubs de Eventos.

Azure Databricks

O Azure Databricks oferece duas camadas, Standard e Premium, cada uma com suporte a três cargas de trabalho. Esta arquitetura de referência implanta o espaço de trabalho do Azure Databricks na camada Premium .

Engenharia de Dados e Engenharia de Dados As cargas de trabalho leves são para engenheiros de dados criarem e executarem trabalhos. A carga de trabalho do Data Analytics destina-se a cientistas de dados para explorar, visualizar, manipular e compartilhar dados e insights de forma interativa.

O Azure Databricks oferece muitos modelos de preços.

  • Plano pay-as-you-go

    Você é cobrado por máquinas virtuais (VMs) provisionadas em clusters e unidades Databricks (DBUs) com base na instância de VM selecionada. Uma DBU é uma unidade de capacidade de processamento faturada numa utilização por segundo. O consumo de DBUs depende do tamanho e tipo de instância que executa o Azure Databricks. O preço dependerá da carga de trabalho e do nível selecionados.

  • Plano de pré-compra

    Você se compromete com o Azure Databricks Units (DBU) como Databricks Commit Units (DBCU) por um ou três anos. Quando comparado com o modelo pré-pago, pode poupar até 37%.

Para obter mais informações, consulte Preços do Azure Databricks.

BD do Cosmos para o Azure

Nessa arquitetura, uma série de registros é gravada no Azure Cosmos DB pelo trabalho do Azure Databricks. Você será cobrado pela capacidade que você reservar, expressa em Unidades de Solicitação por segundo (RU/s), usada para executar operações de inserção. A unidade para faturamento é de 100 RU/seg por hora. Por exemplo, o custo de escrever itens de 100 KB é de 50 RU/s.

Para operações de gravação, provisione capacidade suficiente para suportar o número de gravações necessárias por segundo. Você pode aumentar a taxa de transferência provisionada usando o portal ou a CLI do Azure antes de executar operações de gravação e, em seguida, reduzir a taxa de transferência após a conclusão dessas operações. Sua taxa de transferência para o período de gravação é a taxa de transferência mínima necessária para os dados fornecidos mais a taxa de transferência necessária para a operação de inserção, supondo que nenhuma outra carga de trabalho esteja em execução.

Exemplo de análise de custos

Suponha que você configure um valor de taxa de transferência de 1.000 RU/s em um contêiner. É implantado por 24 horas por 30 dias, um total de 720 horas.

O contêiner é cobrado em 10 unidades de 100 RU/seg por hora para cada hora. 10 unidades a US$ 0,008 (por 100 RU/seg por hora) são cobradas US$ 0,08 por hora.

Por 720 horas ou 7.200 unidades (de 100 RUs), você é cobrado US $ 57,60 pelo mês.

O armazenamento também é cobrado por cada GB usado para os dados armazenados e o índice. Para obter mais informações, consulte Modelo de preços do Azure Cosmos DB.

Use a calculadora de capacidade do Azure Cosmos DB para obter uma estimativa rápida do custo da carga de trabalho.

Para obter mais informações, veja a secção de custos Well-Architected Framework do Microsoft Azure.

Implementar este cenário

Para implantar e executar a implementação de referência, siga as etapas no readme do GitHub.

Próximos passos