Ler em inglês

Compartilhar via


Tutorial: Criar um pipeline de ETL com DLT

Saiba como criar e implantar um pipeline de ETL (extrair, transformar e carregar) para orquestração de dados usando DLT e Carregador Automático. Um pipeline de ETL implementa as etapas para ler dados de sistemas de origem, transformar esses dados com base em requisitos, como verificações de qualidade de dados e eliminação de duplicação de registros e gravar os dados em um sistema de destino, como um data warehouse ou um data lake.

Neste tutorial, você usará o DLT e o Carregador Automático para:

  • Ingira os dados brutos de origem em uma tabela de destino.
  • Transforme os dados brutos de origem e escreva os dados transformados em duas visões materializadas de destino.
  • Consulte os dados transformados.
  • Automatize o pipeline de ETL com um trabalho do Databricks.

Para obter mais informações sobre DLT e Carregador Automático, consulte DLT e O que é o Carregador Automático?

Requisitos

Para concluir este tutorial, você deve atender aos seguintes requisitos:

Sobre o conjunto de dados

O conjunto de dados usado neste exemplo é um subconjunto do Conjunto de dados de milhões de músicas, uma coleção de recursos e metadados para faixas de música contemporânea. Esse conjunto de dados está disponível nos conjuntos de dados de amostra incluídos no workspace do Azure Databricks.

Etapa 1: Criar um pipeline

Primeiro, você criará um pipeline de ETL no DLT. O DLT cria pipelines resolvendo dependências definidas em notebooks ou arquivos (chamados código-fonte) usando a sintaxe DLT. Cada arquivo de código-fonte pode conter apenas um idioma, mas você pode adicionar vários notebooks ou arquivos específicos do idioma no pipeline. Para saber mais, consulte DLT

Importante

Deixe o campo Código-fonte em branco para criar e configurar automaticamente um notebook para criação de código-fonte.

Este tutorial usa a computação sem servidor e o Catálogo do Unity. Para todas as opções de configuração não especificadas, use as configurações padrão. Se a computação sem servidor não estiver habilitada ou tiver suporte em seu workspace, você poderá concluir o tutorial como escrito usando as configurações de computação padrão. Você deve selecionar manualmente o Catálogo do Unity em Opções de Armazenamento na seção Destino da interface do usuário Criar pipeline.

Para criar um pipeline de ETL na plataforma DLT, siga estas etapas:

  1. Na barra lateral, clique em Pipelines.
  2. Clique em Criar pipeline e Pipeline de ETL.
  3. No nome do pipeline, digite um nome de pipeline exclusivo.
  4. Marque a caixa de seleção sem servidor .
  5. No Destino, para configurar um local do Catálogo do Unity onde as tabelas são publicadas, selecione um Catálogo existente e escreva um novo nome no Esquema para criar um novo esquema em seu catálogo.
  6. Clique em Criar.
  7. Clique no link do notebook de código-fonte no campo Código-fonte no painel Detalhes do pipeline.

A interface do usuário do pipeline é exibida para o novo pipeline.

Etapa 2: Desenvolver um pipeline DLT

Importante

Os notebooks só podem conter uma única linguagem de programação. Não misture código Python e SQL em notebooks de código-fonte do pipeline.

Nesta etapa, você usará notebooks do Databricks para desenvolver e validar interativamente o código-fonte para pipelines DLT.

O código usa o Carregador Automático para ingestão de dados incremental. O Carregador Automático detecta e processa automaticamente arquivos novos confirme eles chegam no armazenamento de objeto da nuvem. Para saber mais, confira o que é o Carregador Automático?

Um notebook de código-fonte em branco é criado e configurado automaticamente para o pipeline. O bloco de anotações é criado em um novo diretório no diretório do usuário. Os nomes do novo diretório e do arquivo correspondem ao nome do pipeline. Por exemplo, /Users/someone@example.com/my_pipeline/my_pipeline.

Ao desenvolver um pipeline DLT, você pode escolher Python ou SQL. Exemplos são incluídos para ambos os idiomas. Com base na sua escolha de idioma, selecione o idioma padrão do bloco de anotações. Para saber mais sobre o suporte ao notebook para desenvolvimento de código de pipeline DLT, consulte Desenvolver e depurar pipelines ETL com um notebook em DLT.

  1. Um link para acessar este notebook está no campo Código-fonte no painel Pipeline details. Clique no link para abrir o bloco de anotações antes de prosseguir para a próxima etapa.

  2. Clique em Conectar no canto superior direito para abrir o menu de configuração de computação.

  3. Passe o mouse sobre o nome do pipeline que você criou na Etapa 1.

  4. Clique em Conectar.

  5. Ao lado do título do bloco de anotações na parte superior, selecione o idioma padrão do bloco de anotações (Python ou SQL).

  6. Copie e cole o código a seguir em uma célula no notebook.

    Python

    # Import modules
    import dlt
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dlt.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .option("inferSchema", True)
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dlt.table(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dlt.expect("valid_title", "song_title IS NOT NULL")
    @dlt.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dlt.table(
      comment="A table summarizing counts of songs released by the artists each year who released most songs."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
     artist_id STRING,
     artist_lat DOUBLE,
     artist_long DOUBLE,
     artist_location STRING,
     artist_name STRING,
     duration DOUBLE,
     end_of_fade_in DOUBLE,
     key INT,
     key_confidence DOUBLE,
     loudness DOUBLE,
     release STRING,
     song_hotnes DOUBLE,
     song_id STRING,
     start_of_fade_out DOUBLE,
     tempo DOUBLE,
     time_signature INT,
     time_signature_confidence DOUBLE,
     title STRING,
     year INT,
     partial_sequence STRING,
     value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year who released most songs."
    AS SELECT
     artist_name,
     year,
     COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC
    

Etapa 3: Consultar os dados transformados

Nesta etapa, você consultará os dados processados no pipeline de ETL para analisar os dados da música. Essas consultas usam os registros preparados criados na etapa anterior.

Primeiro, execute uma consulta que encontra os artistas que mais lançaram músicas a cada ano, começando em 1990.

  1. Na barra lateral, clique no ícone do Editor de SQLEditor de SQL.

  2. Clique no Ícone de adição ou de mais ícone de nova guia e selecione Criar consulta no menu.

  3. Insira o seguinte:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC
    

    Substitua <catalog> e <schema> pelo nome do catálogo e esquema em que a tabela está. Por exemplo, data_pipelines.songs_data.top_artists_by_year.

  4. Clique em Executar seleção.

Agora, execute outra consulta que encontre músicas com uma batida 4/4 e ritmo dançante.

  1. Clique no Ícone de adição ou de mais novo ícone de toque e selecione Criar consulta no menu.

  2. Insira o seguinte código:

     -- Find songs with a 4/4 beat and danceable tempo
     SELECT artist_name, song_title, tempo
     FROM <catalog>.<schema>.songs_prepared
     WHERE time_signature = 4 AND tempo between 100 and 140;
    

    Substitua <catalog> e <schema> pelo nome do catálogo e esquema em que a tabela está. Por exemplo, data_pipelines.songs_data.songs_prepared.

  3. Clique em Executar selecionado.

Etapa 4: Criar um trabalho para executar o pipeline do DLT

Em seguida, crie um fluxo de trabalho para automatizar a execução das etapas de ingestão, processamento e análise de dados usando um trabalho do Databricks.

  1. No workspace, clique em Ícone de Fluxos de TrabalhoFluxos de Trabalho na barra lateral e clique em Criar tarefa.
  2. Na caixa de título da tarefa, substitua a data e a hora> do Novo Trabalho < pelo nome do trabalho. Por exemplo, Songs workflow.
  3. Em Nome da tarefa, insira um nome para a primeira tarefa, por exemplo, ETL_songs_data.
  4. Em Tipo, selecione Pipeline.
  5. No Pipeline, selecione o pipeline DLT que você criou na etapa 1.
  6. Clique em Criar.
  7. Para executar o fluxo de trabalho, clique em Executar Agora. Para exibir os detalhes da execução, clique na guia Execuções . Clique na tarefa para exibir os detalhes da execução da tarefa.
  8. Para exibir os resultados quando o fluxo de trabalho for concluído, clique em Ir para a última execução bem-sucedida ou na hora de início da execução do trabalho. A página Saída aparece e exibe os resultados da consulta.

Para obter mais informações sobre execuções de trabalho, veja Monitoramento e observabilidade de trabalhos do Databricks.

Etapa 5: Agendar o trabalho do pipeline do DLT

Para executar o pipeline de ETL em um agendamento, siga estas etapas:

  1. Clique em Ícone de fluxos de trabalhoFluxos de trabalho na barra lateral.
  2. Na coluna Nome, clique no nome do trabalho. O painel lateral exibe os Detalhes do trabalho.
  3. Clique em Adicionar gatilho no painel Agendas & Gatilhos e selecione Agendado no tipo gatilho.
  4. Especifique o período, a hora de início e o fuso horário.
  5. Clique em Save (Salvar).

Saiba mais