Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Saiba como criar e implantar um pipeline ETL (extrair, transformar e carregar) para orquestração de dados usando Pipelines Declarativos do Lakeflow e Carregador Automático. Um pipeline de ETL implementa as etapas para ler dados de sistemas de origem, transformar esses dados com base em requisitos, como verificações de qualidade de dados e eliminação de duplicação de registros e gravar os dados em um sistema de destino, como um data warehouse ou um data lake.
Neste tutorial, você usará o Lakeflow Declarative Pipelines e o Carregador Automático para:
- Importar dados brutos de origem para uma tabela de destino.
- Transforme os dados brutos de origem e escreva os dados transformados em duas visões materializadas de destino.
- Consulte os dados transformados.
- Automatize o pipeline de ETL com um trabalho do Databricks.
Para obter mais informações sobre o Lakeflow Declarative Pipelines e o Carregador Automático, consulte Pipelines Declarativos do Lakeflow e o que é o Carregador Automático?
Requisitos
Para concluir este tutorial, você deve atender aos seguintes requisitos:
- Seja conectado a um workspace do Azure Databricks.
- Habilite o Catálogo do Unity para seu workspace.
- Tenha a computação sem servidor habilitada para sua conta. Pipelines Declarativos sem servidor do Lakeflow não estão disponíveis em todas as regiões da área de trabalho. Veja Recursos com disponibilidade regional limitada para saber as regiões disponíveis.
- Tenha permissão para criar um recurso de computação ou acesso a um recurso de computação.
- Tenha permissões para criar um novo esquema em um catálogo. As permissões necessárias são
ALL PRIVILEGES
ouUSE CATALOG
.CREATE SCHEMA
- Tenha permissões para criar um novo volume em um esquema existente. As permissões necessárias são
ALL PRIVILEGES
ouUSE SCHEMA
.CREATE VOLUME
Sobre o conjunto de dados
O conjunto de dados usado neste exemplo é um subconjunto do Conjunto de dados de milhões de músicas, uma coleção de recursos e metadados para faixas de música contemporânea. Esse conjunto de dados está disponível nos conjuntos de dados de amostra incluídos no workspace do Azure Databricks.
Etapa 1: Criar um pipeline
Primeiro, você criará um pipeline ETL no Lakeflow Declarative Pipelines. O Lakeflow Declarative Pipelines cria pipelines resolvendo dependências definidas em notebooks ou arquivos (chamados de código-fonte) usando a sintaxe do Lakeflow Declarative Pipelines. Cada arquivo de código-fonte pode conter apenas um idioma, mas você pode adicionar vários notebooks ou arquivos específicos do idioma no pipeline. Para saber mais, confira Pipelines Declarativos do Lakeflow
Importante
Deixe o campo Código-fonte em branco para criar e configurar um notebook para criação de código-fonte automaticamente.
Este tutorial usa a computação sem servidor e o Catálogo do Unity. Para todas as opções de configuração que não são especificadas, use as configurações padrão. Se a computação sem servidor não estiver habilitada ou tiver suporte em seu workspace, você poderá concluir o tutorial como escrito usando as configurações de computação padrão. Você deve selecionar manualmente o Catálogo do Unity em Opções de Armazenamento na seção Destino da interface do usuário Criar pipeline.
Para criar um novo pipeline de ETL no Lakeflow Declarative Pipelines, siga estas etapas:
- No seu espaço de trabalho, clique no
Tarefas e Pipelines na barra lateral.
- Em Novo, clique Pipeline ETL.
- No nome do pipeline, digite um nome de pipeline exclusivo.
- Marque a caixa de seleção sem servidor .
- No Destino, para configurar um local do Catálogo do Unity onde as tabelas são publicadas, selecione um Catálogo existente e escreva um novo nome no Esquema para criar um novo esquema em seu catálogo.
- Clique em Criar.
A interface do usuário do pipeline é exibida para o novo pipeline.
Etapa 2: Desenvolver um pipeline
Importante
Os notebooks só podem conter uma única linguagem de programação. Não misture código Python e SQL em notebooks de código-fonte do pipeline.
Nesta etapa, você usará o Databricks Notebooks para desenvolver e validar o código-fonte para o Lakeflow Declarative Pipelines interativamente.
O código usa o Carregador Automático para ingestão de dados incremental. O Carregador Automático detecta e processa automaticamente arquivos novos confirme eles chegam no armazenamento de objeto da nuvem. Para saber mais, confira o que é o Carregador Automático?
Um notebook de código-fonte em branco é criado e configurado automaticamente para o pipeline. O bloco de anotações é criado em um novo diretório no diretório do usuário. Os nomes do novo diretório e do arquivo correspondem ao nome do pipeline. Por exemplo, /Users/someone@example.com/my_pipeline/my_pipeline
.
Ao desenvolver um pipeline, você pode escolher Python ou SQL. Exemplos são incluídos para ambos os idiomas. Com base na sua escolha de idioma, verifique se você seleciona o idioma padrão do bloco de anotações. Para saber mais sobre o suporte ao notebook para o desenvolvimento de código do Lakeflow Declarative Pipelines, consulte Desenvolver e depurar pipelines ETL com um notebook no Lakeflow Declarative Pipelines.
Um link para acessar este notebook está no campo Código-fonte no painel Pipeline details. Clique no link para abrir o bloco de anotações antes de prosseguir para a próxima etapa.
Clique em Conectar no canto superior direito para abrir o menu de configuração de computação.
Passe o mouse sobre o nome do pipeline que você criou na Etapa 1.
Clique em Conectar.
Ao lado do título do bloco de anotações na parte superior, selecione o idioma padrão do bloco de anotações (Python ou SQL).
Copie e cole o código a seguir em uma célula no notebook.
Pitão
# Import modules import dlt from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dlt.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .option("inferSchema", True) .load(file_path)) # Define a materialized view that validates data and renames a column @dlt.table( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dlt.expect("valid_artist_name", "artist_name IS NOT NULL") @dlt.expect("valid_title", "song_title IS NOT NULL") @dlt.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dlt.table( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )
SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw ( artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING, value STRING ) COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/'); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC
Clique em Iniciar para iniciar uma atualização para o pipeline conectado.
Etapa 3: Consultar os dados transformados
Nesta etapa, você consultará os dados processados no pipeline de ETL para analisar os dados da música. Essas consultas usam os registros preparados criados na etapa anterior.
Primeiro, execute uma consulta que encontra os artistas que mais lançaram músicas a cada ano desde 1990.
Na barra lateral, clique no
Editor de SQL.
Clique no
ícone de nova guia e selecione Criar consulta no menu.
Insira o seguinte:
-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC
Substitua
<catalog>
e<schema>
pelo nome do catálogo e esquema em que a tabela está. Por exemplo,data_pipelines.songs_data.top_artists_by_year
.Clique em Executar seleção.
Agora, execute outra consulta que encontre músicas com uma batida 4/4 e ritmo dançante.
Clique no
novo ícone de toque e selecione Criar consulta no menu.
Insira o seguinte código:
-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;
Substitua
<catalog>
e<schema>
pelo nome do catálogo e esquema em que a tabela está. Por exemplo,data_pipelines.songs_data.songs_prepared
.Clique em Executar seleção.
Etapa 4: Criar uma tarefa para executar o pipeline
Em seguida, crie um fluxo de trabalho para automatizar as etapas de ingestão, processamento e análise de dados usando um trabalho do Databricks.
- No seu espaço de trabalho, clique no
Tarefas e Pipelines na barra lateral.
- Em Novo, clique em Trabalho.
- Na caixa de título da tarefa, substitua a data e a hora< do Novo Trabalho > pelo nome do trabalho. Por exemplo,
Songs workflow
. - Em Nome da tarefa, insira um nome para a primeira tarefa, por exemplo,
ETL_songs_data
. - Em Tipo, selecione Pipeline.
- No Pipeline, selecione o pipeline que você criou na etapa 1.
- Clique em Criar.
- Para executar o fluxo de trabalho, clique em Executar Agora. Para exibir os detalhes da execução, clique na guia Execuções . Clique na tarefa para exibir os detalhes da execução da tarefa.
- Para exibir os resultados quando o fluxo de trabalho for concluído, clique em Ir para a última execução bem-sucedida ou na hora de início da execução do trabalho. A página Saída aparece e exibe os resultados da consulta.
Consulte Monitoramento e observabilidade para Tarefas do Lakeflow para obter mais informações sobre execuções de tarefas.
Etapa 5: Agendar o trabalho de pipeline
Para executar o pipeline de ETL em um agendamento, siga estas etapas:
- Navegue até a interface do usuário de Trabalhos &Pipelines no mesmo workspace do Azure Databricks que o trabalho.
- Opcionalmente, selecione os filtros Trabalhos e Propriedade minha .
- Na coluna Nome, clique no nome do trabalho. O painel lateral exibe os Detalhes do trabalho.
- Clique em Adicionar gatilho no painel Agendas & Gatilhos e selecione Agendado no tipo gatilho.
- Especifique o período, a hora de início e o fuso horário.
- Clique em Save (Salvar).
Saiba mais
- Para saber mais sobre pipelines de processamento de dados com Pipelines Declarativos do Lakeflow, consulte Pipelines Declarativos do Lakeflow
- Para saber mais sobre o Databricks Notebooks, consulte Introdução aos notebooks do Databricks.
- Para saber mais sobre Trabalhos do Lakeflow, confira O que são trabalhos?
- Para saber mais sobre o Delta Lake, confira o que é o Delta Lake no Azure Databricks?