Partilhar via


Tutorial: Executar um pipeline de análise de lakehouse de ponta a ponta

Este tutorial mostra como configurar um pipeline de análise de ponta a ponta para um lago do Azure Databricks.

Importante

Este tutorial usa blocos de anotações interativos para concluir tarefas comuns de ETL em Python em clusters habilitados para Unity Catalog. Se você não estiver usando o Unity Catalog, consulte Executar sua primeira carga de trabalho ETL no Azure Databricks.

Tarefas neste tutorial

Ao final deste artigo, você se sentirá confortável:

  1. Iniciar um cluster de computação habilitado para o Unity Catalog.
  2. Criando um bloco de anotações Databricks.
  3. Gravação e leitura de dados de um local externo do Catálogo Unity.
  4. Configuração da ingestão incremental de dados para uma tabela do Unity Catalog com o Auto Loader.
  5. Executar células do bloco de notas para processar, consultar e pré-visualizar dados.
  6. Agendando um bloco de anotações como um trabalho do Databricks.
  7. Consultando tabelas do Unity Catalog a partir do Databricks SQL

O Azure Databricks fornece um conjunto de ferramentas prontas para produção que permitem que os profissionais de dados desenvolvam e implantem rapidamente pipelines de extração, transformação e carregamento (ETL). O Unity Catalog permite que os administradores de dados configurem e protejam credenciais de armazenamento, locais externos e objetos de banco de dados para usuários em toda a organização. O Databricks SQL permite que os analistas executem consultas SQL nas mesmas tabelas usadas em cargas de trabalho ETL de produção, permitindo business intelligence em tempo real em escala.

Você também pode usar Delta Live Tables para criar pipelines ETL. A Databricks criou Delta Live Tables para reduzir a complexidade da construção, implantação e manutenção de pipelines ETL de produção. Consulte Tutorial: Execute seu primeiro pipeline Delta Live Tables.

Requisitos

Nota

Se você não tiver privilégios de controle de cluster, ainda poderá concluir a maioria das etapas abaixo, desde que tenha acesso a um cluster.

Etapa 1: Criar um cluster

Para fazer análise de dados exploratória e engenharia de dados, crie um cluster para fornecer os recursos de computação necessários para executar comandos.

  1. Clique em ícone de computação Calcular na barra lateral.
  2. Clique Novo ícone em Novo na barra lateral e selecione Cluster. Isso abre a página Novo Cluster/Computação.
  3. Especifique um nome exclusivo para o cluster.
  4. Selecione o botão de opção Nó único.
  5. Selecione Usuário único na lista suspensa Modo de acesso .
  6. Certifique-se de que o seu endereço de e-mail está visível no campo Utilizador único.
  7. Selecione a versão de tempo de execução do Databricks desejada, 11.1 ou superior para usar o Unity Catalog.
  8. Clique em Criar computação para criar o cluster.

Para saber mais sobre clusters Databricks, consulte Computação.

Etapa 2: Criar um bloco de anotações Databricks

Para criar um bloco de notas na sua área de trabalho, clique Novo ícone em Novo na barra lateral e, em seguida, clique em Bloco de Notas. Um bloco de anotações em branco é aberto no espaço de trabalho.

Para saber mais sobre como criar e gerir blocos de notas, consulte Gerir blocos de notas.

Etapa 3: Gravar e ler dados de um local externo gerenciado pelo Unity Catalog

O Databricks recomenda o uso do Auto Loader para ingestão incremental de dados. O Auto Loader deteta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem.

Use o Unity Catalog para gerenciar o acesso seguro a locais externos. Usuários ou entidades de serviço com READ FILES permissões em um local externo podem usar o Auto Loader para ingerir dados.

Normalmente, os dados chegarão em um local externo devido a gravações de outros sistemas. Nesta demonstração, você pode simular a chegada de dados gravando arquivos JSON em um local externo.

Copie o código abaixo para uma célula do bloco de anotações. Substitua o valor da cadeia de caracteres para catalog pelo nome de um catálogo com CREATE CATALOG e USE CATALOG permissões. Substitua o valor da cadeia de caracteres para external_location pelo caminho de um local externo por READ FILES, WRITE FILESe CREATE EXTERNAL TABLE permissões.

Os locais externos podem ser definidos como um contêiner de armazenamento inteiro, mas geralmente apontam para um diretório aninhado em um contêiner.

O formato correto para um caminho de local externo é "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

Executando esta célula deve imprimir uma linha que lê 12 bytes, imprimir a string "Olá mundo!", e exibir todos os bancos de dados presentes no catálogo fornecido. Se você não conseguir executar essa célula, confirme se você está em um espaço de trabalho habilitado para Unity Catalog e solicite as permissões adequadas do administrador do espaço de trabalho para concluir este tutorial.

O código Python abaixo usa seu endereço de e-mail para criar um banco de dados exclusivo no catálogo fornecido e um local de armazenamento exclusivo no local externo fornecido. A execução desta célula removerá todos os dados associados a este tutorial, permitindo que você execute este exemplo idempotente. Uma classe é definida e instanciada que você usará para simular lotes de dados que chegam de um sistema conectado ao seu local externo de origem.

Copie esse código para uma nova célula em seu bloco de anotações e execute-o para configurar seu ambiente.

Nota

As variáveis definidas neste código devem permitir que você o execute com segurança sem risco de conflito com ativos de espaço de trabalho existentes ou outros usuários. Permissões restritas de rede ou armazenamento gerarão erros ao executar esse código; Entre em contato com o administrador do espaço de trabalho para solucionar essas restrições.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

Agora você pode obter um lote de dados copiando o código a seguir em uma célula e executando-o. Você pode executar manualmente essa célula até 60 vezes para acionar a chegada de novos dados.

RawData.land_batch()

Etapa 4: Configurar o Auto Loader para ingerir dados no Unity Catalog

A Databricks recomenda o armazenamento de dados com o Delta Lake. Delta Lake é uma camada de armazenamento de código aberto que fornece transações ACID e habilita o data lakehouse. Delta Lake é o formato padrão para tabelas criadas no Databricks.

Para configurar o Auto Loader para ingerir dados em uma tabela do Unity Catalog, copie e cole o seguinte código em uma célula vazia no seu bloco de anotações:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(source)
  .select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Para saber mais sobre o Auto Loader, consulte O que é o Auto Loader?.

Para saber mais sobre o Streaming estruturado com o Unity Catalog, consulte Usando o catálogo Unity com o Structured Streaming.

Etapa 5: Processar e interagir com os dados

Os blocos de anotações executam lógica célula a célula. Use estas etapas para executar a lógica em sua célula:

  1. Para executar a célula concluída na etapa anterior, selecione-a e pressione SHIFT+ENTER.

  2. Para consultar a tabela que acabou de criar, copie e cole o código seguinte numa célula vazia e, em seguida, prima SHIFT+ENTER para executar a célula.

    df = spark.read.table(table)
    
  3. Para visualizar os dados em seu DataFrame, copie e cole o código a seguir em uma célula vazia e pressione SHIFT+ENTER para executar a célula.

    display(df)
    

Para saber mais sobre as opções interativas para visualizar dados, consulte Visualizações em blocos de anotações Databricks.

Etapa 6: Agendar um trabalho

Você pode executar blocos de anotações Databricks como scripts de produção adicionando-os como uma tarefa em um trabalho Databricks. Nesta etapa, você criará um novo trabalho que poderá ser acionado manualmente.

Para agendar o seu bloco de notas como uma tarefa:

  1. Clique em Agendar no lado direito da barra de cabeçalho.
  2. Insira um nome exclusivo para o nome do trabalho.
  3. Clique em Manual.
  4. Na lista suspensa Cluster, selecione o cluster criado na etapa 1.
  5. Clique em Criar.
  6. Na janela apresentada, clique em Executar agora.
  7. Para ver os resultados da execução do trabalho, clique no Link externo ícone ao lado do carimbo de data/hora da última execução .

Para obter mais informações sobre trabalhos, consulte O que são trabalhos Databricks?.

Etapa 7: Consultar tabela do Databricks SQL

Qualquer pessoa com a USE CATALOG permissão no catálogo atual, a USE SCHEMA permissão no esquema atual e SELECT as permissões na tabela pode consultar o conteúdo da tabela a partir de sua API Databricks preferida.

Você precisa acessar um SQL warehouse em execução para executar consultas no Databricks SQL.

A tabela que você criou anteriormente neste tutorial tem o nome target_table. Você pode consultá-lo usando o catálogo que você forneceu na primeira célula e o banco de dados com o patern e2e_lakehouse_<your-username>. Você pode usar o Catalog Explorer para localizar os objetos de dados que você criou.

Integrações adicionais

Saiba mais sobre integrações e ferramentas para engenharia de dados com o Azure Databricks: