Tutorial: Executar seu primeiro pipeline do Delta Live Tables
Este tutorial orienta você pelas etapas para configurar seu primeiro pipeline do Delta Live Tables, escrever código ETL básico e executar uma atualização de pipeline.
Todas as etapas deste tutorial foram projetadas para workspaces com o Catálogo do Unity habilitado. Você também pode configurar pipelines do Delta Live Tables para trabalhar com o metastore herdado do Hive. Consulte Usar pipelines do Delta Live Tables com o metastore herdado do Hive.
Observação
Este tutorial tem instruções para desenvolver e validar um novo código de pipeline usando notebooks do Databricks. Você também pode configurar pipelines usando código-fonte em arquivos Python ou SQL.
Você pode configurar um pipeline para executar seu código se já tiver o código-fonte escrito usando a sintaxe Delta Live Tables. Consulte Configurar um pipeline do Delta Live Tables.
Você pode usar a sintaxe SQL totalmente declarativa no Databricks SQL para registrar e definir agendas de atualização para exibições materializadas e tabelas de streaming como objetos gerenciados pelo Catálogo do Unity. Confira Usar exibições materializadas no Databricks SQL e Carregar dados usando tabelas de streaming no Databricks SQL.
O exemplo neste artigo usa um conjunto de dados disponível publicamente que contém registros de nomes de bebês do Estado de Nova York. Este exemplo demonstra o uso de um pipeline do Delta Live Tables para:
- Leia dados CSV brutos de um volume em uma tabela.
- Leia os registros da tabela de ingestão e use as expectativas do Delta Live Tables para criar uma nova tabela que contenha dados limpos.
- Use os registros limpos como entrada para as consultas do Delta Live Tables que criam conjuntos de dados derivados.
Esse código demonstra um exemplo simplificado da arquitetura medallion. Confira O que é a arquitetura medallion do Lakehouse?.
As implementações deste exemplo são fornecidas para o Python e o SQL. Siga as etapas para criar um novo pipeline e notebook e copie e cole o código fornecido.
Exemplos de notebooks com código completo também são fornecidos.
Para iniciar um pipeline, você deve ter permissão de criação de cluster ou acesso a uma política de cluster definindo um cluster do Delta Live Tables. O runtime do Delta Live Tables cria um cluster antes de executar o pipeline e falhará se você não tiver a permissão correta.
Todos os usuários podem disparar atualizações usando pipelines sem servidor por padrão. A tecnologia sem servidor deve estar habilitada no nível da conta e pode não estar disponível na região do workspace. Confira Habilitar a computação sem servidor.
Os exemplos neste tutorial usam o Catálogo do Unity. O Databricks recomenda a criação de um novo esquema para executar este tutorial, pois vários objetos de banco de dados são criados no esquema de destino.
- Para criar um novo esquema em um catálogo, você deve ter
ALL PRIVILEGES
privilégios orUSE CATALOG
eCREATE SCHEMA
. - Se você não puder criar um novo esquema, execute este tutorial em um esquema existente. Você deve ter os seguintes privilégios:
USE CATALOG
para o catálogo pai.ALL PRIVILEGES
ouUSE SCHEMA
,CREATE MATERIALIZED VIEW
, eCREATE TABLE
privilégios no esquema de destino.
- Este tutorial usa um volume para armazenar dados de exemplo. O Databricks recomenda a criação de um novo volume para este tutorial. Se você criar um novo esquema para este tutorial, poderá criar um novo volume nesse esquema.
- Para criar um novo volume em um esquema existente, você deve ter os seguintes privilégios:
USE CATALOG
para o catálogo pai.ALL PRIVILEGES
ouUSE SCHEMA
eCREATE VOLUME
privilégios no esquema de destino.
- Opcionalmente, você pode usar um volume existente. Você deve ter os seguintes privilégios:
USE CATALOG
para o catálogo pai.USE SCHEMA
para o esquema pai.ALL PRIVILEGES
ouREAD VOLUME
eWRITE VOLUME
no volume de destino.
- Para criar um novo volume em um esquema existente, você deve ter os seguintes privilégios:
Para definir essas permissões, entre em contato com o administrador do Databricks. Para obter mais informações sobre privilégios do Catálogo do Unity, consulte Privilégios do Catálogo do Unity e objetos protegíveis.
- Para criar um novo esquema em um catálogo, você deve ter
Este exemplo carrega dados de um volume do Catálogo do Unity. O código a seguir baixa um arquivo CSV e o armazena no volume especificado. Abra um novo notebook e execute o seguinte código para baixar esses dados para o volume especificado:
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
Substitua <catalog-name>
, <schema-name>
e <volume-name>
pelos nomes do catálogo, esquema e volume de um volume do Catálogo do Unity. O código fornecido tentará criar o esquema e o volume especificados se esses objetos não existirem. Você deve ter os privilégios apropriados para criar e gravar em objetos no Catálogo do Unity. Confira os Requisitos
Observação
Certifique-se de que este notebook tenha sido executado com êxito antes de continuar com o tutorial. Não configure esse notebook como parte do pipeline.
O Delta Live Tables cria pipelines resolvendo dependências definidas em notebooks ou arquivos (chamados de código-fonte) usando a sintaxe do Delta Live Tables. Cada arquivo de código-fonte pode conter apenas um idioma, mas você pode adicionar vários blocos de anotações ou arquivos específicos do idioma no pipeline.
Importante
Não configure nenhum ativo no campo Código-fonte . Deixar esse campo preto cria e configura um notebook para criação de código-fonte.
As instruções neste tutorial usam computação sem servidor e o Catálogo do Unity. Use as configurações padrão para todas as opções de configuração não mencionadas nestas instruções.
Observação
Se a tecnologia sem servidor não estiver habilitada ou não tiver suporte em seu workspace, você poderá concluir o tutorial conforme escrito usando as configurações de computação padrão. Você deve selecionar manualmente o Catálogo do Unity em Opções de armazenamento na seção Destino da interface do usuário Criar pipeline.
Para configurar um novo pipeline, faça o seguinte:
- Clique em Delta Live Tables na barra lateral.
- Clique em Criar Pipeline.
- Forneça um nome de pipeline exclusivo.
- Marque a caixa ao lado de Sem servidor.
- Selecione um Catálogo para publicar dados.
- Selecione um Esquema no catálogo.
- Especifique um novo nome de esquema para criar um esquema.
- Defina três parâmetros de pipeline usando o botão Adicionar configuração em Avançado para adicionar três configurações. Especifique o catálogo, o esquema e o volume para os quais você baixou os dados usando os seguintes nomes de parâmetro:
my_catalog
my_schema
my_volume
- Clique em Criar.
A interface do usuário de pipelines é exibida para o pipeline recém-criado. Um notebook de código-fonte é criado e configurado automaticamente para o pipeline.
O bloco de anotações é criado em um novo diretório no diretório do usuário. O nome do novo diretório e arquivo corresponde ao nome do pipeline. Por exemplo, /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
Um link para acessar esse notebook está no campo Código-fonte no painel Detalhes do pipeline . Clique no link para abrir o bloco de anotações antes de prosseguir para a próxima etapa.
Você pode usar notebooks Datbricks para desenvolver e validar interativamente o código-fonte para pipelines do Delta Live Tables. Você deve anexar seu notebook ao pipeline para usar essa funcionalidade. Para anexar seu notebook recém-criado ao pipeline que você acabou de criar:
- Clique em Conectar no canto superior direito para abrir o menu de configuração de computação.
- Passe o mouse sobre o nome do pipeline que você criou na Etapa 1.
- Clique em Conectar.
A interface do usuário é alterada para incluir os botões Validar e Iniciar no canto superior direito. Para saber mais sobre o suporte ao notebook para desenvolvimento de código de pipeline, consulte Desenvolver e depurar pipelines do Delta Live Tables em notebooks.
Importante
- Os pipelines do Delta Live Tables avaliam todas as células em um notebook durante o planejamento. Ao contrário dos notebooks que são executados em relação à computação para todas as finalidades ou agendados como trabalhos, os pipelines não garantem que as células sejam executadas na ordem especificada.
- Os notebooks só podem conter uma única linguagem de programação. Não misture código Python e SQL em notebooks de código-fonte de pipeline.
Para obter detalhes sobre como desenvolver código com Python ou SQL, consulte Desenvolver código de pipeline com Python ou Desenvolver código de pipeline com SQL.
Para implementar o exemplo neste tutorial, copie e cole o código a seguir em uma célula no notebook configurada como código-fonte para o pipeline.
O código fornecido faz o seguinte:
- Importa os módulos necessários (somente Python).
- Faz referência a parâmetros definidos durante a configuração do pipeline.
- Define uma tabela de streaming chamada
baby_names_raw
que ingere de um volume. - Define uma exibição materializada chamada
baby_names_prepared
que valida os dados ingeridos. - Define uma exibição materializada chamada
top_baby_names_2021
que tem uma exibição altamente refinada dos dados.
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM LIVE.baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
Para iniciar uma atualização de pipeline, clique no botão Iniciar no canto superior direito da interface do usuário do notebook.
Os notebooks a seguir contêm os mesmos exemplos de código fornecidos neste artigo. Esses notebooks têm os mesmos requisitos que as etapas deste artigo. Confira os Requisitos
Para importar um bloco de anotações, conclua as seguintes etapas:
- Abra a interface do usuário do notebook.
- Clique em + Novo>Bloco de Anotações.
- Um bloco de anotações vazio é aberto.
- Clique em Arquivo>Importar. A caixa de diálogo Importar aparece.
- Selecione a opção URL para Importar de.
- Cole a URL do bloco de anotações.
- Clique em Importar.
Este tutorial requer que você execute um notebook de configuração de dados antes de configurar e executar o pipeline do Delta Live Tables. Importe o notebook a seguir, anexe o notebook a um recurso de computação, preencha a variável necessária para my_catalog
, my_schema
e e my_volume
clique em Executar tudo.
Os notebooks a seguir fornecem exemplos em Python ou SQL. Quando você importa um bloco de anotações, ele é salvo no diretório inicial do usuário.
Depois de importar um dos notebooks abaixo, conclua as etapas para criar um pipeline, mas use o seletor de arquivo de código-fonte para selecionar o notebook baixado. Depois de criar o pipeline com um notebook configurado como código-fonte, clique em Iniciar na interface do usuário do pipeline para disparar uma atualização.