Partilhar via


Tutorial: Construir um pipeline ETL com Lakeflow Declarative Pipelines

Saiba como criar e implantar um pipeline ETL (extrair, transformar e carregar) para orquestração de dados usando Lakeflow Declarative Pipelines e Auto Loader. Um pipeline de ETL implementa as etapas para ler dados de sistemas de origem, transformar esses dados com base em requisitos, como verificações de qualidade de dados e eliminação de registos duplicados, e escrever os dados em um sistema de destino, como um data warehouse ou um data lake.

Neste tutorial, você usará Lakeflow Declarative Pipelines e Auto Loader para:

  • Ingerir dados brutos de origem em uma tabela de destino.
  • Transforme os dados brutos de origem e grave os dados transformados em duas visões materializadas de destino.
  • Consulte os dados transformados.
  • Automatize o pipeline de ETL com uma tarefa Databricks.

Para obter mais informações sobre Lakeflow Declarative Pipelines e Auto Loader, consulte Lakeflow Declarative Pipelines e What is Auto Loader?

Requisitos

Para concluir este tutorial, você deve atender aos seguintes requisitos:

Sobre o conjunto de dados

O conjunto de dados usado neste exemplo é um subconjunto do Million Song Dataset, uma coleção de recursos e metadados para faixas de música contemporânea. Esse conjunto de dados está disponível nos conjuntos de dados de exemplo incluídos em seu espaço de trabalho do Azure Databricks.

Etapa 1: Criar um pipeline

Primeiro, você criará um pipeline ETL no Lakeflow Declarative Pipelines. O Lakeflow Declarative Pipelines cria pipelines resolvendo dependências definidas em blocos de anotações ou arquivos (chamados de código-fonte) usando a sintaxe Lakeflow Declarative Pipelines. Cada arquivo de código-fonte pode conter apenas um idioma, mas você pode adicionar vários blocos de anotações ou arquivos específicos do idioma no pipeline. Para saber mais, consulte Lakeflow Declarative Pipelines

Importante

Deixe o campo Código-fonte em branco para criar e configurar um bloco de anotações para criação automática do código-fonte.

Este tutorial usa computação sem servidor e Unity Catalog. Para todas as opções de configuração que não são especificadas, use as configurações padrão. Se a computação sem servidor não estiver habilitada ou suportada em seu espaço de trabalho, você poderá concluir o tutorial conforme escrito usando as configurações de computação padrão. Se você usar as configurações de computação padrão, deverá selecionar manualmente Unity Catalog em Opções de armazenamento na seção Destino da interface do usuário Criar pipeline .

Para criar um novo pipeline ETL em Lakeflow Declarative Pipelines, siga estas etapas:

  1. No espaço de trabalho, clique no ícone Fluxos de trabalho.Jobs & Pipelines na barra lateral.
  2. Em Novo, clique em Pipeline ETL.
  3. Em Nome do pipeline, digite um nome de pipeline exclusivo.
  4. Marque a caixa de seleção Sem servidor .
  5. Em Destino, para configurar um local do Catálogo Unity onde as tabelas são publicadas, selecione um Catálogo existente e escreva um novo nome em Esquema para criar um novo esquema em seu catálogo.
  6. Clique em Criar.

A interface de utilizador do pipeline aparece para o novo pipeline.

Etapa 2: Desenvolver um pipeline

Importante

Os blocos de notas só podem conter uma única linguagem de programação. Não misture código Python e SQL em cadernos de código fonte de pipeline.

Nesta etapa, você usará o Databricks Notebooks para desenvolver e validar o código-fonte para Lakeflow Declarative Pipelines interativamente.

O código usa o Auto Loader para ingestão incremental de dados. O Auto Loader deteta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem. Para saber mais, consulte O que é o Auto Loader?

Um notebook de código-fonte em branco é criado e configurado automaticamente para a pipeline. O bloco de anotações é criado em um novo diretório no diretório do usuário. O nome do novo diretório e arquivo correspondem ao nome do seu pipeline. Por exemplo, /Users/someone@example.com/my_pipeline/my_pipeline.

Ao desenvolver um pipeline, você pode escolher Python ou SQL. Estão incluídos exemplos para ambas as línguas. Com base na sua escolha de idioma, verifique se você seleciona o idioma padrão do bloco de anotações. Para saber mais sobre o suporte de notebook para desenvolvimento de código Lakeflow Declarative Pipelines, consulte Desenvolver e depurar pipelines ETL com um notebook em Lakeflow Declarative Pipelines.

  1. Um link para aceder a este notebook encontra-se no campo Código-fonte no painel Detalhes do pipeline. Clique no link para abrir o bloco de anotações antes de prosseguir para a próxima etapa.

  2. Clique em Connect no canto superior direito para abrir o menu de configuração de computação.

  3. Passe o cursor sobre o nome do pipeline criado na Etapa 1.

  4. Clique em Conectar.

  5. Ao lado do título do bloco de anotações na parte superior, selecione a linguagem padrão do bloco de anotações (Python ou SQL).

  6. Copie e cole o código a seguir em uma célula do bloco de anotações.

    Píton

    # Import modules
    import dlt
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dlt.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .option("inferSchema", True)
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dlt.table(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dlt.expect("valid_title", "song_title IS NOT NULL")
    @dlt.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dlt.table(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
     artist_id STRING,
     artist_lat DOUBLE,
     artist_long DOUBLE,
     artist_location STRING,
     artist_name STRING,
     duration DOUBLE,
     end_of_fade_in DOUBLE,
     key INT,
     key_confidence DOUBLE,
     loudness DOUBLE,
     release STRING,
     song_hotnes DOUBLE,
     song_id STRING,
     start_of_fade_out DOUBLE,
     tempo DOUBLE,
     time_signature INT,
     time_signature_confidence DOUBLE,
     title STRING,
     year INT,
     partial_sequence STRING,
     value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
     artist_name,
     year,
     COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC
    
  7. Clique em Iniciar para iniciar uma atualização para o pipeline conectado.

Etapa 3: Consultar os dados transformados

Nesta etapa, você consultará os dados processados no pipeline ETL para analisar os dados da música. Essas consultas usam os registros preparados criados na etapa anterior.

Primeiro, execute uma consulta que encontre os artistas que lançaram mais músicas a cada ano desde 1990.

  1. Na barra lateral, clique em Ícone do Editor SQLEditor SQL.

  2. Clique no ícone Adicionar ou ícone de mais da nova guia e selecione Criar nova consulta no menu.

  3. Insira o seguinte:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC
    

    Substitua <catalog> e <schema> pelo nome do catálogo e esquema em que a tabela está. Por exemplo, data_pipelines.songs_data.top_artists_by_year.

  4. Clique em Executar seleção.

Agora, execute outra consulta que encontre músicas com uma batida 4/4 e ritmo dançável.

  1. Clique no ícone Adicionar ou mais , novo ícone de toque e selecione Criar nova consulta no menu.

  2. Insira o seguinte código:

     -- Find songs with a 4/4 beat and danceable tempo
     SELECT artist_name, song_title, tempo
     FROM <catalog>.<schema>.songs_prepared
     WHERE time_signature = 4 AND tempo between 100 and 140;
    

    Substitua <catalog> e <schema> pelo nome do catálogo e esquema em que a tabela está. Por exemplo, data_pipelines.songs_data.songs_prepared.

  3. Clique em Executar seleção.

Etapa 4: Criar um trabalho para executar o pipeline

Em seguida, crie um fluxo de trabalho para automatizar as etapas de ingestão, processamento e análise de dados usando um trabalho Databricks.

  1. No espaço de trabalho, clique no ícone Fluxos de trabalho.Jobs & Pipelines na barra lateral.
  2. Em Novo, clique em Trabalho.
  3. Na caixa de título da tarefa, substitua a data e a hora< do Novo Trabalho > pelo nome do trabalho. Por exemplo, Songs workflow.
  4. Em Nome da tarefa, insira um nome para a primeira tarefa, por exemplo, ETL_songs_data.
  5. Em Tipo, selecione Pipeline.
  6. Em Pipeline, selecione o pipeline criado na etapa 1.
  7. Clique em Criar.
  8. Para executar o fluxo de trabalho, clique em Executar Agora. Para exibir os detalhes da execução, clique na guia Execuções . Clique na tarefa para ver os detalhes da tarefa executada.
  9. Para exibir os resultados quando o fluxo de trabalho for concluído, clique em Ir para a última execução bem-sucedida ou a hora de início da execução do trabalho. A página Saída é exibida e exibe os resultados da consulta.

Consulte Monitorização e observabilidade de tarefas do Lakeflow para obter mais informações sobre execuções de tarefas.

Etapa 5: Agendar o trabalho de pipeline

Para executar o pipeline ETL em um cronograma, execute estas etapas:

  1. Navegue até a interface do usuário Jobs & Pipelines no mesmo espaço de trabalho do Azure Databricks que o trabalho.
  2. Opcionalmente, selecione os filtros Trabalhos e Propriedade de mim .
  3. Na coluna Nome , clique no nome do trabalho. O painel lateral exibe os detalhes do trabalho.
  4. Clique em Adicionar gatilho no painel Agendas & Gatilhos e selecione Agendado no tipo de gatilho.
  5. Especifique o período, a hora de início e o fuso horário.
  6. Clique em Salvar.

Mais informações