Partilhar via


Tutorial: Crie o seu primeiro pipeline usando o Lakeflow Pipelines Editor

Aprenda como criar um novo pipeline usando Lakeflow Spark Declarative Pipelines (SDP) para orquestração de dados e Auto Loader. Este tutorial estende o pipeline de exemplo limpando os dados e criando uma consulta para encontrar os 100 utilizadores principais.

Neste tutorial, aprende a usar o Lakeflow Pipelines Editor para:

  • Crie um novo pipeline com a estrutura de pastas padrão e comece com um conjunto de ficheiros de exemplo.
  • Defina as restrições de qualidade dos dados usando expectativas.
  • Use as funcionalidades do editor para estender o pipeline com uma nova transformação para realizar análises nos seus dados.

Requerimentos

Antes de começares este tutorial, deves:

  • Esteja conectado a um espaço de trabalho do Azure Databricks.
  • Tenha o Unity Catalog ativado para o seu espaço de trabalho.
  • Tem o editor de pipelines Lakeflow ativado para o teu espaço de trabalho e tens de estar inscrito. Ver Ativar o Lakeflow Pipelines Editor e monitorização atualizada.
  • Ter permissão para criar um recurso de computação ou acesso a um recurso de computação.
  • Ter permissões para criar um novo esquema num catálogo. As permissões necessárias são ALL PRIVILEGES ou USE CATALOG e CREATE SCHEMA.

Etapa 1: Criar um pipeline

Neste passo, cria-se um pipeline usando a estrutura de pastas padrão e exemplos de código. Os exemplos de código referenciam a users tabela na wanderbricks fonte de dados de exemplo.

  1. No seu espaço de trabalho Azure Databricks, clique no ícone Plus.Novo, depois no ícone Pipeline.Pipeline ETL. Isto abre o editor de pipeline na página de criação de pipeline.

  2. Clique no cabeçalho para dar um nome ao seu pipeline.

  3. Logo abaixo do nome, escolhe o catálogo e o esquema predefinidos para as tuas tabelas de saída. Estes elementos são utilizados quando não especifica um catálogo e esquema nas definições do pipeline.

  4. Na secção Passo seguinte para o seu pipeline, clique em qualquer um dos ícones de Esquema.Comece com código de exemplo em SQL ou Schema Icon.Começa com código de exemplo em Python, com base na tua preferência linguística. Isto altera a linguagem padrão do teu código de exemplo, mas podes adicionar código na outra linguagem mais tarde. Isto cria uma estrutura de pastas padrão com código de exemplo para começar.

  5. Pode ver o código de exemplo no gestor de ativos do pipeline à esquerda do espaço de trabalho. Existem dois ficheiros abaixo de transformations que geram cada um um conjunto de dados de pipeline. Abaixo do explorations está um caderno com código para ajudar a visualizar a saída do teu pipeline. Ao clicar num ficheiro, podes ver e editar o código no editor.

    Os conjuntos de dados de saída ainda não foram criados, e o gráfico Pipeline no lado direito do ecrã está vazio.

  6. Para executar o código do pipeline (o código na transformations pasta), clique em Executar pipeline no canto superior direito do ecrã.

    Depois de concluída a execução, a parte inferior do espaço de trabalho mostra-lhe as duas novas tabelas que foram criadas, sample_users_<pipeline-name> e sample_aggregation_<pipeline-name>. Também pode observar que o grafo Pipeline, no lado direito do espaço de trabalho, agora mostra as duas tabelas, incluindo o fato de que sample_users é a fonte para sample_aggregation.

Passo 2: Aplicar verificações de qualidade dos dados

Neste passo, adiciona uma verificação de qualidade dos dados à sample_users tabela. Utiliza-se as expectativas do pipeline para restringir os dados. Neste caso, apaga quaisquer registos de utilizador que não tenham um endereço de email válido e produz a tabela limpa como users_cleaned.

  1. No navegador de ativos do pipeline, clique no ícone Mais e selecione Transformação.

  2. No diálogo Criar novo ficheiro de transformação , faça as seguintes seleções:

    • Escolha Python ou SQL para a linguagem. Isto não tem de corresponder à sua escolha anterior.
    • Dá um nome ao ficheiro. Neste caso, escolha users_cleaned.
    • Para o caminho de destino, mantenha o padrão.
    • Para o tipo de conjunto de dados, deixe-o como Nenhum selecionado ou escolha a vista Materializada. Se selecionares vista materializada, gera código de exemplo para ti.
  3. No teu novo ficheiro de código, edita o código para corresponder ao seguinte (usa SQL ou Python, com base na tua seleção no ecrã anterior). Substitua <pipeline-name> pelo nome completo da sua sample_users tabela.

    SQL

    -- Drop all rows that do not have an email address
    
    CREATE MATERIALIZED VIEW users_cleaned
    (
      CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
    ) AS
    SELECT *
    FROM sample_users_<pipeline-name>;
    

    Python

    from pyspark import pipelines as dp
    
    # Drop all rows that do not have an email address
    
    @dp.table
    @dp.expect_or_drop("no null emails", "email IS NOT NULL")
    def users_cleaned():
        return (
            spark.read.table("sample_users_<pipeline_name>")
        )
    
  4. Clique em Executar pipeline para atualizar o pipeline. Agora deveria ter três mesas.

Passo 3: Analisar os principais utilizadores

De seguida, obtém os 100 melhores utilizadores pelo número de reservas que já criaram. Junta a wanderbricks.bookings tabela à users_cleaned vista materializada.

  1. No navegador de ativos do pipeline, clique no ícone Mais e selecione Transformação.

  2. No diálogo Criar novo ficheiro de transformação , faça as seguintes seleções:

    • Escolha Python ou SQL para a linguagem. Isto não tem de corresponder às tuas escolhas anteriores.
    • Dá um nome ao ficheiro. Neste caso, escolha users_and_bookings.
    • Para o caminho de destino, mantenha o padrão.
    • Para o tipo de conjunto de dados, deixe-o como Nenhum selecionado.
  3. No teu novo ficheiro de código, edita o código para corresponder ao seguinte (usa SQL ou Python, com base na tua seleção no ecrã anterior).

    SQL

    -- Get the top 100 users by number of bookings
    
    CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
    SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
    FROM users_cleaned u
    JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
    GROUP BY u.name
    ORDER BY booking_count DESC
    LIMIT 100;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import col, count, desc
    
    # Get the top 100 users by number of bookings
    
    @dp.table
    def users_and_bookings():
        return (
            spark.read.table("users_cleaned")
            .join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
            .groupBy(col("name"))
            .agg(count("booking_id").alias("booking_count"))
            .orderBy(desc("booking_count"))
            .limit(100)
        )
    
  4. Clique em Executar pipeline para atualizar os conjuntos de dados. Quando a execução termina, pode ver no Gráfico de Pipeline que existem quatro tabelas, incluindo a nova users_and_bookings tabela.

    Gráfico do pipeline que mostra quatro tabelas no pipeline

Próximos passos

Agora que aprendeu a usar algumas das funcionalidades do editor de pipelines Lakeflow e criou um pipeline, aqui ficam outras funcionalidades sobre as quais pode saber mais: