Editar

Share via


Criar um delta lake para dar suporte a consultas ad hoc em reservas online e de viagem

Hubs de eventos do Azure
Armazenamento do Azure Data Lake
Azure Databricks
Azure Synapse Analytics

Essa arquitetura fornece um exemplo de lago delta para reservas de viagens, onde grandes quantidades de documentos brutos são geradas com alta frequência.

Apache® e Apache Spark™ são marcas registradas ou marcas comerciais do Apache Software Foundation nos Estados Unidos e/ou em outros países. O uso desta marca não implica aprovação por parte da Apache Software Foundation.

Arquitetura

Diagrama da arquitetura do Delta Lake.

Baixe um Arquivo Visio dessa arquitetura.

Cenários de reserva de lazer e viagens podem gerar grandes quantidades de documentos brutos em alta frequência. No entanto, talvez não seja necessário indexar todo o conteúdo desses documentos. Por exemplo, os usuários podem precisar pesquisar por uma ID de transação conhecido, ou por um nome de cliente em uma determinada data, para recuperar um conjunto de documentos que seja interessante para eles.

Fluxo de dados

O conceito por trás dessa arquitetura consiste em dissociar os metadados úteis para pesquisar de dados simples:

  • Somente metadados são indexados em um serviço consultável (como o Spark), enquanto os dados reais são armazenados em um data lake.
  • Os documentos brutos em um data lake são vinculados a metadados indexados por seu caminho.
  • Ao consultar documentos, o serviço pesquisa os metadados dos documentos e, por sua vez, os documentos reais serão recuperados do data lake por seu caminho.

Essa solução reduz drasticamente os custos e aumenta o desempenho, pois os metadados compreendem uma fração de todo o estado de dados (por exemplo, petabytes de documentos brutos podem ser descritos por dezenas de gigabytes de metadados concisos).

Além disso, gerenciar a combinação de profundidade histórica e requisitos em tempo real em um sistema uniforme, fácil de manter e de alto desempenho é um desafio típico desse tipo de cenário. A arquitetura do Delta Lake responde a esse desafio.

Componentes

O Serviço de Aplicativo do Azure é uma PaaS (plataforma como serviço) usada para compilar e hospedar aplicativos em máquinas virtuais gerenciadas. O Serviço de Aplicativo gerencia a infraestrutura de computação subjacente na qual seus aplicativos são executados e fornece monitoramento de cotas de uso de recursos e métricas de aplicativos, registro de informações de diagnóstico e alertas com base em métricas.

O Azure Data Factory é um serviço de extração, transformação e carregamento (ETL) na nuvem do Azure para expansão de integração e transformação de dados sem servidor. Ele oferece uma interface do usuário livre de código para criação intuitiva e gerenciamento e monitoramento em painel único. Você também pode migrar por lift-and-shift pacotes existentes do SSIS (SQL Server Integration Services) para o Azure e executá-los com total compatibilidade no Azure Data Factory.

O Azure Data Lake Storage Gen2 é um conjunto de funcionalidades dedicadas à análise de Big Data, criado no Armazenamento de Blobs do Azure. O Data Lake Storage Gen2 converge as funcionalidades do Azure Data Lake Storage Gen1 com o Armazenamento de Blobs do Azure. Por exemplo, o Data Lake Storage Gen2 fornece semântica do sistema de arquivos, segurança em nível de arquivo e escala. Como essas funcionalidades são criadas no Armazenamento de Blobs, você também obtém um armazenamento em camadas de baixo custo, com funcionalidades de alta disponibilidade/recuperação de desastre.

Os Hubs de Eventos do Azure são um serviço de ingestão de dados em tempo real totalmente gerenciado, que é simples, confiável e escalonável. Transmita milhões de eventos por segundo de qualquer fonte para criar pipelines de dados dinâmicos e responder imediatamente a desafios de negócios.

O Azure Databricks é uma plataforma de análise de dados baseada no Apache Spark otimizada para a plataforma de Serviços de Nuvem do Microsoft Azure. O Azure Databricks oferece três ambientes para o desenvolvimento de aplicativos com uso intensivo de dados: Databricks SQL, Databricks Data Science & Engineering e Databricks Machine Learning.

Alternativas

Como alternativa à indexação apenas de metadados, você pode indexar todos os dados brutos em um serviço que ofereça recursos de consulta, como o Azure Databricks, o Azure Synapse Analytics, a Pesquisa Cognitiva do Azure ou o Azure Data Explorer. Essa abordagem é mais imediata, mas preste atenção ao efeito combinado do tamanho dos dados, dos requisitos de desempenho e da frequência de atualização, especialmente do ponto de vista de custos.

Ao contrário do uso de um delta lake, o uso de uma arquitetura Lambda mantém os dados em tempo real em um repositório diferente dos dados históricos, e seu cliente executa a lógica para tornar as consultas heterogêneas transparentes para o usuário. A vantagem dessa solução é o conjunto maior de serviços que você pode usar (como o Azure Stream Analytics e o Banco de Dados SQL do Azure), mas a arquitetura se torna mais complexa e a base de código mais cara de manter.

O Spark é distribuído com o Azure Databricks, o Azure Synapse Analytics e o Azure HDInsight. Portanto, essa arquitetura poderia ser implementada com qualquer um desses serviços de dados do Azure, de preferência com uma versão recente do Spark com suporte ao Delta Lake 0.8 ou 1.0.

Detalhes do cenário

A visibilidade de dados brutos em cenários de reserva de viagens e lazer é importante para vários atores. As equipes de suporte técnico supervisionam diagnósticos em tempo real para monitorar continuamente o processamento de transações e reagir rapidamente a problemas indesejados. Os engenheiros de dados supervisionam a exportação de dados para revisão das partes interessadas e para alimentar análises em tempo real. As equipes de suporte ao cliente exigem dados históricos e recentes para lidar com consultas e reclamações de clientes. Por fim, as equipes jurídicas garantem que os deveres de conformidade sejam respeitados e as ações legais executadas. Esses tipos de requisitos são típicos em marketplaces que agregam provedores externos e gerenciam compras de usuários. Por exemplo, os sistemas de reserva de lazer e viagens desintermediam usuários e prestadores de serviços para pesquisar serviços, agregar ofertas significativas de fornecedores e gerenciar reservas de usuários.

Diagrama de um marketplace com prestadores de serviços e usuários B2B e B2C.

Possíveis casos de uso

Esta arquitetura é ideal para os setores de viagens e hospitalidade. É aplicável aos seguintes cenários:

  • Recuperação rápida de documentos brutos em tempo real (por exemplo, para diagnósticos) ou históricos (para conformidade) em seu formato original.
  • Gerenciamento de petabytes de dados.
  • Garantia de desempenho de alcance de segundos para diagnósticos em tempo real.
  • Obtenção de uma abordagem unificada para diagnósticos em tempo real, consultas históricas e análises de alimentação.
  • Alimentação de análises downstream em tempo real.
  • Controle dos custos.
  • Fornecimento de dados como documentos brutos (por exemplo, como arquivos json, xml ou csv).
  • Quando uma fração de dados é suficiente para descrever consultas.
  • Quando os usuários desejam recuperar documentos brutos completos.
  • Quando o tamanho total dos dados exigiria dimensionar o sistema acima do preço-alvo.

A arquitetura pode não ser adequada nestes casos:

  • Os dados são fornecidos como conjuntos de registros.
  • Os usuários são obrigados a executar análises.
  • Os usuários estão dispostos a usar sua própria ferramenta de BI empacotada.
  • O tamanho dos dados não é um desafio do ponto de vista de custo.

Documentos brutos não são necessariamente obrigatórios.

Considerações

Estas considerações implementam os pilares do Azure Well-Architected Framework, que é um conjunto de princípios de orientação que podem ser usados para aprimorar a qualidade de uma carga de trabalho. Para obter mais informações, consulte Microsoft Azure Well-Architected Framework.

Eficiência de desempenho

A eficiência do desempenho é a capacidade de dimensionar sua carga de trabalho para atender às demandas colocadas por usuários de maneira eficiente. Para obter mais informações, consulte Visão geral do pilar de eficiência de desempenho.

Os usuários executarão um salto duplo para acessar os dados. Eles consultarão os metadados primeiro e, em seguida, recuperarão o conjunto desejado de documentos. Pode ser difícil reutilizar ativos de clientes existentes ou empacotados.

O Azure Data Lake Storage Gen2 fornece três camadas de acesso: frequente, esporádica e de arquivos. Em cenários em que os documentos são ocasionalmente recuperados, a camada de desempenho legal deve garantir desempenho semelhante à camada de desempenho ativo, mas com a vantagem de custos mais baixos. Em cenários em que a probabilidade de recuperação é maior com dados mais recentes, considere misturar as camadas esporádica e frequente. O uso do armazenamento em camadas de arquivamento também pode fornecer uma alternativa à exclusão rígida, bem como reduzir o tamanho dos dados, mantendo apenas informações significativas ou dados mais agregados.

O data lake potencialmente gerenciará petabytes de dados, portanto, as políticas de retenção de dados geralmente se aplicam. As soluções de governança de dados devem ser empregadas para gerenciar o ciclo de vida dos dados, como quando mover dados antigos entre níveis de armazenamento quente e frio, quando excluir ou arquivar dados antigos e quando agregar informações em uma solução de análise downstream.

Considere como essa abordagem pode funcionar com cenários de análise downstream. Embora este exemplo de carga de trabalho não se destine a análises, ele é apropriado para alimentar análises em tempo real downstream, enquanto cenários em lote podem ser alimentados desde o data lake.

Escalabilidade

Os Hubs de Eventos do Azure são altamente versáteis quando se trata de desacoplar um sistema transacional que gera documentos brutos de um sistema de diagnóstico e conformidade; é fácil de implementar em arquiteturas já estabelecidas; e, em última análise, é fácil de usar. No entanto, o sistema transacional já pode usar o padrão de streaming para processar documentos de entrada. Nesse caso, você provavelmente precisaria integrar a lógica para gerenciar diagnósticos e conformidade no aplicativo de streaming como um subfluxo.

DevOps

Para implantar os serviços usados nesta carga de trabalho de exemplo automaticamente, é melhor usar processos CI/CD (integração contínua e entrega contínua). Considere usar uma solução como o Azure DevOps ou o GitHub Actions.

Otimização de custo

A otimização de custos é a análise de maneiras de reduzir as despesas desnecessárias e melhorar a eficiência operacional. Para obter mais informações, consulte Visão geral do pilar de otimização de custo.

Em geral, use a calculadora de preços do Azure para estimar os custos. Consulte a seção de custo em Microsoft Azure Well-Architected Framework para saber mais sobre outras considerações.

Implantar este cenário

Na arquitetura de exemplo a seguir, presumimos que um ou mais namespaces dos Hubs de Eventos do Azure conterão documentos brutos estruturados (como arquivos json ou xml). No entanto, o tipo e o formato reais de documentos e serviços de origem, e seu tipo de integração, são altamente dependentes do cenário e da arquitetura específicos.

Streaming

Com o Spark Structured Streaming, os dados brutos são extraídos, descompactados, analisados e convertidos em dados tabulares em um DataFrame de streaming.

O seguinte trecho de código do PySpark é usado para carregar um DataFrame de streaming dos Hubs de Eventos:

# Code tested in Databricks with Delta Lake 1.0
eh_connstr = <your_conn_str>
eh_consumergroup = <your_consumer_group>
ehConf = {}
ehConf['eventhubs.connectionString'] = 
sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eh_conn
str)
ehConf['eventhubs.consumerGroup'] = eh_consumergroup

streaming_df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

O trecho de código a seguir é usado para processar o DataFrame de streaming. Primeiro, ele descompacta a mensagem dos Hubs de Eventos, se necessário, e analisa sua estrutura json para um formato tabular. Este código é um exemplo e deve ser adaptado ao seu cenário específico:

# Code tested in Databricks with Delta Lake 1.0

# defines an UDF to unzip the Event Hubs Body field, assuming it 
is gzipped

import zlib
def DecompressFunction(data):
  decoded_data = zlib.decompress(bytes(data), 15+32)
  return decoded_data.decode()

Decompress = udf(lambda body: DecompressFunction(body), 
StringType())
decoded_body_df = streaming_df.withColumn("DecodedBody", 
Decompress(col("body"))).select("DecodedBody")

# Parse json message from Event Hubs body, assuming the raw 
document is stored in the data field, and the others fields hold 
some metadata about it

schema = StructType([ \
    StructField("transactionId", LongType(),True), \
    StructField("timestamp",TimestampType(),True), \
    StructField("providerName", StringType(),True), \
    StructField("document", StringType(),True), \
    StructField("documentType", StringType(),True)
  ])

parsed_body_df = decoded_body_df.withColumn("jsonBody", 
from_json(col("DecodedBody"), schema)).select("jsonBody")

O processamento de dados reais consiste em duas etapas. A primeira é extrair metadados para auxiliar na busca dos documentos brutos após o processamento. Os metadados reais dependem do caso de uso, mas exemplos generalizáveis seriam datas e identificadores relevantes, tipos de documento, serviço de origem e qualquer tipo de categoria:

# Code tested in Databricks with Delta Lake 1.0

df = parsed_body_df \
    .withColumn("transactionId", 
parsed_body_df.jsonBody.transactionId) \
    .withColumn("timestamp", parsed_body_df.jsonBody.timestamp) \
    .withColumn("providerName", 
parsed_body_df.jsonBody.providerName) \
    .withColumn("data", parsed_body_df.jsonBody.data)
    .withColumn("documentType", 
parsed_body_df.jsonBody.documentType)

A segunda etapa de processamento é gerar um caminho para o Azure Data Lake Storage Gen2, onde você armazenará documentos brutos:

# Code tested in Databricks with Delta Lake 1.0

# A function to generate a path
def GetPathFunction(timeStamp, transactionId, providerName, 
Suffix='', Extension=".gz"):
  yy = timeStamp.year
  mm = timeStamp.month
  dd = timeStamp.day
  hh = timeStamp.hour
  mn = timeStamp.minute
  Suffix = f"{Suffix}_" if Suffix != '' else ''
  Name = f"{Suffix}{providerName}{Extension}"
  path = f"/{yy}/{mm}/{dd}/{hh}/{mn}/{transactionId}/{Name}"
  return path

GetPath = udf(lambda timestamp, transactionId, providerName, 
suffix, extension: GetPathFunction(timestamp, transactionId, 
providerName, suffix, extension), StringType())

df = df.withColumn("path", GetPath(col("timestamp"), 
col("transactionId"), col("providerName"), col('documentType')))

Ingestão de metadados em um delta lake

Os metadados são gravados em uma tabela delta que habilita recursos de consulta em tempo real. As gravações são transmitidas em um buffer e as consultas à tabela podem mesclar os resultados do buffer com os da parte histórica da tabela.

O trecho de código a seguir mostra como definir uma tabela delta no metastore e particioná-la por data:

# Code tested in Databricks with Delta Lake 1.0

DeltaTable.create(spark) \
   .tableName("metadata") \
   .addColumn("transactionId", LongType()) \
   .addColumn("date", TimestampType()) \
   .addColumn("providerName", StringType()) \
   .addColumn("documentType", StringType()) \
   .addColumn("path", StringType()) \
   .partitionedBy("date") \
   .execute()

Observe que o campo transactionId é numérico. Mensagens típicas que passam por sistemas distribuídos podem usar GUIDs para identificar transações exclusivamente. No entanto, os tipos de dados numéricos permitem maior desempenho de consulta na maioria das plataformas de dados.

A atribuição de um identificador de transação exclusivo pode ser um desafio, dada a natureza distribuída das plataformas de dados em nuvem (como o Spark). Uma abordagem útil é basear esse identificador de transação em um identificador de partição (como o número de partição dos Hubs de Eventos) e em um número incremental dentro da partição. Um exemplo dessa abordagem é monotonically_increasing_id() no Azure Databricks.

O trecho de código a seguir mostra como anexar o fluxo com metadados de documentos brutos à tabela delta:

# Code tested in Databricks with Delta Lake 1.0

df.withColumn("date", col("timeStamp").cast(DateType())) \
    .select("transactionId", "date", "providerName", 
"documentType", "path") \
    .writeStream.format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", 
"/delta/metadata/_checkpoints/metadata_checkpoint") \
    .table("metadata")

Observe que o particionamento é gerenciado ao gravar o fluxo de acordo com o esquema da tabela.

Ingestão de dados em um data lake

Os documentos brutos reais são gravados em uma camada de desempenho de armazenamento apropriada no Azure Data Lake Gen2.

O trecho de código a seguir mostra uma função simples para carregar um arquivo no Azure Data Lake Store Gen2; usando um método foreach na classe DataStreamWriter permite que você carregue o arquivo hospedado em cada registro do DataFrame de streaming:

# Code tested in Databricks with Delta Lake 1.0

from azure.storage.filedatalake import DataLakeServiceClient

def upload_data(storage_account_name, storage_account_key, 
file_system_name, file_path, data):

  service_client = 
DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".
format("https", storage_account_name), 
credential=storage_account_key)

  file_system_client = 
service_client.get_file_system_client(file_system_name)
  file_client = 
service_client.get_file_client(file_system_client.file_system_nam
e, file_path)
    
  if not file_client.exists:
    file_client.create_file()      

  file_client.upload_data(data, overwrite=True)
  
# Process a row to upload data to ADLS
def Row2ADLS(row):
  upload_data(adls_name, adls_key, adls_container, row['path'], 
row['data'])

df.writeStream.foreach(Row2ADLS).start()

Cliente

O cliente pode ser um aplicativo Web personalizado que usa metadados para recuperar caminhos de documento da tabela delta com instruções SQL padrão e, por sua vez, o documento real do data lake com APIs padrão do Azure Data Lake Storage Gen2.

O trecho de código a seguir, por exemplo, mostra como recuperar os caminhos de todos os documentos em uma determinada transação:

select * from metadata where transactionId = '123456'

Próximas etapas

Consulte as diretrizes de arquitetura relacionadas:

Consulte estas arquiteturas relacionadas: