Compartilhar via


Processamento de fluxo com o Azure Databricks

Azure Cosmos DB
Azure Databricks
Hubs de eventos do Azure
Azure Log Analytics
Azure Monitor

Essa arquitetura de referência mostra um pipeline de processamento de fluxo de ponta a ponta. Os quatro estágios desse pipeline incluem ingestão, processamento, armazenamento e análise e relatórios. Para essa arquitetura de referência, o pipeline ingere dados de duas origens, realiza uma junção em registros relacionados de cada fluxo, enriquece o resultado e calcula uma média em tempo real. Em seguida, os resultados são armazenados para análise posterior.

Arquitetura

Diagrama que mostra uma arquitetura de referência para processamento de fluxo com o Azure Databricks.

Baixe um Arquivo Visio dessa arquitetura.

Fluxo de dados

O fluxo de dados a seguir corresponde ao diagrama anterior:

  1. Ingest

    Dois fluxos de dados operacionais em tempo real alimentam o sistema: dados de tarifa e dados de viagem . Os dispositivos instalados em táxis servem como fontes de dados e publicam eventos nos Hubs de Eventos do Azure. Cada fluxo vai para sua própria instância do hub de eventos, que fornece caminhos de ingestão independentes.

  2. Processo

    O Azure Databricks consome os fluxos dos Hubs de Eventos e executa as seguintes operações:

    • Correlaciona registros tarifários com registros de viagem
    • Enriquece os dados usando um terceiro conjunto de dados que contém dados de pesquisa de bairro armazenados no Sistema de Arquivos do Azure Databricks

    Esse processo produz um conjunto de dados unificado e enriquecido adequado para análise downstream e armazenamento.

  3. Repositório

    A saída dos trabalhos do Azure Databricks é uma série de registros. Os registros processados são gravados no Azure Cosmos DB para NoSQL.

  4. Analisar/relatar

    O Fabric espelha dados operacionais do Azure Cosmos DB para NoSQL para habilitar consultas analíticas sem afetar o desempenho transacional. Essa abordagem fornece um caminho sem ETL para análise. Nesta arquitetura, você pode usar o espelhamento para as seguintes finalidades:

    • Espelhar dados do Azure Cosmos DB (ou dados formatados por Delta) no Fabric
    • Manter conjuntos de dados sincronizados com o sistema operacional
    • Habilite a análise por meio das seguintes ferramentas:
      • Pontos de extremidade de análise de SQL do Fabric para lakehouses e armazéns
      • Notebooks do Apache Spark
      • Análise em tempo real usando KQL (Linguagem de Consulta Kusto) para exploração em séries temporais e logs
  5. Monitorar

    O Azure Monitor coleta dados de telemetria do pipeline de processamento do Azure Databricks. Um workspace do Log Analytics armazena métricas e logs de aplicativos. Você pode executar as seguintes ações:

    • Consultar logs operacionais
    • Visualizar métricas
    • Inspecionar falhas, anomalias e problemas de desempenho
    • Criar painéis

Components

  • O Azure Databricks é uma plataforma de análise baseada em Spark otimizada para a plataforma do Azure. Nessa arquitetura, os trabalhos do Azure Databricks enriquecem os dados de corrida e tarifa de táxi e armazenam os resultados no Azure Cosmos DB.

  • Os Hubs de Eventos são um serviço de ingestão distribuída gerenciado que pode ser dimensionado para ingerir grandes quantidades de eventos. Essa arquitetura usa duas instâncias do hub de eventos para receber dados de táxis.

  • O Azure Cosmos DB para NoSQL é um serviço de banco de dados gerenciado de vários modelos. Nessa arquitetura, armazena-se o resultado dos trabalhos de enriquecimento do Azure Databricks. O Fabric espelha dados operacionais do Azure Cosmos DB para habilitar consultas analíticas.

  • O Log Analytics é uma ferramenta no Azure Monitor que ajuda você a consultar e analisar dados de log de várias fontes. Nessa arquitetura, todos os recursos configuram o Diagnóstico do Azure para armazenar logs de plataforma neste espaço de trabalho. O workspace também serve como o coletor de dados para as métricas de trabalho do Spark emitidas dos pipelines de processamento do Azure Databricks.

Detalhes do cenário

Uma empresa de táxi coleta dados sobre cada viagem de táxi. Para esse cenário, presumimos que dois dispositivos separados enviem dados. O táxi tem um medidor que envia informações sobre cada passeio, incluindo os locais de duração, distância e retirada e entrega. Um dispositivo separado aceita pagamentos de clientes e envia dados sobre tarifas. Para detectar tendências de pilotagem, a empresa de táxi quer calcular a gorjeta média por milha conduzida para cada bairro, em tempo real.

Ingestão de dados

Para simular uma fonte de dados, essa arquitetura de referência usa o conjunto de dados de táxi da cidade de Nova York. Esse conjunto de dados contém dados sobre viagens de táxi em Nova York de 2010 a 2013. Ele contém registros de dados de corrida e tarifa. Os dados de passeio incluem a duração da viagem, a distância da viagem e os locais de retirada e entrega. Os dados de tarifa incluem a tarifa, impostos e quantias das gorjetas. Os campos em ambos os tipos de registro incluem número de medalhão, licença de hack e ID do fornecedor. A combinação desses três campos identifica exclusivamente um táxi e um motorista. Os dados são armazenados no formato CSV.

O gerador de dados é um aplicativo .NET Core que lê os registros e os envia aos Hubs de Eventos. O gerador envia os dados de corrida em formato JSON e os dados de tarifa em formato CSV.

Os Hubs de Eventos usam partições para segmentar os dados. As partições permitem que um consumidor leia cada dado de leitura em paralelo. Ao enviar dados para Os Hubs de Eventos, você pode especificar a chave de partição diretamente. Caso contrário, os registros são atribuídos a partições no estilo round robin.

Nesse cenário, dados de carona e dados de tarifa devem receber a mesma ID de partição para um táxi específico. Essa atribuição permite que o Databricks aplique um grau de paralelismo quando correlacionar os dois fluxos. Por exemplo, um registro em partição n dos dados de passeio corresponde a um registro na partição n dos dados de tarifa.

Diagrama de processamento de fluxo com o Azure Databricks e os Hubs de Eventos.

Baixe um Arquivo Visio dessa arquitetura.

No gerador de dados, o modelo de dados comum para ambos os tipos de registro têm uma propriedade PartitionKey que é a concatenação de Medallion, HackLicense e VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Essa propriedade fornece uma chave de partição explícita quando envia dados aos Hubs de Eventos.

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Hubs de Eventos

A capacidade da taxa de transferência dos Hubs de Eventos é medida em unidades de produtividade. Você pode dimensionar automaticamente um hub de eventos habilitando a inflação automática. Esse recurso dimensiona automaticamente as unidades de taxa de transferência com base no tráfego, até um máximo configurado.

Processamento de fluxo

No Azure Databricks, um trabalho executa o processamento de dados. O trabalho é atribuído a um cluster e, em seguida, é executado nele. O trabalho pode ser um código personalizado escrito em Java ou em um bloco de anotações do Spark .

Nesta arquitetura de referência, o trabalho é um arquivo java que tem classes escritas em Java e Scala. Quando você especifica o arquivo Java para um trabalho do Azure Databricks, o cluster do Azure Databricks especifica a classe para operação. Aqui, o main método da classe com.microsoft.pnp.TaxiCabReader contém a lógica de processamento de dados.

Ler o fluxo das duas instâncias do hub de eventos

A lógica de processamento de dados usa o fluxo estruturado do Spark para ler as duas instâncias do hub de eventos do Azure:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

Enriquecer os dados com as informações do bairro

Os dados de passeio incluem as coordenadas de latitude e longitude dos locais de retirada e entrega. Essas coordenadas são úteis, mas não são facilmente consumidas para análise. Portanto, o pipeline enriquece esses dados com dados de bairro lidos de um shapefile.

O formato shapefile é binário e não é facilmente analisado. Mas a biblioteca GeoTools fornece ferramentas para dados geoespaciais que usam o formato shapefile. Essa biblioteca é usada na classe com.microsoft.pnp.GeoFinder para determinar o nome do bairro com base nas coordenadas para locais de retirada e entrega.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Ingressar nos dados de corrida e tarifa

Primeiro, os dados da corrida e da tarifa são transformados:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Em seguida, os dados de carona são associados aos dados de tarifa:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Processar os dados e inseri-los no Azure Cosmos DB

O valor médio da tarifa para cada bairro é calculado para um intervalo de tempo específico:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

O valor médio da tarifa é inserido no Azure Cosmos DB:

maxAvgFarePerNeighborhood
  .writeStream
  .format("cosmos.oltp")
  .option("spark.cosmos.accountEndpoint", "<your-cosmos-endpoint>")
  .option("spark.cosmos.accountKey", "<your-cosmos-key>")
  .option("spark.cosmos.database", "<your-database-name>")
  .option("spark.cosmos.container", "<your-container-name>")
  .option("checkpointLocation", "/mnt/checkpoints/maxAvgFarePerNeighborhood")
  .outputMode("append")
  .start()
  .awaitTermination()

Considerações

Essas considerações implementam os pilares do Azure Well-Architected Framework​, um conjunto de princípios orientadores que você pode usar para aprimorar a qualidade de uma carga de trabalho. Para obter mais informações, consulte Well-Architected Framework.

Segurança

A segurança fornece garantias contra ataques deliberados e o uso indevido de seus valiosos dados e sistemas. Para obter mais informações, consulte Lista de verificação de revisão de design parade segurança.

O acesso ao workspace do Azure Databricks é controlado usando o console de administrador . O console do administrador inclui funcionalidade para adicionar usuários, gerenciar permissões de usuário e configurar o logon único. O controle de acesso para workspaces, clusters, trabalhos e tabelas também pode ser definido pelo console do administrador.

Gerenciar segredos

O Azure Databricks inclui um repositório de segredos usado para armazenar credenciais e referenciá-las em notebooks e trabalhos. Escopos de segredos de partição no repositório de segredos do Azure Databricks:

databricks secrets create-scope --scope "azure-databricks-job"

Os segredos são adicionados ao nível do escopo:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Observação

Use uma de escopo com suporte do Azure Key Vault em vez do escopo nativo do Azure Databricks.

O código acessa secretos utilizando os utilitários de secretos do Azure Databricks.

Otimização de custos

A Otimização de Custos concentra-se em maneiras de reduzir despesas desnecessárias e melhorar a eficiência operacional. Para obter mais informações, consulte Lista de verificação de revisão de design parade Otimização de Custos.

Use a Calculadora de Preços do Azure para estimar os custos. Considere os serviços a seguir usados nesta arquitetura de referência.

Considerações de custo dos Hubs de Eventos

Essa arquitetura de referência implanta Hubs de Eventos na camada Standard. O modelo de preços é baseado em unidades de taxa de transferência, eventos de entrada e eventos de captura. Um evento de entrada é uma unidade de dados com 64 KB ou menos. As mensagens maiores são cobradas em múltiplos de 64 KB. Especifique as unidades de produtividade por meio do portal do Azure ou das APIs de gerenciamento dos Hubs de Eventos.

Se você precisar de mais dias de retenção, considere a camada Dedicada. Essa camada fornece implantações de locatário único que têm requisitos rigorosos. Esta oferta cria um cluster baseado em unidades de capacidade e não depende de unidades de taxa de transferência. A camada Standard também é cobrada com base em eventos de entrada e unidades de taxa de transferência.

Para obter mais informações, consulte de preços dos Hubs de Eventos.

Considerações de custo do Azure Databricks

O Azure Databricks fornece a camada Standard e a camada Premium, ambas compatíveis com três cargas de trabalho. Essa arquitetura de referência implanta um workspace do Azure Databricks na camada Premium.

As cargas de trabalho de engenharia de dados devem ser executadas em um cluster de trabalho. Os engenheiros de dados usam clusters para criar e executar trabalhos. As cargas de trabalho de análise de dados devem ser executadas em um cluster para todos os fins e destinam-se aos cientistas de dados a explorar, visualizar, manipular e compartilhar dados e insights interativamente.

O Azure Databricks fornece vários modelos de preços.

  • plano pago conforme o uso

    Você é cobrado por VMs (máquinas virtuais) provisionadas em clusters e DBUs (unidades do Azure Databricks) com base na instância de VM escolhida. Uma DBU é uma unidade de capacidade de processamento que o Azure cobra por uso por segundo. O consumo de DBU depende do tamanho e do tipo de instância executado no Azure Databricks. O preço depende da carga de trabalho e da camada escolhidas.

  • plano de pré-compra

    Você se compromete com DBUs como unidades de confirmação do Azure Databricks por um ou três anos para reduzir o custo total de propriedade durante esse período de tempo em comparação com o modelo pago conforme o uso.

Para obter mais informações, consulte de preços do Azure Databricks.

Considerações de custo do Azure Cosmos DB

Nessa arquitetura, o trabalho do Azure Databricks grava uma série de registros no Azure Cosmos DB. Você é cobrado pela capacidade reservada, que é medida em RU/s (Unidades de Solicitação por segundo). Essa capacidade é usada para executar operações de inserção. A unidade para cobrança é de 100 RU/s por hora. Por exemplo, o custo do desenvolvimento de itens de 100 KB é de 50 RU/s.

Para operações de gravação, configure capacidade suficiente para dar suporte ao número de gravações necessárias por segundo. Você pode aumentar a taxa de transferência provisionada usando o portal ou a CLI do Azure antes de executar operações de gravação e, em seguida, reduzir a taxa de transferência após a conclusão dessas operações. Sua taxa de transferência para o período de gravação é a soma da taxa de transferência mínima necessária para os dados específicos e a taxa de transferência necessária para a operação de inserção. Esse cálculo pressupõe que não há outra carga de trabalho em execução.

Exemplo de análise de custo

Suponha que você configure um valor de taxa de transferência de 1.000 RU/s em um contêiner e execute-o continuamente por 30 dias, o que equivale a 720 horas.

O contêiner é cobrado em 10 unidades de 100 RU/s por hora para cada hora. Dez unidades a US$ 0,008 (por 100 RU/s por hora) são cobradas a US$ 0,08 por hora.

Por 720 horas ou 7.200 unidades (de 100 RUs), você recebe US$ 57,60 pelo mês.

O armazenamento também é cobrado para cada GB usado para seus dados armazenados e índice. Para mais informações, consulte Modelo de preços do Azure Cosmos DB.

Use a calculadora de capacidade do Azure Cosmos DB para obter uma estimativa rápida do custo da carga de trabalho.

Excelência Operacional

A Excelência Operacional abrange os processos de operações que implantam um aplicativo e o mantêm em execução em produção. Para obter mais informações, consulte Lista de verificação de revisão de design parade Excelência Operacional.

Monitoramento

O Azure Databricks é baseado no Apache Spark. O Azure Databricks e o Apache Spark usam apache Log4j como a biblioteca padrão para registro em log. Além do log padrão que o Apache Spark fornece, você pode implementar o log no Log Analytics. Para obter mais informações, consulte Monitorar o Azure Databricks.

Como a classe com.microsoft.pnp.TaxiCabReader processa mensagens de corrida e tarifa, uma mensagem pode ser malformada e, portanto, não válida. Em um ambiente de produção, é importante analisar essas mensagens malformadas para identificar um problema com as fontes de dados para que ele possa ser corrigido rapidamente para evitar a perda de dados. A classe com.microsoft.pnp.TaxiCabReader registra um Acumulador do Apache Spark que rastreia o número de registros de tarifas malformados e registros de passeio:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

O Apache Spark usa a biblioteca Dropwizard para enviar métricas. Alguns dos campos de métricas dropwizard nativos são incompatíveis com o Log Analytics, razão pela qual essa arquitetura de referência inclui um coletor dropwizard personalizado e um repórter. Ele formata as métricas no formato esperado pelo Log Analytics. Quando o Apache Spark relata as métricas, as métricas personalizadas dos dados malformados de tarifa e corrida também são enviadas.

Você pode usar as consultas de exemplo a seguir em seu workspace do Log Analytics para monitorar a operação do trabalho de streaming. O argumento ago(1d) em cada consulta retorna todos os registros que foram gerados no último dia. Você pode ajustar esse parâmetro para exibir um período de tempo diferente.

Exceções registradas durante a operação de consulta de fluxo

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Acumulação de tarifas malformadas e dados da corrida

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Operação de trabalho ao longo do tempo

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Organização e implantações de recursos

  • Crie grupos de recursos separados para ambientes de produção, desenvolvimento e teste. Ter grupos de recursos separados facilita o gerenciamento de implantações, a exclusão de implantações de teste a atribuição de direitos de acesso.

  • Use o modelo do Azure Resource Manager para implantar os recursos do Azure de acordo com o processo de infraestrutura como código. Usando modelos, você pode automatizar implantações com serviços do Azure DevOps ou outras soluções de CI/CD (integração contínua e entrega contínua).

  • Coloque cada carga de trabalho em um modelo de implantação separado e armazene os recursos em sistemas de controle do código-fonte. Você pode implantar os modelos juntos ou individualmente como parte de um processo de CI/CD. Essa abordagem simplifica o processo de automação.

    Nessa arquitetura, os Hubs de Eventos, o Log Analytics e o Azure Cosmos DB são identificados como uma única carga de trabalho. Esses recursos são incluídos em um único modelo do Azure Resource Manager.

  • Considere preparar suas cargas de trabalho. Implante em vários estágios e execute verificações de validação em cada estágio antes de passar para o próximo estágio. Dessa forma, você pode controlar como enviar atualizações por push para seus ambientes de produção e minimizar problemas de implantação imprevistos.

    Nessa arquitetura, há vários estágios de implantação. Considere criar um pipeline do Azure DevOps e adicionar esses estágios. Você pode automatizar os seguintes estágios:

    • Inicie um cluster do Azure Databricks.
    • Configure o CLI do Azure Databricks.
    • Instale as ferramentas do Scala.
    • Adicione os segredos do Azure Databricks.

    Considere escrever testes de integração automatizados para melhorar a qualidade e a confiabilidade do código do Azure Databricks e seu ciclo de vida.

Próxima etapa