Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Saiba como criar e implantar um pipeline de ETL (extrair, transformar e carregar) para orquestração de dados usando DLT e Carregador Automático. Um pipeline de ETL implementa as etapas para ler dados de sistemas de origem, transformar esses dados com base em requisitos, como verificações de qualidade de dados e eliminação de duplicação de registros e gravar os dados em um sistema de destino, como um data warehouse ou um data lake.
Neste tutorial, você usará o DLT e o Carregador Automático para:
- Ingira os dados brutos de origem em uma tabela de destino.
- Transforme os dados brutos de origem e escreva os dados transformados em duas visões materializadas de destino.
- Consulte os dados transformados.
- Automatize o pipeline de ETL com um trabalho do Databricks.
Para obter mais informações sobre DLT e Carregador Automático, consulte DLT e O que é o Carregador Automático?
Para concluir este tutorial, você deve atender aos seguintes requisitos:
- Seja conectado a um workspace do Azure Databricks.
- Habilite o Catálogo do Unity para seu workspace.
- Tenha a computação sem servidor habilitada para sua conta. Os pipelines do DLT sem servidor não estão disponíveis em todas as regiões do workspace. Consulte Recursos com disponibilidade regional limitada para obter uma lista de regiões disponíveis.
- Tenha permissão para criar um recurso de computação ou acesso a um recurso de computação.
- Tenha permissões para criar um novo esquema em um catálogo. As permissões necessárias são
ALL PRIVILEGES
ouUSE CATALOG
.CREATE SCHEMA
- Tenha permissões para criar um novo volume em um esquema existente. As permissões necessárias são
ALL PRIVILEGES
ouUSE SCHEMA
.CREATE VOLUME
O conjunto de dados usado neste exemplo é um subconjunto do Conjunto de dados de milhões de músicas, uma coleção de recursos e metadados para faixas de música contemporânea. Esse conjunto de dados está disponível nos conjuntos de dados de amostra incluídos no workspace do Azure Databricks.
Primeiro, você criará um pipeline de ETL no DLT. O DLT cria pipelines resolvendo dependências definidas em notebooks ou arquivos (chamados código-fonte) usando a sintaxe DLT. Cada arquivo de código-fonte pode conter apenas um idioma, mas você pode adicionar vários notebooks ou arquivos específicos do idioma no pipeline. Para saber mais, consulte DLT
Importante
Deixe o campo Código-fonte em branco para criar e configurar automaticamente um notebook para criação de código-fonte.
Este tutorial usa a computação sem servidor e o Catálogo do Unity. Para todas as opções de configuração não especificadas, use as configurações padrão. Se a computação sem servidor não estiver habilitada ou tiver suporte em seu workspace, você poderá concluir o tutorial como escrito usando as configurações de computação padrão. Você deve selecionar manualmente o Catálogo do Unity em Opções de Armazenamento na seção Destino da interface do usuário Criar pipeline.
Para criar um pipeline de ETL na plataforma DLT, siga estas etapas:
- Na barra lateral, clique em Pipelines.
- Clique em Criar pipeline e Pipeline de ETL.
- No nome do pipeline, digite um nome de pipeline exclusivo.
- Marque a caixa de seleção sem servidor .
- No Destino, para configurar um local do Catálogo do Unity onde as tabelas são publicadas, selecione um Catálogo existente e escreva um novo nome no Esquema para criar um novo esquema em seu catálogo.
- Clique em Criar.
- Clique no link do notebook de código-fonte no campo Código-fonte no painel Detalhes do pipeline.
A interface do usuário do pipeline é exibida para o novo pipeline.
Importante
Os notebooks só podem conter uma única linguagem de programação. Não misture código Python e SQL em notebooks de código-fonte do pipeline.
Nesta etapa, você usará notebooks do Databricks para desenvolver e validar interativamente o código-fonte para pipelines DLT.
O código usa o Carregador Automático para ingestão de dados incremental. O Carregador Automático detecta e processa automaticamente arquivos novos confirme eles chegam no armazenamento de objeto da nuvem. Para saber mais, confira o que é o Carregador Automático?
Um notebook de código-fonte em branco é criado e configurado automaticamente para o pipeline. O bloco de anotações é criado em um novo diretório no diretório do usuário. Os nomes do novo diretório e do arquivo correspondem ao nome do pipeline. Por exemplo, /Users/someone@example.com/my_pipeline/my_pipeline
.
Ao desenvolver um pipeline DLT, você pode escolher Python ou SQL. Exemplos são incluídos para ambos os idiomas. Com base na sua escolha de idioma, selecione o idioma padrão do bloco de anotações. Para saber mais sobre o suporte ao notebook para desenvolvimento de código de pipeline DLT, consulte Desenvolver e depurar pipelines ETL com um notebook em DLT.
Um link para acessar este notebook está no campo Código-fonte no painel Pipeline details. Clique no link para abrir o bloco de anotações antes de prosseguir para a próxima etapa.
Clique em Conectar no canto superior direito para abrir o menu de configuração de computação.
Passe o mouse sobre o nome do pipeline que você criou na Etapa 1.
Clique em Conectar.
Ao lado do título do bloco de anotações na parte superior, selecione o idioma padrão do bloco de anotações (Python ou SQL).
Copie e cole o código a seguir em uma célula no notebook.
Python
# Import modules import dlt from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dlt.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .option("inferSchema", True) .load(file_path)) # Define a materialized view that validates data and renames a column @dlt.table( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dlt.expect("valid_artist_name", "artist_name IS NOT NULL") @dlt.expect("valid_title", "song_title IS NOT NULL") @dlt.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dlt.table( comment="A table summarizing counts of songs released by the artists each year who released most songs." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )
SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw ( artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING, value STRING ) COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/'); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year who released most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC
Nesta etapa, você consultará os dados processados no pipeline de ETL para analisar os dados da música. Essas consultas usam os registros preparados criados na etapa anterior.
Primeiro, execute uma consulta que encontra os artistas que mais lançaram músicas a cada ano, começando em 1990.
Na barra lateral, clique no
Editor de SQL.
Clique no
ícone de nova guia e selecione Criar consulta no menu.
Insira o seguinte:
-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC
Substitua
<catalog>
e<schema>
pelo nome do catálogo e esquema em que a tabela está. Por exemplo,data_pipelines.songs_data.top_artists_by_year
.Clique em Executar seleção.
Agora, execute outra consulta que encontre músicas com uma batida 4/4 e ritmo dançante.
Clique no
novo ícone de toque e selecione Criar consulta no menu.
Insira o seguinte código:
-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;
Substitua
<catalog>
e<schema>
pelo nome do catálogo e esquema em que a tabela está. Por exemplo,data_pipelines.songs_data.songs_prepared
.Clique em Executar selecionado.
Em seguida, crie um fluxo de trabalho para automatizar a execução das etapas de ingestão, processamento e análise de dados usando um trabalho do Databricks.
- No workspace, clique em
Fluxos de Trabalho na barra lateral e clique em Criar tarefa.
- Na caixa de título da tarefa, substitua a data e a hora> do Novo Trabalho < pelo nome do trabalho. Por exemplo,
Songs workflow
. - Em Nome da tarefa, insira um nome para a primeira tarefa, por exemplo,
ETL_songs_data
. - Em Tipo, selecione Pipeline.
- No Pipeline, selecione o pipeline DLT que você criou na etapa 1.
- Clique em Criar.
- Para executar o fluxo de trabalho, clique em Executar Agora. Para exibir os detalhes da execução, clique na guia Execuções . Clique na tarefa para exibir os detalhes da execução da tarefa.
- Para exibir os resultados quando o fluxo de trabalho for concluído, clique em Ir para a última execução bem-sucedida ou na hora de início da execução do trabalho. A página Saída aparece e exibe os resultados da consulta.
Para obter mais informações sobre execuções de trabalho, veja Monitoramento e observabilidade de trabalhos do Databricks.
Para executar o pipeline de ETL em um agendamento, siga estas etapas:
- Clique em
Fluxos de trabalho na barra lateral.
- Na coluna Nome, clique no nome do trabalho. O painel lateral exibe os Detalhes do trabalho.
- Clique em Adicionar gatilho no painel Agendas & Gatilhos e selecione Agendado no tipo gatilho.
- Especifique o período, a hora de início e o fuso horário.
- Clique em Save (Salvar).
- Para saber mais sobre pipelines de processamento de dados com DLT, consulte DLT
- Para saber mais sobre notebooks do Databricks, confira Introdução aos notebooks do Databricks.
- Para saber mais sobre os Trabalhos do Azure Databricks, confira O que são trabalhos?
- Para saber mais sobre Delta Lake, veja o que é Delta Lake?