Partilhar via


Tutorial: Construir um pipeline de ETL com os Pipelines Declarativos do Lakeflow Spark

Este tutorial explica como criar e implantar um pipeline ETL (extrair, transformar e carregar) para orquestração de dados usando Lakeflow Spark Declarative Pipelines e Auto Loader. Um pipeline de ETL implementa as etapas para ler dados de sistemas de origem, transformar esses dados com base em requisitos, como verificações de qualidade de dados e eliminação de registos duplicados, e escrever os dados em um sistema de destino, como um data warehouse ou um data lake.

Neste tutorial, você usará pipelines e Auto Loader para:

  • Ingerir dados brutos de origem em uma tabela de destino.
  • Transforme os dados brutos de origem e grave os dados transformados em duas visões materializadas de destino.
  • Consulte os dados transformados.
  • Automatize o pipeline de ETL com uma tarefa Databricks.

Para obter mais informações sobre pipelines e Auto Loader, consulte Lakeflow Spark Declarative Pipelines e O que é Auto Loader?

Requisitos

Para concluir este tutorial, você deve atender aos seguintes requisitos:

Sobre o conjunto de dados

O conjunto de dados usado neste exemplo é um subconjunto do Million Song Dataset, uma coleção de recursos e metadados para faixas de música contemporânea. Esse conjunto de dados está disponível nos conjuntos de dados de exemplo incluídos em seu espaço de trabalho do Azure Databricks.

Etapa 1: Criar um pipeline

Primeiro, crie um pipeline definindo os conjuntos de dados em arquivos (chamados de código-fonte) usando a sintaxe do pipeline. Cada arquivo de código-fonte pode conter apenas um idioma, mas você pode adicionar vários arquivos específicos de idioma no pipeline. Para saber mais, consulte Lakeflow Spark Declarative Pipelines

Este tutorial usa computação sem servidor e Unity Catalog. Para todas as opções de configuração que não são especificadas, use as configurações padrão. Se a computação sem servidor não estiver habilitada ou suportada em seu espaço de trabalho, você poderá concluir o tutorial conforme escrito usando as configurações de computação padrão.

Para criar um novo pipeline, siga estas etapas:

  1. No espaço de trabalho, clique no ícone de adição.Novo na barra lateral e, em seguida, selecione Pipeline ETL.
  2. Dê ao seu pipeline um nome exclusivo.
  3. Logo abaixo do nome, selecione o catálogo e o esquema padrão para os dados gerados. Você pode especificar outros destinos em suas transformações, mas este tutorial usa esses padrões. Você deve ter permissões para o catálogo e o esquema criados. Consulte Requisitos.
  4. Para este tutorial, selecione Iniciar com um arquivo vazio.
  5. Em Caminho da pasta, especifique um local para os arquivos de origem ou aceite o padrão (sua pasta de usuário).
  6. Escolha Python ou SQL como a linguagem para o seu primeiro arquivo de origem (um pipeline pode combinar linguagens, mas cada arquivo deve estar em uma única linguagem).
  7. Clique em Selecionar.

O editor de pipeline aparece para criar o novo pipeline. Um arquivo de origem vazio para seu idioma é criado, pronto para sua primeira transformação.

Etapa 2: Desenvolver a lógica do pipeline

Nesta etapa, você usará o Lakeflow Pipelines Editor para desenvolver e validar o código-fonte do pipeline interativamente.

O código usa o Auto Loader para ingestão incremental de dados. O Auto Loader deteta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem. Para saber mais, consulte O que é o Auto Loader?

Um arquivo de código-fonte em branco é criado e configurado automaticamente para o pipeline. O arquivo é criado na pasta de transformações do seu pipeline. Por padrão, todos os arquivos *.py e *.sql na pasta de transformações fazem parte da origem do seu pipeline.

  1. Copie e cole o código a seguir em seu arquivo de origem. Certifique-se de usar o idioma selecionado para o arquivo na Etapa 1.

    Python

    # Import modules
    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dp.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dp.materialized_view(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dp.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dp.expect("valid_title", "song_title IS NOT NULL")
    @dp.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dp.materialized_view(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
      '/databricks-datasets/songs/data-001/part*',
      format => "csv",
      header => "false",
      delimiter => "\t",
      schema => """
        artist_id STRING,
        artist_lat DOUBLE,
        artist_long DOUBLE,
        artist_location STRING,
        artist_name STRING,
        duration DOUBLE,
        end_of_fade_in DOUBLE,
        key INT,
        key_confidence DOUBLE,
        loudness DOUBLE,
        release STRING,
        song_hotnes DOUBLE,
        song_id STRING,
        start_of_fade_out DOUBLE,
        tempo DOUBLE,
        time_signature INT,
        time_signature_confidence DOUBLE,
        title STRING,
        year INT,
        partial_sequence STRING
      """,
      schemaEvolutionMode => "none");
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
      artist_name,
      year,
      COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC;
    

    Esta fonte inclui código para três consultas. Você também pode colocar essas consultas em arquivos separados, para organizar os arquivos e codificar da maneira que preferir.

  2. Clique no ícone Reproduzir.Executar arquivo ou Executar pipeline para iniciar uma atualização para o pipeline conectado. Com apenas um arquivo de origem em seu pipeline, eles são funcionalmente equivalentes.

Quando a atualização for concluída, o editor será atualizado com informações sobre o seu pipeline.

  • O gráfico de pipeline (DAG), na barra lateral à direita do código, mostra três tabelas, songs_raw, songs_preparede top_artists_by_year.
  • Um resumo da atualização é mostrado na parte superior do navegador de ativos de pipeline.
  • Os detalhes das tabelas que foram geradas são mostrados no painel inferior, e você pode procurar dados das tabelas selecionando uma.

Isso inclui os dados brutos e limpos, bem como algumas análises simples para encontrar os melhores artistas por ano. Na próxima etapa, você cria consultas ad-hoc para análise adicional em um arquivo separado em seu pipeline.

Etapa 3: Explore os conjuntos de dados criados pelo seu pipeline

Nesta etapa, você executa consultas ad-hoc nos dados processados no pipeline ETL para analisar os dados da música no Editor SQL do Databricks. Essas consultas usam os registros preparados criados na etapa anterior.

Primeiro, execute uma consulta que encontre os artistas que lançaram mais músicas a cada ano desde 1990.

  1. Na barra lateral do navegador de ativos de pipeline, clique no ícone de adição.Adicionar e depois Exploração.

  2. Insira um Nome e selecione SQL para o arquivo de exploração. Um bloco de anotações SQL é criado em uma nova explorations pasta. Os arquivos na explorations pasta não são executados como parte de uma atualização de pipeline por padrão. O bloco de anotações SQL tem células que podem ser executadas juntas ou separadamente.

  3. Para criar uma tabela de artistas que lançam mais músicas em cada ano após 1990, insira o seguinte código no novo arquivo SQL (se houver um código de exemplo no arquivo, substitua-o). Como esse bloco de anotações não faz parte do pipeline, ele não usa o catálogo e o esquema padrão. Substitua o <catalog>.<schema> pelo catálogo e esquema que você usou como padrão para o pipeline:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.top_artists_by_year
      WHERE year >= 1990
      ORDER BY total_number_of_songs DESC, year DESC;
    
  4. Clique no ícone Reproduzir ou pressione Shift + Enter para executar esta consulta.

Agora, execute outra consulta que encontre músicas com uma batida 4/4 e ritmo dançável.

  1. Adicione o seguinte código à próxima célula no mesmo ficheiro. Novamente, substitua o <catalog>.<schema> pelo catálogo e esquema que você usou como padrão para o pipeline:

    -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.songs_prepared
      WHERE time_signature = 4 AND tempo between 100 and 140;
    
  2. Clique no ícone Reproduzir ou pressione Shift + Enter para executar esta consulta.

Etapa 4: Criar um trabalho para executar o pipeline

Em seguida, crie um fluxo de trabalho para automatizar as etapas de ingestão, processamento e análise de dados usando um trabalho Databricks executado em uma agenda.

  1. Na parte superior do editor, escolha o botão Agendar .
  2. Se a caixa de diálogo Agendas for exibida, escolha Adicionar agenda.
  3. Isso abre a caixa de diálogo Nova agenda, onde pode criar um trabalho para executar o pipeline num cronograma.
  4. Opcionalmente, dê um nome ao trabalho.
  5. Por padrão, a programação é definida para ser executada uma vez por dia. Você pode aceitar este padrão ou definir um horário à sua escolha. Escolher Avançado dá-lhe a opção de definir uma hora específica em que o trabalho será executado. Selecionar Mais opções permite criar notificações quando o trabalho é executado.
  6. Selecione Criar para aplicar as alterações e criar o trabalho.

Agora, a tarefa será executada diariamente para manter o seu pipeline atualizado. Você pode escolher Agendar novamente para exibir a lista de agendas. Você pode gerenciar agendas para seu pipeline a partir dessa caixa de diálogo, incluindo adicionar, editar ou remover agendas.

Clicar no nome da agenda (ou trabalho) leva você à página do trabalho na lista Jobs & pipelines . A partir daí, você pode visualizar detalhes sobre execuções de trabalho, incluindo o histórico de execuções, ou executar o trabalho imediatamente com o botão Executar agora .

Consulte Monitorização e observabilidade de tarefas do Lakeflow para obter mais informações sobre execuções de tarefas.

Mais informações