Share via


Crie um pipeline de dados de ponta a ponta no Databricks

Este artigo mostra como criar e implantar um pipeline de processamento de dados de ponta a ponta, incluindo como ingerir dados brutos, transformar os dados e executar análises nos dados processados.

Nota

Embora este artigo demonstre como criar um pipeline de dados completo usando blocos de anotações Databricks e um trabalho do Azure Databricks para orquestrar um fluxo de trabalho, o Databricks recomenda o uso do Delta Live Tables, uma interface declarativa para criar pipelines de processamento de dados confiáveis, sustentáveis e testáveis.

O que é um pipeline de dados?

Um pipeline de dados implementa as etapas necessárias para mover dados de sistemas de origem, transformar esses dados com base em requisitos e armazenar os dados em um sistema de destino. Um pipeline de dados inclui todos os processos necessários para transformar dados brutos em dados preparados que os usuários podem consumir. Por exemplo, um pipeline de dados pode preparar dados para que analistas de dados e cientistas de dados possam extrair valor dos dados por meio de análises e relatórios.

Um fluxo de trabalho de extração, transformação e carga (ETL) é um exemplo comum de um pipeline de dados. No processamento de ETL, os dados são ingeridos de sistemas de origem e gravados em uma área de preparação, transformados com base nos requisitos (garantindo a qualidade dos dados, desduplicando registros e assim por diante) e, em seguida, gravados em um sistema de destino, como um data warehouse ou data lake.

Etapas do pipeline de dados

Para ajudá-lo a começar a criar pipelines de dados no Azure Databricks, o exemplo incluído neste artigo descreve a criação de um fluxo de trabalho de processamento de dados:

  • Use os recursos do Azure Databricks para explorar um conjunto de dados bruto.
  • Crie um bloco de anotações Databricks para ingerir dados brutos de origem e gravar os dados brutos em uma tabela de destino.
  • Crie um bloco de anotações Databricks para transformar os dados brutos de origem e gravar os dados transformados em uma tabela de destino.
  • Crie um bloco de anotações Databricks para consultar os dados transformados.
  • Automatize o pipeline de dados com um trabalho do Azure Databricks.

Requisitos

Exemplo: conjunto de dados Million Song

O conjunto de dados usado neste exemplo é um subconjunto do Million Song Dataset, uma coleção de recursos e metadados para faixas de música contemporânea. Esse conjunto de dados está disponível nos conjuntos de dados de exemplo incluídos em seu espaço de trabalho do Azure Databricks.

Etapa 1: Criar um cluster

Para executar o processamento e a análise de dados neste exemplo, crie um cluster para fornecer os recursos de computação necessários para executar comandos.

Nota

Como este exemplo usa um conjunto de dados de exemplo armazenado no DBFS e recomenda tabelas persistentes para o Unity Catalog, você cria um cluster configurado com o modo de acesso de usuário único. O modo de acesso de usuário único fornece acesso total ao DBFS e, ao mesmo tempo, permite o acesso ao Unity Catalog. Consulte Práticas recomendadas para DBFS e Unity Catalog.

  1. Clique em Calcular na barra lateral.
  2. Na página Computação, clique em Criar Cluster.
  3. Na página Novo Cluster, insira um nome exclusivo para o cluster.
  4. No modo de acesso, selecione Usuário único.
  5. Em Acesso de usuário único ou entidade de serviço, selecione seu nome de usuário.
  6. Deixe os valores restantes em seu estado padrão e clique em Criar Cluster.

Para saber mais sobre clusters Databricks, consulte Computação.

Etapa 2: Explore os dados de origem

Para saber como usar a interface do Azure Databricks para explorar os dados de origem brutos, consulte Explorar os dados de origem para um pipeline de dados. Se você quiser ir diretamente para a ingestão e preparação dos dados, continue para a Etapa 3: Ingerir os dados brutos.

Etapa 3: Ingerir os dados brutos

Nesta etapa, você carrega os dados brutos em uma tabela para disponibilizá-los para processamento posterior. Para gerenciar ativos de dados na plataforma Databricks, como tabelas, a Databricks recomenda o Unity Catalog. No entanto, se você não tiver permissões para criar o catálogo e o esquema necessários para publicar tabelas no Unity Catalog, ainda poderá concluir as etapas a seguir publicando tabelas no metastore do Hive.

Para ingerir dados, a Databricks recomenda o uso do Auto Loader. O Auto Loader deteta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem.

Você pode configurar o Auto Loader para detetar automaticamente o esquema de dados carregados, permitindo inicializar tabelas sem declarar explicitamente o esquema de dados e evoluir o esquema de tabela à medida que novas colunas são introduzidas. Isso elimina a necessidade de rastrear e aplicar manualmente as alterações de esquema ao longo do tempo. O Databricks recomenda a inferência de esquema ao usar o Auto Loader. No entanto, como visto na etapa de exploração de dados, os dados das músicas não contêm informações de cabeçalho. Como o cabeçalho não é armazenado com os dados, você precisará definir explicitamente o esquema, conforme mostrado no exemplo a seguir.

  1. Na barra lateral, clique em Novo íconeNovo e selecione Bloco de Anotações no menu. A caixa de diálogo Criar bloco de anotações é exibida.

  2. Introduza um nome para o bloco de notas, por exemplo, Ingest songs data. Por predefinição:

    • Python é a linguagem selecionada.
    • O bloco de anotações é anexado ao último cluster usado. Nesse caso, o cluster criado na Etapa 1: criar um cluster.
  3. Introduza o seguinte na primeira célula do bloco de notas:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Se você estiver usando o Unity Catalog, substitua <table-name> por um catálogo, esquema e nome de tabela para conter os registros ingeridos (por exemplo, data_pipelines.songs_data.raw_song_data). Caso contrário, substitua <table-name> pelo nome de uma tabela para conter os registros ingeridos, por exemplo, raw_song_data.

    Substitua <checkpoint-path> por um caminho para um diretório no DBFS para manter arquivos de ponto de verificação, por exemplo, /tmp/pipeline_get_started/_checkpoint/song_data.

  4. Clique em Executar Menue selecione Executar célula. Este exemplo define o esquema de dados usando as informações do README, ingere os dados de músicas de todos os arquivos contidos em file_pathe grava os dados na tabela especificada por table_name.

Etapa 4: Preparar os dados brutos

Para preparar os dados brutos para análise, as etapas a seguir transformam os dados brutos de músicas filtrando colunas desnecessárias e adicionando um novo campo contendo um carimbo de data/hora para a criação do novo registro.

  1. Na barra lateral, clique em Novo íconeNovo e selecione Bloco de Anotações no menu. A caixa de diálogo Criar bloco de anotações é exibida.

  2. Introduza um nome para o bloco de notas. Por exemplo, Prepare songs data. Altere o idioma padrão para SQL.

  3. Introduza o seguinte na primeira célula do bloco de notas:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Se você estiver usando o Unity Catalog, substitua <table-name> por um catálogo, esquema e nome de tabela para conter os registros filtrados e transformados (por exemplo, data_pipelines.songs_data.prepared_song_data). Caso contrário, substitua <table-name> pelo nome de uma tabela para conter os registros filtrados e transformados (por exemplo, prepared_song_data).

    Substitua <raw-songs-table-name> pelo nome da tabela que contém os registros de músicas brutas ingeridas na etapa anterior.

  4. Clique em Executar Menue selecione Executar célula.

Etapa 5: Consultar os dados transformados

Nesta etapa, você estende o pipeline de processamento adicionando consultas para analisar os dados das músicas. Essas consultas usam os registros preparados criados na etapa anterior.

  1. Na barra lateral, clique em Novo íconeNovo e selecione Bloco de Anotações no menu. A caixa de diálogo Criar bloco de anotações é exibida.

  2. Introduza um nome para o bloco de notas. Por exemplo, Analyze songs data. Altere o idioma padrão para SQL.

  3. Introduza o seguinte na primeira célula do bloco de notas:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Substitua <prepared-songs-table-name> pelo nome da tabela que contém os dados preparados. Por exemplo, data_pipelines.songs_data.prepared_song_data.

  4. Clique Down Caret no menu de ações da célula, selecione Adicionar célula abaixo e digite o seguinte na nova célula:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Substitua <prepared-songs-table-name> pelo nome da tabela preparada criada na etapa anterior. Por exemplo, data_pipelines.songs_data.prepared_song_data.

  5. Para executar as consultas e visualizar a saída, clique em Executar tudo.

Etapa 6: Criar um trabalho do Azure Databricks para executar o pipeline

Você pode criar um fluxo de trabalho para automatizar a execução das etapas de ingestão, processamento e análise de dados usando um trabalho do Azure Databricks.

  1. No seu espaço de trabalho Ciência de Dados & Engenharia, siga um destes procedimentos:
    • Clique em Ícone Fluxos de TrabalhoFluxos de trabalho na barra lateral e clique em .Botão Criar Trabalho
    • Na barra lateral, clique em Novo íconeNovo e selecione Trabalho.
  2. Na caixa de diálogo da tarefa no separador Tarefas, substitua Adicionar um nome para o seu trabalho... pelo nome do seu trabalho. Por exemplo, "Fluxo de trabalho de músicas".
  3. Em Nome da tarefa, insira um nome para a primeira tarefa, por exemplo, Ingest_songs_data.
  4. Em Tipo, selecione o tipo de tarefa Bloco de Anotações .
  5. Em Origem, selecione Espaço de trabalho.
  6. Use o navegador de arquivos para localizar o bloco de anotações de ingestão de dados, clique no nome do bloco de anotações e clique em Confirmar.
  7. Em Cluster, selecione Shared_job_cluster ou o cluster que você criou na Create a cluster etapa.
  8. Clique em Criar.
  9. Clique Botão Adicionar Tarefa abaixo da tarefa que acabou de criar e selecione Bloco de Notas.
  10. Em Nome da tarefa, insira um nome para a tarefa, por exemplo, Prepare_songs_data.
  11. Em Tipo, selecione o tipo de tarefa Bloco de Anotações .
  12. Em Origem, selecione Espaço de trabalho.
  13. Use o navegador de arquivos para localizar o bloco de anotações de preparação de dados, clique no nome do bloco de anotações e clique em Confirmar.
  14. Em Cluster, selecione Shared_job_cluster ou o cluster que você criou na Create a cluster etapa.
  15. Clique em Criar.
  16. Clique Botão Adicionar Tarefa abaixo da tarefa que acabou de criar e selecione Bloco de Notas.
  17. Em Nome da tarefa, insira um nome para a tarefa, por exemplo, Analyze_songs_data.
  18. Em Tipo, selecione o tipo de tarefa Bloco de Anotações .
  19. Em Origem, selecione Espaço de trabalho.
  20. Use o navegador de arquivos para localizar o bloco de anotações de análise de dados, clique no nome do bloco de anotações e clique em Confirmar.
  21. Em Cluster, selecione Shared_job_cluster ou o cluster que você criou na Create a cluster etapa.
  22. Clique em Criar.
  23. Para executar o fluxo de trabalho, clique em Botão Executar agora. Para exibir os detalhes da execução, clique no link na coluna Hora de início da execução na exibição de execução do trabalho. Clique em cada tarefa para ver os detalhes da tarefa executada.
  24. Para exibir os resultados quando o fluxo de trabalho for concluído, clique na tarefa de análise de dados final. A página Saída é exibida e exibe os resultados da consulta.

Etapa 7: Agendar o trabalho do pipeline de dados

Nota

Para demonstrar o uso de um trabalho do Azure Databricks para orquestrar um fluxo de trabalho agendado, este exemplo de introdução separa as etapas de ingestão, preparação e análise em blocos de anotações separados, e cada bloco de anotações é usado para criar uma tarefa no trabalho. Se todo o processamento estiver contido em um único bloco de anotações, você poderá agendar facilmente o bloco de anotações diretamente da interface do usuário do bloco de anotações do Azure Databricks. Consulte Criar e gerenciar trabalhos agendados de blocos de anotações.

Um requisito comum é executar um pipeline de dados de forma programada. Para definir um cronograma para o trabalho que executa o pipeline:

  1. Clique em Ícone Fluxos de TrabalhoFluxos de trabalho na barra lateral.
  2. Na coluna Nome, clique no nome do trabalho. O painel lateral exibe os detalhes do trabalho.
  3. Clique em Adicionar gatilho no painel Detalhes do trabalho e selecione Agendado no tipo de gatilho.
  4. Especifique o período, a hora de início e o fuso horário. Opcionalmente, marque a caixa de seleção Mostrar sintaxe do Cron para exibir e editar a programação no Quartz Cron Syntax.
  5. Clique em Guardar.

Mais informações