Editar

Compartilhar via


Processamento de fluxo com o Azure Databricks

Azure Cosmos DB
Azure Databricks
Hubs de eventos do Azure
Azure Log Analytics
Azure Monitor

Essa arquitetura de referência mostra um pipeline de processamento de fluxo de ponta a ponta. Os quatro estágios desse pipeline são ingestão, processo, armazenamento e análise e relatório. Para essa arquitetura de referência, o pipeline ingere dados de duas origens, realiza uma junção em registros relacionados de cada fluxo, enriquece o resultado e calcula uma média em tempo real. Em seguida, os resultados são armazenados para análise posterior.

Logotipo do GitHub Há uma implantação de referência para essa arquitetura disponível no GitHub.

Arquitetura

Diagrama que mostra uma arquitetura de referência para processamento de fluxo com o Azure Databricks.

Baixe um Arquivo Visio dessa arquitetura.

Workflow

O fluxo de dados a seguir corresponde ao diagrama anterior:

  1. Nessa arquitetura há duas fontes de dados que geram fluxos de dados em tempo real. O primeiro fluxo contém informações de passeio e o segundo fluxo contém informações sobre tarifas. A arquitetura de referência inclui um gerador de dados simulado que lê de um conjunto de arquivos estáticos e envia os dados por push para os Hubs de Eventos do Azure. As fontes de dados em um aplicativo real são dispositivos instalados nos táxis.

  2. hubs de eventos é um serviço de ingestão de eventos. Essa arquitetura usa duas instâncias de hub de eventos, uma para cada fonte de dados. Cada fonte de dados envia um fluxo de dados para o hub de eventos associado.

  3. a do Azure Databricks é uma plataforma de análise baseada no Apache Spark otimizada para a plataforma de serviços de nuvem do Microsoft Azure. O Azure Databricks é usado para correlacionar os dados de corrida e tarifa de táxi e enriquecer os dados correlacionados com dados de bairro armazenados no sistema de arquivos do Azure Databricks.

  4. do Azure Cosmos DB é um serviço de banco de dados de vários modelos totalmente gerenciado. A saída do trabalho de um trabalho do Azure Databricks é uma série de registros que são gravados no Azure Cosmos DB for Apache Cassandra. O Azure Cosmos DB para Apache Cassandra é usado porque oferece suporte à modelagem de dados de série temporal.

  5. Log Analytics é uma ferramenta no Azure Monitor que permite consultar e analisar dados de log de várias fontes. Os dados de log de aplicativo coletados a do Azure Monitor são armazenados em um de workspace do Log Analytics. Você pode usar consultas do Log Analytics para analisar e visualizar métricas e inspecionar mensagens de log para identificar problemas dentro do aplicativo.

Detalhes do cenário

Uma empresa de táxi coleta dados sobre cada viagem de táxi. Para esse cenário, presumimos que dois dispositivos separados enviem dados. O táxi tem um medidor que envia informações sobre cada passeio, incluindo os locais de duração, distância e retirada e entrega. Um dispositivo separado aceita pagamentos de clientes e envia dados sobre tarifas. Para detectar tendências de pilotagem, a empresa de táxi quer calcular a gorjeta média por milha conduzida para cada bairro, em tempo real.

Ingestão de dados

Para simular uma fonte de dados, essa arquitetura de referência usa o conjunto de dados de dados de táxi da cidade de Nova York 1. Esse conjunto de dados contém dados sobre viagens de táxi em Nova York de 2010 a 2013. Ele contém registros de dados de corrida e tarifa. Os dados de passeio incluem a duração da viagem, a distância da viagem e os locais de retirada e entrega. Os dados de tarifa incluem a tarifa, impostos e quantias das gorjetas. Os campos em ambos os tipos de registro incluem número de medalhão, licença de hack e ID do fornecedor. A combinação desses três campos identifica exclusivamente um táxi e um motorista. Os dados são armazenados no formato CSV.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Universidade de Illinois em Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

O gerador de dados é um aplicativo .NET Core que lê os registros e os envia aos Hubs de Eventos. O gerador envia os dados de corrida em formato JSON e os dados de tarifa em formato CSV.

Os Hubs de Eventos usam partições para segmentar os dados. As partições permitem que um consumidor leia cada partição em paralelo. Ao enviar dados para Os Hubs de Eventos, você pode especificar a chave de partição diretamente. Caso contrário, os registros são atribuídos a partições no estilo round robin.

Nesse cenário, dados de carona e dados de tarifa devem receber a mesma ID de partição para um táxi específico. Essa atribuição permite que o Databricks aplique um grau de paralelismo quando correlacionar os dois fluxos. Por exemplo, um registro em partição n dos dados de passeio corresponde a um registro na partição n dos dados de tarifa.

Diagrama de processamento de fluxo com o Azure Databricks e os Hubs de Eventos.

Baixe um Arquivo Visio dessa arquitetura.

No gerador de dados, o modelo de dados comum para ambos os tipos de registro têm uma propriedade PartitionKey que é a concatenação de Medallion, HackLicense e VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Essa propriedade é usada para fornecer uma chave de partição explícita quando envia dados aos Hubs de Eventos.

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Hubs de Eventos

A capacidade da taxa de transferência dos Hubs de Eventos é medida em unidades de produtividade. Você pode dimensionar automaticamente um hub de eventos habilitando inflar automaticamente. Esse recurso dimensiona automaticamente as unidades de taxa de transferência com base no tráfego, até um máximo configurado.

Processamento de fluxo

No Azure Databricks, um trabalho executa o processamento de dados. O trabalho é atribuído a um cluster e, em seguida, é executado nele. O trabalho pode ser um código personalizado escrito em Java ou em um bloco de anotações do Spark .

Nesta arquitetura de referência, o trabalho é um arquivo java que tem classes escritas em Java e Scala. Quando você especifica o arquivo Java para um trabalho do Databricks, o cluster do Databricks especifica a classe para operação. Aqui, o main método da classe com.microsoft.pnp.TaxiCabReader contém a lógica de processamento de dados.

Ler o fluxo das duas instâncias do hub de eventos

A lógica de processamento de dados usa o fluxo estruturado do Spark para ler as duas instâncias do hub de eventos do Azure:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

Enriquecer os dados com as informações do bairro

Os dados de passeio incluem as coordenadas de latitude e longitude dos locais de retirada e entrega. Essas coordenadas são úteis, mas não são facilmente consumidas para análise. Portanto, esses dados são enriquecidos com dados de bairro lidos de um deshapefile.

O formato shapefile é binário e não é facilmente analisado. Mas a biblioteca GeoTools fornece ferramentas para dados geoespaciais que usam o formato shapefile. Essa biblioteca é usada na classe com.microsoft.pnp.GeoFinder para determinar o nome do bairro com base nas coordenadas para locais de retirada e entrega.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Ingressar nos dados de corrida e tarifa

Primeiro, os dados da corrida e da tarifa são transformados:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Em seguida, os dados de carona são associados aos dados de tarifa:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Processar os dados e inseri-los no Azure Cosmos DB

O valor médio da tarifa para cada bairro é calculado para um intervalo de tempo específico:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

O valor médio da tarifa é inserido no Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Considerações

Essas considerações implementam os pilares do Azure Well-Architected Framework​, um conjunto de princípios orientadores que você pode usar para aprimorar a qualidade de uma carga de trabalho. Para obter mais informações, confira Microsoft Azure Well-Architected Framework.

Segurança

A segurança fornece garantias contra ataques deliberados e o uso indevido de seus valiosos dados e sistemas. Para obter mais informações, consulte Lista de verificação de revisão de design parade segurança.

O acesso ao workspace do Azure Databricks é controlado usando o console de administrador . O console do administrador inclui funcionalidade para adicionar usuários, gerenciar permissões de usuário e configurar o logon único. O controle de acesso para workspaces, clusters, trabalhos e tabelas também pode ser definido pelo console do administrador.

Gerenciar segredos

O Azure Databricks inclui um repositório de segredos usado para armazenar credenciais e referenciá-las em notebooks e trabalhos. Escopos de segredos de partição no repositório de segredos do Azure Databricks:

databricks secrets create-scope --scope "azure-databricks-job"

Os segredos são adicionados ao nível do escopo:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Observação

Use uma de escopo com suporte do Azure Key Vault em vez do escopo nativo do Azure Databricks.

No código, os segredos são acessados pelos utilitários segredos do Azure Databricks.

Otimização de custos

A Otimização de Custos concentra-se em maneiras de reduzir despesas desnecessárias e melhorar a eficiência operacional. Para obter mais informações, consulte Lista de verificação de revisão de design parade Otimização de Custos.

Use a Calculadora de Preços do Azure para estimar os custos. Considere os serviços a seguir usados nesta arquitetura de referência.

Considerações de custo dos Hubs de Eventos

Essa arquitetura de referência implanta Hubs de Eventos na camada Standard. O modelo de preços é baseado em unidades de taxa de transferência, eventos de entrada e eventos de captura. Um evento de entrada é uma unidade de dados com 64 KB ou menos. As mensagens maiores são cobradas em múltiplos de 64 KB. Especifique as unidades de produtividade por meio do portal do Azure ou das APIs de gerenciamento dos Hubs de Eventos.

Se você precisar de mais dias de retenção, considere a camada Dedicada. Essa camada fornece implantações de locatário único que têm requisitos rigorosos. Essa oferta cria um cluster baseado em unidades de capacidade e não depende de unidades de taxa de transferência. A camada Standard também é cobrada com base em eventos de entrada e unidades de taxa de transferência.

Para obter mais informações, consulte de preços dos Hubs de Eventos.

Considerações de custo do Azure Databricks

O Azure Databricks fornece a camada Standard e a camada Premium, ambas compatíveis com três cargas de trabalho. Essa arquitetura de referência implanta um workspace do Azure Databricks na camada Premium.

As cargas de trabalho de engenharia de dados devem ser executadas em um cluster de trabalho. Os engenheiros de dados usam clusters para criar e executar trabalhos. As cargas de trabalho de análise de dados devem ser executadas em um cluster para todos os fins e destinam-se aos cientistas de dados a explorar, visualizar, manipular e compartilhar dados e insights interativamente.

O Azure Databricks fornece vários modelos de preços.

  • plano pago conforme o uso

    Você é cobrado por VMs (máquinas virtuais) provisionadas em clusters e DBUs (unidades do Azure Databricks) com base na instância de VM escolhida. Uma DBU é uma unidade de capacidade de processamento cobrada pelo uso por segundo. O consumo de DBU depende do tamanho e do tipo de instância executado no Azure Databricks. O preço depende da carga de trabalho e da camada escolhidas.

  • plano de pré-compra

    Você se compromete com DBUs como unidades de confirmação do Azure Databricks por um ou três anos para reduzir o custo total de propriedade durante esse período de tempo em comparação com o modelo pago conforme o uso.

Para obter mais informações, consulte de preços do Azure Databricks.

Considerações de custo do Azure Cosmos DB

Nessa arquitetura, o trabalho do Azure Databricks grava uma série de registros no Azure Cosmos DB. Você é cobrado pela capacidade reservada, que é medida em RU/s (Unidades de Solicitação por segundo). Essa capacidade é usada para executar operações de inserção. A unidade para cobrança é de 100 RU/s por hora. Por exemplo, o custo do desenvolvimento de itens de 100 KB é de 50 RU/s.

Para operações de gravação, forneça capacidade suficiente para dar suporte ao número de gravações necessárias por segundo. Você pode aumentar a taxa de transferência provisionada usando o portal ou a CLI do Azure antes de executar operações de gravação e, em seguida, reduzir a taxa de transferência após a conclusão dessas operações. Sua taxa de transferência para o período de gravação é a soma da taxa de transferência mínima necessária para os dados específicos e a taxa de transferência necessária para a operação de inserção. Esse cálculo pressupõe que não há outra carga de trabalho em execução.

Exemplo de análise de custo

Suponha que você configure um valor de taxa de transferência de 1.000 RU/s em um contêiner. Ele é implantado por 24 horas por 30 dias, por um total de 720 horas.

O contêiner é cobrado em 10 unidades de 100 RU/s por hora para cada hora. Dez unidades a US$ 0,008 (por 100 RU/s por hora) são cobradas a US$ 0,08 por hora.

Por 720 horas ou 7.200 unidades (de 100 RUs), você recebe US$ 57,60 pelo mês.

O armazenamento também é cobrado para cada GB usado para seus dados armazenados e índice. Para mais informações, consulte Modelo de preços do Azure Cosmos DB.

Use a calculadora de capacidade do Azure Cosmos DB para obter uma estimativa rápida do custo da carga de trabalho.

Excelência Operacional

A Excelência Operacional abrange os processos de operações que implantam um aplicativo e o mantêm em execução em produção. Para obter mais informações, consulte Lista de verificação de revisão de design parade Excelência Operacional.

Monitoramento

O Azure Databricks é baseado no Apache Spark. O Azure Databricks e o Apache Spark usam apache Log4j como a biblioteca padrão para registro em log. Além do log padrão que o Apache Spark fornece, você pode implementar o log no Log Analytics. Para obter mais informações, consulte Monitorar o Azure Databricks.

Como a classe com.microsoft.pnp.TaxiCabReader processa mensagens de corrida e tarifa, uma mensagem pode ser malformada e, portanto, não válida. Em um ambiente de produção, é importante analisar essas mensagens malformadas para identificar um problema com as fontes de dados para que ele possa ser corrigido rapidamente para evitar a perda de dados. A classe com.microsoft.pnp.TaxiCabReader registra um Acumulador do Apache Spark que rastreia o número de registros de tarifas malformados e registros de passeio:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

O Apache Spark usa a biblioteca Dropwizard para enviar métricas. Alguns dos campos de métricas dropwizard nativos são incompatíveis com o Log Analytics, razão pela qual essa arquitetura de referência inclui um coletor dropwizard personalizado e um repórter. Ele formata as métricas no formato esperado pelo Log Analytics. Quando o Apache Spark relata as métricas, as métricas personalizadas dos dados malformados de tarifa e corrida também são enviadas.

Você pode usar as consultas de exemplo a seguir em seu workspace do Log Analytics para monitorar a operação do trabalho de streaming. O argumento ago(1d) em cada consulta retorna todos os registros que foram gerados no último dia. Você pode ajustar esse parâmetro para exibir um período de tempo diferente.

Exceções registradas durante a operação de consulta de fluxo

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Acumulação de tarifas malformadas e dados da corrida

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Operação de trabalho ao longo do tempo

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Organização e implantações de recursos

  • Crie grupos de recursos separados para ambientes de produção, desenvolvimento e teste. Ter grupos de recursos separados facilita o gerenciamento de implantações, a exclusão de implantações de teste a atribuição de direitos de acesso.

  • Use o modelo do Azure Resource Manager para implantar os recursos do Azure de acordo com o processo de infraestrutura como código. Usando modelos, você pode automatizar facilmente implantações com serviços do Azure DevOps ou outras soluções de CI/CD (integração contínua e entrega contínua).

  • Coloque cada carga de trabalho em um modelo de implantação separado e armazene os recursos em sistemas de controle do código-fonte. Você pode implantar os modelos juntos ou individualmente como parte de um processo de CI/CD. Essa abordagem simplifica o processo de automação.

    Nessa arquitetura, os Hubs de Eventos, o Log Analytics e o Azure Cosmos DB são identificados como uma única carga de trabalho. Esses recursos são incluídos em um único modelo do Azure Resource Manager.

  • Considere preparar suas cargas de trabalho. Implante em vários estágios e execute verificações de validação em cada estágio antes de passar para o próximo estágio. Dessa forma, você pode controlar como enviar atualizações por push para seus ambientes de produção e minimizar problemas de implantação imprevistos.

    Nessa arquitetura, há vários estágios de implantação. Considere criar um pipeline do Azure DevOps e adicionar esses estágios. Você pode automatizar os seguintes estágios:

    • Inicie um cluster do Databricks.
    • Configurar a CLI do Databricks.
    • Instale as ferramentas do Scala.
    • Adicione os segredos do Databricks.

    Considere escrever testes de integração automatizados para melhorar a qualidade e a confiabilidade do código do Databricks e seu ciclo de vida.

Implantar este cenário

Para implantar e executar a implementação de referência, siga as etapas no de leitura do GitHub.

Próxima etapa