Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Saiba como criar e implantar um pipeline ETL (extrair, transformar e carregar) para orquestração de dados usando Lakeflow Declarative Pipelines e Auto Loader. Um pipeline de ETL implementa as etapas para ler dados de sistemas de origem, transformar esses dados com base em requisitos, como verificações de qualidade de dados e eliminação de registos duplicados, e escrever os dados em um sistema de destino, como um data warehouse ou um data lake.
Neste tutorial, você usará Lakeflow Declarative Pipelines e Auto Loader para:
- Ingerir dados brutos de origem em uma tabela de destino.
- Transforme os dados brutos de origem e grave os dados transformados em duas visões materializadas de destino.
- Consulte os dados transformados.
- Automatize o pipeline de ETL com uma tarefa Databricks.
Para obter mais informações sobre Lakeflow Declarative Pipelines e Auto Loader, consulte Lakeflow Declarative Pipelines e What is Auto Loader?
Requisitos
Para concluir este tutorial, você deve atender aos seguintes requisitos:
- Esteja conectado a um espaço de trabalho do Azure Databricks.
- Tenha o Unity Catalog ativado para seu espaço de trabalho.
- Tenha a computação sem servidor habilitada para sua conta. Os pipelines declarativos do Lakeflow sem servidor não estão disponíveis em todas as regiões de trabalho. Consulte Recursos com disponibilidade regional limitada para regiões disponíveis.
- Ter permissão para criar um recurso de computação ou acesso a um recurso de computação.
- Ter permissões para criar um novo esquema em um catálogo. As permissões necessárias são
ALL PRIVILEGES
ouUSE CATALOG
eCREATE SCHEMA
. - Ter permissões para criar um novo volume em um esquema existente. As permissões necessárias são
ALL PRIVILEGES
ouUSE SCHEMA
eCREATE VOLUME
.
Sobre o conjunto de dados
O conjunto de dados usado neste exemplo é um subconjunto do Million Song Dataset, uma coleção de recursos e metadados para faixas de música contemporânea. Esse conjunto de dados está disponível nos conjuntos de dados de exemplo incluídos em seu espaço de trabalho do Azure Databricks.
Etapa 1: Criar um pipeline
Primeiro, você criará um pipeline ETL no Lakeflow Declarative Pipelines. O Lakeflow Declarative Pipelines cria pipelines resolvendo dependências definidas em blocos de anotações ou arquivos (chamados de código-fonte) usando a sintaxe Lakeflow Declarative Pipelines. Cada arquivo de código-fonte pode conter apenas um idioma, mas você pode adicionar vários blocos de anotações ou arquivos específicos do idioma no pipeline. Para saber mais, consulte Lakeflow Declarative Pipelines
Importante
Deixe o campo Código-fonte em branco para criar e configurar um bloco de anotações para criação automática do código-fonte.
Este tutorial usa computação sem servidor e Unity Catalog. Para todas as opções de configuração que não são especificadas, use as configurações padrão. Se a computação sem servidor não estiver habilitada ou suportada em seu espaço de trabalho, você poderá concluir o tutorial conforme escrito usando as configurações de computação padrão. Se você usar as configurações de computação padrão, deverá selecionar manualmente Unity Catalog em Opções de armazenamento na seção Destino da interface do usuário Criar pipeline .
Para criar um novo pipeline ETL em Lakeflow Declarative Pipelines, siga estas etapas:
- No espaço de trabalho, clique no
Jobs & Pipelines na barra lateral.
- Em Novo, clique em Pipeline ETL.
- Em Nome do pipeline, digite um nome de pipeline exclusivo.
- Marque a caixa de seleção Sem servidor .
- Em Destino, para configurar um local do Catálogo Unity onde as tabelas são publicadas, selecione um Catálogo existente e escreva um novo nome em Esquema para criar um novo esquema em seu catálogo.
- Clique em Criar.
A interface de utilizador do pipeline aparece para o novo pipeline.
Etapa 2: Desenvolver um pipeline
Importante
Os blocos de notas só podem conter uma única linguagem de programação. Não misture código Python e SQL em cadernos de código fonte de pipeline.
Nesta etapa, você usará o Databricks Notebooks para desenvolver e validar o código-fonte para Lakeflow Declarative Pipelines interativamente.
O código usa o Auto Loader para ingestão incremental de dados. O Auto Loader deteta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem. Para saber mais, consulte O que é o Auto Loader?
Um notebook de código-fonte em branco é criado e configurado automaticamente para a pipeline. O bloco de anotações é criado em um novo diretório no diretório do usuário. O nome do novo diretório e arquivo correspondem ao nome do seu pipeline. Por exemplo, /Users/someone@example.com/my_pipeline/my_pipeline
.
Ao desenvolver um pipeline, você pode escolher Python ou SQL. Estão incluídos exemplos para ambas as línguas. Com base na sua escolha de idioma, verifique se você seleciona o idioma padrão do bloco de anotações. Para saber mais sobre o suporte de notebook para desenvolvimento de código Lakeflow Declarative Pipelines, consulte Desenvolver e depurar pipelines ETL com um notebook em Lakeflow Declarative Pipelines.
Um link para aceder a este notebook encontra-se no campo Código-fonte no painel Detalhes do pipeline. Clique no link para abrir o bloco de anotações antes de prosseguir para a próxima etapa.
Clique em Connect no canto superior direito para abrir o menu de configuração de computação.
Passe o cursor sobre o nome do pipeline criado na Etapa 1.
Clique em Conectar.
Ao lado do título do bloco de anotações na parte superior, selecione a linguagem padrão do bloco de anotações (Python ou SQL).
Copie e cole o código a seguir em uma célula do bloco de anotações.
Píton
# Import modules import dlt from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dlt.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .option("inferSchema", True) .load(file_path)) # Define a materialized view that validates data and renames a column @dlt.table( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dlt.expect("valid_artist_name", "artist_name IS NOT NULL") @dlt.expect("valid_title", "song_title IS NOT NULL") @dlt.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dlt.table( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )
SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw ( artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING, value STRING ) COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/'); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC
Clique em Iniciar para iniciar uma atualização para o pipeline conectado.
Etapa 3: Consultar os dados transformados
Nesta etapa, você consultará os dados processados no pipeline ETL para analisar os dados da música. Essas consultas usam os registros preparados criados na etapa anterior.
Primeiro, execute uma consulta que encontre os artistas que lançaram mais músicas a cada ano desde 1990.
Na barra lateral, clique em
Editor SQL.
Clique no ícone
da nova guia e selecione Criar nova consulta no menu.
Insira o seguinte:
-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC
Substitua
<catalog>
e<schema>
pelo nome do catálogo e esquema em que a tabela está. Por exemplo,data_pipelines.songs_data.top_artists_by_year
.Clique em Executar seleção.
Agora, execute outra consulta que encontre músicas com uma batida 4/4 e ritmo dançável.
Clique no
, novo ícone de toque e selecione Criar nova consulta no menu.
Insira o seguinte código:
-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;
Substitua
<catalog>
e<schema>
pelo nome do catálogo e esquema em que a tabela está. Por exemplo,data_pipelines.songs_data.songs_prepared
.Clique em Executar seleção.
Etapa 4: Criar um trabalho para executar o pipeline
Em seguida, crie um fluxo de trabalho para automatizar as etapas de ingestão, processamento e análise de dados usando um trabalho Databricks.
- No espaço de trabalho, clique no
Jobs & Pipelines na barra lateral.
- Em Novo, clique em Trabalho.
- Na caixa de título da tarefa, substitua a data e a hora< do Novo Trabalho > pelo nome do trabalho. Por exemplo,
Songs workflow
. - Em Nome da tarefa, insira um nome para a primeira tarefa, por exemplo,
ETL_songs_data
. - Em Tipo, selecione Pipeline.
- Em Pipeline, selecione o pipeline criado na etapa 1.
- Clique em Criar.
- Para executar o fluxo de trabalho, clique em Executar Agora. Para exibir os detalhes da execução, clique na guia Execuções . Clique na tarefa para ver os detalhes da tarefa executada.
- Para exibir os resultados quando o fluxo de trabalho for concluído, clique em Ir para a última execução bem-sucedida ou a hora de início da execução do trabalho. A página Saída é exibida e exibe os resultados da consulta.
Consulte Monitorização e observabilidade de tarefas do Lakeflow para obter mais informações sobre execuções de tarefas.
Etapa 5: Agendar o trabalho de pipeline
Para executar o pipeline ETL em um cronograma, execute estas etapas:
- Navegue até a interface do usuário Jobs & Pipelines no mesmo espaço de trabalho do Azure Databricks que o trabalho.
- Opcionalmente, selecione os filtros Trabalhos e Propriedade de mim .
- Na coluna Nome , clique no nome do trabalho. O painel lateral exibe os detalhes do trabalho.
- Clique em Adicionar gatilho no painel Agendas & Gatilhos e selecione Agendado no tipo de gatilho.
- Especifique o período, a hora de início e o fuso horário.
- Clique em Salvar.
Mais informações
- Para saber mais sobre pipelines de processamento de dados com Lakeflow Declarative Pipelines, consulte Lakeflow Declarative Pipelines
- Para saber mais sobre os blocos de anotações Databricks, consulte Introdução aos blocos de anotações Databricks.
- Para saber mais sobre o Lakeflow Jobs, consulte O que são empregos?
- Para saber mais sobre o Delta Lake, consulte O que é Delta Lake no Azure Databricks?