Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Aprenda como criar um novo pipeline usando Lakeflow Spark Declarative Pipelines (SDP) para orquestração de dados e Auto Loader. Este tutorial estende o pipeline de exemplo limpando os dados e criando uma consulta para encontrar os 100 utilizadores principais.
Neste tutorial, aprende a usar o Lakeflow Pipelines Editor para:
- Crie um novo pipeline com a estrutura de pastas padrão e comece com um conjunto de ficheiros de exemplo.
- Defina as restrições de qualidade dos dados usando expectativas.
- Use as funcionalidades do editor para estender o pipeline com uma nova transformação para realizar análises nos seus dados.
Requerimentos
Antes de começares este tutorial, deves:
- Esteja conectado a um espaço de trabalho do Azure Databricks.
- Tenha o Unity Catalog ativado para o seu espaço de trabalho.
- Tem o editor de pipelines Lakeflow ativado para o teu espaço de trabalho e tens de estar inscrito. Ver Ativar o Lakeflow Pipelines Editor e monitorização atualizada.
- Ter permissão para criar um recurso de computação ou acesso a um recurso de computação.
- Ter permissões para criar um novo esquema num catálogo. As permissões necessárias são
ALL PRIVILEGESouUSE CATALOGeCREATE SCHEMA.
Etapa 1: Criar um pipeline
Neste passo, cria-se um pipeline usando a estrutura de pastas padrão e exemplos de código. Os exemplos de código referenciam a users tabela na wanderbricks fonte de dados de exemplo.
No seu espaço de trabalho Azure Databricks, clique no
Novo, depois no
Pipeline ETL. Isto abre o editor de pipeline na página de criação de pipeline.
Clique no cabeçalho para dar um nome ao seu pipeline.
Logo abaixo do nome, escolhe o catálogo e o esquema predefinidos para as tuas tabelas de saída. Estes elementos são utilizados quando não especifica um catálogo e esquema nas definições do pipeline.
Na secção Passo seguinte para o seu pipeline, clique em qualquer
Comece com código de exemplo em SQL ou
Começa com código de exemplo em Python, com base na tua preferência linguística. Isto altera a linguagem padrão do teu código de exemplo, mas podes adicionar código na outra linguagem mais tarde. Isto cria uma estrutura de pastas padrão com código de exemplo para começar.
Pode ver o código de exemplo no gestor de ativos do pipeline à esquerda do espaço de trabalho. Existem dois ficheiros abaixo de
transformationsque geram cada um um conjunto de dados de pipeline. Abaixo doexplorationsestá um caderno com código para ajudar a visualizar a saída do teu pipeline. Ao clicar num ficheiro, podes ver e editar o código no editor.Os conjuntos de dados de saída ainda não foram criados, e o gráfico Pipeline no lado direito do ecrã está vazio.
Para executar o código do pipeline (o código na
transformationspasta), clique em Executar pipeline no canto superior direito do ecrã.Depois de concluída a execução, a parte inferior do espaço de trabalho mostra-lhe as duas novas tabelas que foram criadas,
sample_users_<pipeline-name>esample_aggregation_<pipeline-name>. Também pode observar que o grafo Pipeline, no lado direito do espaço de trabalho, agora mostra as duas tabelas, incluindo o fato de quesample_usersé a fonte parasample_aggregation.
Passo 2: Aplicar verificações de qualidade dos dados
Neste passo, adiciona uma verificação de qualidade dos dados à sample_users tabela.
Utiliza-se as expectativas do pipeline para restringir os dados. Neste caso, apaga quaisquer registos de utilizador que não tenham um endereço de email válido e produz a tabela limpa como users_cleaned.
No navegador de ativos do pipeline, clique
e selecione Transformação.
No diálogo Criar novo ficheiro de transformação , faça as seguintes seleções:
- Escolha Python ou SQL para a linguagem. Isto não tem de corresponder à sua escolha anterior.
- Dá um nome ao ficheiro. Neste caso, escolha
users_cleaned. - Para o caminho de destino, mantenha o padrão.
- Para o tipo de conjunto de dados, deixe-o como Nenhum selecionado ou escolha a vista Materializada. Se selecionares vista materializada, gera código de exemplo para ti.
No teu novo ficheiro de código, edita o código para corresponder ao seguinte (usa SQL ou Python, com base na tua seleção no ecrã anterior). Substitua
<pipeline-name>pelo nome completo da suasample_userstabela.SQL
-- Drop all rows that do not have an email address CREATE MATERIALIZED VIEW users_cleaned ( CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW ) AS SELECT * FROM sample_users_<pipeline-name>;Python
from pyspark import pipelines as dp # Drop all rows that do not have an email address @dp.table @dp.expect_or_drop("no null emails", "email IS NOT NULL") def users_cleaned(): return ( spark.read.table("sample_users_<pipeline_name>") )Clique em Executar pipeline para atualizar o pipeline. Agora deveria ter três mesas.
Passo 3: Analisar os principais utilizadores
De seguida, obtém os 100 melhores utilizadores pelo número de reservas que já criaram. Junta a wanderbricks.bookings tabela à users_cleaned vista materializada.
No navegador de ativos do pipeline, clique
e selecione Transformação.
No diálogo Criar novo ficheiro de transformação , faça as seguintes seleções:
- Escolha Python ou SQL para a linguagem. Isto não tem de corresponder às tuas escolhas anteriores.
- Dá um nome ao ficheiro. Neste caso, escolha
users_and_bookings. - Para o caminho de destino, mantenha o padrão.
- Para o tipo de conjunto de dados, deixe-o como Nenhum selecionado.
No teu novo ficheiro de código, edita o código para corresponder ao seguinte (usa SQL ou Python, com base na tua seleção no ecrã anterior).
SQL
-- Get the top 100 users by number of bookings CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS SELECT u.name AS name, COUNT(b.booking_id) AS booking_count FROM users_cleaned u JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id GROUP BY u.name ORDER BY booking_count DESC LIMIT 100;Python
from pyspark import pipelines as dp from pyspark.sql.functions import col, count, desc # Get the top 100 users by number of bookings @dp.table def users_and_bookings(): return ( spark.read.table("users_cleaned") .join(spark.read.table("samples.wanderbricks.bookings"), "user_id") .groupBy(col("name")) .agg(count("booking_id").alias("booking_count")) .orderBy(desc("booking_count")) .limit(100) )Clique em Executar pipeline para atualizar os conjuntos de dados. Quando a execução termina, pode ver no Gráfico de Pipeline que existem quatro tabelas, incluindo a nova
users_and_bookingstabela.
Próximos passos
Agora que aprendeu a usar algumas das funcionalidades do editor de pipelines Lakeflow e criou um pipeline, aqui ficam outras funcionalidades sobre as quais pode saber mais:
Ferramentas para trabalhar e depurar transformações ao criar pipelines.
- Execução seletiva
- Pré-visualizações de dados
- DAG interativo (gráfico dos conjuntos de dados no seu pipeline)
Integração integrada do Databricks Asset Bundles para colaboração eficiente, controlo de versões e integração CI/CD diretamente do editor: