Partilhar via


Tutorial: Execute seu primeiro pipeline Delta Live Tables

Importante

Os pipelines DLT sem servidor estão em Visualização Pública.

Este tutorial mostra como configurar um pipeline Delta Live Tables a partir do código em um bloco de anotações Databricks e executar o pipeline acionando uma atualização de pipeline. Este tutorial inclui um pipeline de exemplo para ingerir e processar um conjunto de dados de exemplo com código de exemplo usando as interfaces Python e SQL . Você também pode usar as instruções neste tutorial para criar um pipeline com qualquer bloco de anotações com sintaxe Delta Live Tables definida corretamente.

Você pode configurar pipelines Delta Live Tables e disparar atualizações usando a interface do usuário do espaço de trabalho do Azure Databricks ou opções de ferramentas automatizadas, como API, CLI, Databricks Asset Bundles ou como uma tarefa em um fluxo de trabalho Databricks. Para se familiarizar com a funcionalidade e os recursos do Delta Live Tables, o Databricks recomenda primeiro usar a interface do usuário para criar e executar pipelines. Além disso, quando você configura um pipeline na interface do usuário, o Delta Live Tables gera uma configuração JSON para seu pipeline que pode ser usada para implementar seus fluxos de trabalho programáticos.

Para demonstrar a funcionalidade Delta Live Tables, os exemplos neste tutorial baixam um conjunto de dados disponível publicamente. No entanto, o Databricks tem várias maneiras de se conectar a fontes de dados e ingerir dados que os pipelines que implementam casos de uso do mundo real usarão. Consulte Ingerir dados com Delta Live Tables.

Requisitos

  • Para iniciar um pipeline, você deve ter permissão de criação de cluster ou acesso a uma política de cluster que defina um cluster Delta Live Tables. O runtime de Tabelas Dinâmicas Delta cria um cluster antes de executar o pipeline e falha se não tiver a permissão correta.

  • Para usar os exemplos neste tutorial, seu espaço de trabalho deve ter o Unity Catalog habilitado.

  • Você deve ter as seguintes permissões no Catálogo Unity:

    • READ VOLUME e WRITE VOLUME, ou ALL PRIVILEGES, para o my-volume volume.
    • USE SCHEMA ou ALL PRIVILEGES para o default esquema.
    • USE CATALOG ou ALL PRIVILEGES para o main catálogo.

    Para definir essas permissões, consulte os privilégios e objetos protegíveis do administrador do Databricks ou do Catálogo Unity.

  • Os exemplos neste tutorial usam um volume do Catálogo Unity para armazenar dados de exemplo. Para usar esses exemplos, crie um volume e use o catálogo, o esquema e os nomes de volume desse volume para definir o caminho do volume usado pelos exemplos.

Nota

Se o seu espaço de trabalho não tiver o Unity Catalog habilitado, os blocos de anotações com exemplos que não exigem o Unity Catalog serão anexados a este artigo. Para usar esses exemplos, selecione Hive metastore como a opção de armazenamento ao criar o pipeline.

Onde você executa consultas Delta Live Tables?

As consultas Delta Live Tables são implementadas principalmente em notebooks Databricks, mas o Delta Live Tables não foi projetado para ser executado interativamente em células de notebook. A execução de uma célula que contém a sintaxe Delta Live Tables em um bloco de anotações Databricks resulta em uma mensagem de erro. Para executar suas consultas, você deve configurar seus blocos de anotações como parte de um pipeline.

Importante

  • Você não pode confiar na ordem de execução célula a célula de blocos de anotações ao escrever consultas para Delta Live Tables. Delta Live Tables avalia e executa todo o código definido em blocos de anotações, mas tem um modelo de execução diferente de um bloco de anotações Executar todos os comandos.
  • Não é possível misturar idiomas em um único arquivo de código-fonte do Delta Live Tables. Por exemplo, um bloco de anotações pode conter apenas consultas Python ou consultas SQL. Se você precisar usar vários idiomas em um pipeline, use vários blocos de anotações ou arquivos específicos de idioma no pipeline.

Você também pode usar o código Python armazenado em arquivos. Por exemplo, você pode criar um módulo Python que pode ser importado para seus pipelines Python ou definir funções definidas pelo usuário Python (UDFs) para usar em consultas SQL. Para saber mais sobre como importar módulos Python, consulte Importar módulos Python de pastas Git ou arquivos de espaço de trabalho. Para saber mais sobre como usar UDFs do Python, consulte Funções escalares definidas pelo usuário - Python.

Exemplo: Ingerir e processar dados de nomes de bebés de Nova Iorque

O exemplo neste artigo usa um conjunto de dados disponível publicamente que contém registros de nomes de bebês do Estado de Nova York. Estes exemplos demonstram o uso de um pipeline Delta Live Tables para:

  • Leia dados CSV brutos de um conjunto de dados disponível publicamente em uma tabela.
  • Leia os registros da tabela de dados brutos e use as expectativas do Delta Live Tables para criar uma nova tabela que contenha dados limpos.
  • Use os registros limpos como entrada para consultas Delta Live Tables que criam conjuntos de dados derivados.

Este código demonstra um exemplo simplificado da arquitetura medalhão. Veja O que é a arquitetura do medalhão lakehouse?.

Implementações deste exemplo são fornecidas para as interfaces Python e SQL . Você pode seguir as etapas para criar novos blocos de anotações que contenham o código de exemplo ou pular para Criar um pipeline e usar um dos blocos de anotações fornecidos nesta página.

Implementar um pipeline Delta Live Tables com Python

O código Python que cria conjuntos de dados Delta Live Tables deve retornar DataFrames, familiares aos usuários com experiência PySpark ou Pandas for Spark. Para usuários não familiarizados com DataFrames, o Databricks recomenda o uso da interface SQL. Consulte Implementar um pipeline Delta Live Tables com SQL.

Todas as APIs Python do Delta Live Tables são implementadas no dlt módulo. Seu código de pipeline Delta Live Tables implementado com Python deve importar explicitamente o dlt módulo na parte superior dos blocos de anotações e arquivos Python. Delta Live Tables difere de muitos scripts Python de uma maneira fundamental: você não chama as funções que executam a ingestão e transformação de dados para criar conjuntos de dados Delta Live Tables. Em vez disso, o Delta Live Tables interpreta as funções do decorador do dlt módulo em todos os arquivos carregados em um pipeline e cria um gráfico de fluxo de dados.

Para implementar o exemplo neste tutorial, copie e cole o seguinte código Python em um novo bloco de anotações Python. Você deve adicionar cada trecho de código de exemplo à sua própria célula no bloco de anotações na ordem descrita. Para rever as opções de criação de blocos de notas, consulte Criar um bloco de notas.

Quando você cria um pipeline com a interface Python, por padrão, os nomes de tabela são definidos por nomes de função. Por exemplo, o exemplo Python a seguir cria três tabelas chamadas baby_names_raw, baby_names_preparede top_baby_names_2021. Você pode substituir o nome da tabela usando o name parâmetro. Consulte Criar uma visualização materializada do Delta Live Tables ou tabela de streaming.

Importante

Para evitar um comportamento inesperado quando o pipeline é executado, não inclua código que possa ter efeitos colaterais em suas funções que definem conjuntos de dados. Para saber mais, consulte a referência do Python.

Importar o módulo Delta Live Tables

Todas as APIs Python do Delta Live Tables são implementadas no dlt módulo. Importe explicitamente o dlt módulo na parte superior dos blocos de anotações e arquivos Python.

O exemplo a seguir mostra essa importação, juntamente com as instruções de importação para pyspark.sql.functions.

import dlt
from pyspark.sql.functions import *

Transferir os dados

Para obter os dados deste exemplo, baixe um arquivo CSV e armazene-o no volume da seguinte maneira:

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")

Substitua <catalog-name>, <schema-name>e <volume-name> pelos nomes de catálogo, esquema e volume de um volume do Catálogo Unity.

Criar uma tabela a partir de arquivos no armazenamento de objetos

O Delta Live Tables dá suporte ao carregamento de dados de todos os formatos suportados pelo Azure Databricks. Consulte Opções de formato de dados.

O @dlt.table decorador diz à Delta Live Tables para criar uma tabela que contenha o resultado de um DataFrame retorno de uma função. Adicione o decorador @dlt.table antes de qualquer definição de função Python que retorne um Spark DataFrame para registrar uma nova tabela no Delta Live Tables. O exemplo a seguir demonstra o uso do nome da função como o nome da tabela e a adição de um comentário descritivo à tabela:

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

Adicionar uma tabela de um conjunto de dados upstream no pipeline

Você pode usar dlt.read() para ler dados de outros conjuntos de dados declarados em seu pipeline Delta Live Tables atual. Declarar novas tabelas dessa maneira cria uma dependência que o Delta Live Tables resolve automaticamente antes de executar atualizações. O código a seguir também inclui exemplos de monitoramento e aplicação da qualidade dos dados com expectativas. Consulte Gerenciar a qualidade dos dados com o Delta Live Tables.

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

Criar uma tabela com vistas de dados enriquecidas

Como o Delta Live Tables processa atualizações para pipelines como uma série de gráficos de dependência, você pode declarar exibições altamente enriquecidas que alimentam painéis, BI e análises declarando tabelas com lógica de negócios específica.

As tabelas em Delta Live Tables são equivalentes conceitualmente a visualizações materializadas. Enquanto as exibições tradicionais na lógica de execução do Spark cada vez que a exibição é consultada, uma tabela Delta Live Tables armazena a versão mais recente dos resultados da consulta em arquivos de dados. Como o Delta Live Tables gerencia atualizações para todos os conjuntos de dados em um pipeline, você pode agendar atualizações de pipeline para corresponder aos requisitos de latência para exibições materializadas e saber que as consultas nessas tabelas contêm a versão mais recente dos dados disponíveis.

A tabela definida pelo código a seguir demonstra a semelhança conceitual com uma exibição materializada derivada de dados upstream em seu pipeline:

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

Para configurar um pipeline que usa o bloco de anotações, consulte Criar um pipeline.

Implementar um pipeline Delta Live Tables com SQL

O Databricks recomenda o Delta Live Tables com SQL como a maneira preferida para os usuários do SQL criarem novos pipelines de ETL, ingestão e transformação no Azure Databricks. A interface SQL para Delta Live Tables estende o Spark SQL padrão com muitas novas palavras-chave, construções e funções com valor de tabela. Essas adições ao SQL padrão permitem que os usuários declarem dependências entre conjuntos de dados e implantem infraestrutura de nível de produção sem aprender novas ferramentas ou conceitos adicionais.

Para usuários familiarizados com o Spark DataFrames e que precisam de suporte para testes e operações mais abrangentes que são difíceis de implementar com SQL, como operações de metaprogramação, o Databricks recomenda o uso da interface Python. Consulte Implementar um pipeline Delta Live Tables com Python.

Transferir os dados

Para obter os dados deste exemplo, copie o código a seguir, cole-o em um novo bloco de anotações e execute o bloco de anotações. Para rever as opções de criação de blocos de notas, consulte Criar um bloco de notas.

%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"

Substitua <catalog-name>, <schema-name>e <volume-name> pelos nomes de catálogo, esquema e volume de um volume do Catálogo Unity.

Criar uma tabela a partir de arquivos no Unity Catalog

Para o restante deste exemplo, copie os seguintes trechos SQL e cole-os em um novo bloco de anotações SQL, separado do bloco de anotações na seção anterior. Você deve adicionar cada trecho SQL de exemplo à sua própria célula no bloco de anotações na ordem descrita.

O Delta Live Tables dá suporte ao carregamento de dados de todos os formatos suportados pelo Azure Databricks. Consulte Opções de formato de dados.

Todas as instruções SQL do Delta Live Tables usam CREATE OR REFRESH sintaxe e semântica. Quando você atualiza um pipeline, o Delta Live Tables determina se o resultado logicamente correto para a tabela pode ser obtido por meio de processamento incremental ou se a recomputação completa é necessária.

O exemplo a seguir cria uma tabela carregando dados do arquivo CSV armazenado no volume do Catálogo Unity:

CREATE OR REFRESH LIVE TABLE baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
  '/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

Substitua <catalog-name>, <schema-name>e <volume-name> pelos nomes de catálogo, esquema e volume de um volume do Catálogo Unity.

Adicionar uma tabela de um conjunto de dados upstream ao pipeline

Você pode usar o live esquema virtual para consultar dados de outros conjuntos de dados declarados em seu pipeline Delta Live Tables atual. Declarar novas tabelas dessa maneira cria uma dependência que o Delta Live Tables resolve automaticamente antes de executar atualizações. O live esquema é uma palavra-chave personalizada implementada no Delta Live Tables que pode ser substituída por um esquema de destino se você quiser publicar seus conjuntos de dados. Consulte Usar o catálogo Unity com seus pipelines do Delta Live Tables e Publicar dados do Delta Live Tables no metastore do Hive.

O código a seguir também inclui exemplos de monitoramento e aplicação da qualidade dos dados com expectativas. Consulte Gerenciar a qualidade dos dados com o Delta Live Tables.

CREATE OR REFRESH LIVE TABLE baby_names_sql_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM live.baby_names_sql_raw;

Criar uma vista de dados enriquecida

Como o Delta Live Tables processa atualizações para pipelines como uma série de gráficos de dependência, você pode declarar exibições altamente enriquecidas que alimentam painéis, BI e análises declarando tabelas com lógica de negócios específica.

As mesas ao vivo são equivalentes conceitualmente a visões materializadas. Enquanto as exibições tradicionais na lógica de execução do Spark sempre que a exibição é consultada, as tabelas dinâmicas armazenam a versão mais recente dos resultados da consulta em arquivos de dados. Como o Delta Live Tables gerencia atualizações para todos os conjuntos de dados em um pipeline, você pode agendar atualizações de pipeline para corresponder aos requisitos de latência para exibições materializadas e saber que as consultas nessas tabelas contêm a versão mais recente dos dados disponíveis.

O código a seguir cria uma exibição materializada enriquecida de dados upstream:

CREATE OR REFRESH LIVE TABLE top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Para configurar um pipeline que usa o bloco de anotações, continue para Criar um pipeline.

Criar um pipeline

Nota

Como os recursos de computação são totalmente gerenciados para pipelines DLT sem servidor (Visualização pública), as configurações de computação não estão disponíveis quando você seleciona Serverless para um pipeline.

Para saber mais sobre como habilitar pipelines DLT sem servidor, entre em contato com sua equipe de conta do Azure Databricks.

O Delta Live Tables cria pipelines resolvendo dependências definidas em blocos de anotações ou arquivos (chamados de código-fonte ou bibliotecas) usando a sintaxe Delta Live Tables. Cada arquivo de código-fonte pode conter apenas um idioma, mas você pode misturar bibliotecas de idiomas diferentes em seu pipeline.

  1. Clique em Delta Live Tables na barra lateral e clique em Create Pipeline.
  2. Dê um nome ao pipeline.
  3. (Opcional) Para executar seu pipeline usando pipelines DLT sem servidor, marque a caixa de seleção Serverless . Quando você seleciona Serverless, as configurações de computação são removidas da interface do usuário. Consulte Criar pipelines totalmente gerenciados usando Delta Live Tables com computação sem servidor.
  4. (Opcional) Selecione uma edição do produto.
  5. Selecione Acionado para o modo de pipeline.
  6. Configure um ou mais blocos de anotações contendo o código-fonte do pipeline. Na caixa de texto Demarcadores , insira o caminho para um bloco de anotações ou clique Ícone do seletor de arquivos para selecionar um bloco de anotações.
  7. Selecione um destino para os conjuntos de dados publicados pelo pipeline, o metastore do Hive ou o Unity Catalog. Consulte Publicar conjuntos de dados.
    • Metastore do Hive:
      • (Opcional) Insira um local de armazenamento para os dados de saída do pipeline. O sistema usa um local padrão se você deixar o local de armazenamento vazio.
      • (Opcional) Especifique um esquema de destino para publicar seu conjunto de dados no metastore do Hive.
    • Catálogo Unity: especifique um Catálogo e um esquema de Destino para publicar seu conjunto de dados no Catálogo Unity.
  8. (Opcional) Se você não tiver selecionado Serverless, poderá definir configurações de computação para o pipeline. Para saber mais sobre as opções para configurações de computação, consulte Definir configurações de pipeline para Delta Live Tables.
  9. (Opcional) Clique em Adicionar notificação para configurar um ou mais endereços de email para receber notificações de eventos de pipeline. Consulte Adicionar notificações por e-mail para eventos de pipeline.
  10. (Opcional) Configure configurações avançadas para o pipeline. Para saber mais sobre as opções para configurações avançadas, consulte Definir configurações de pipeline para Delta Live Tables.
  11. Clique em Criar.

O sistema exibe a página Detalhes do pipeline depois que você clica em Criar. Você também pode acessar seu pipeline clicando no nome do pipeline na guia Delta Live Tables .

Iniciar uma atualização de pipeline

Para iniciar uma atualização para um pipeline, clique Ícone Iniciar do Delta Live Tables no botão no painel superior. O sistema retorna uma mensagem confirmando que seu pipeline está iniciando.

Depois de iniciar com êxito a atualização, o sistema Delta Live Tables:

  1. Inicia um cluster usando uma configuração de cluster criada pelo sistema Delta Live Tables. Você também pode especificar uma configuração de cluster personalizada.
  2. Cria quaisquer tabelas que não existam e garante que o esquema esteja correto para quaisquer tabelas existentes.
  3. Atualiza as tabelas com os dados mais recentes disponíveis.
  4. Desliga o cluster quando a atualização é concluída.

Nota

O modo de execução é definido como Produção por padrão, que implanta recursos de computação efêmeros para cada atualização. Você pode usar o modo de desenvolvimento para alterar esse comportamento, permitindo que os mesmos recursos de computação sejam usados para várias atualizações de pipeline durante o desenvolvimento e o teste. Consulte Modos de desenvolvimento e produção.

Publicar conjuntos de dados

Você pode disponibilizar conjuntos de dados Delta Live Tables para consulta publicando tabelas no metastore do Hive ou no Unity Catalog. Se você não especificar um destino para publicar dados, as tabelas criadas nos pipelines Delta Live Tables só poderão ser acessadas por outras operações nesse mesmo pipeline. Consulte Publicar dados do Delta Live Tables no metastore do Hive e Usar o Unity Catalog com seus pipelines do Delta Live Tables.

Exemplo de blocos de anotações de código-fonte

Você pode importar esses blocos de anotações para um espaço de trabalho do Azure Databricks e usá-los para implantar um pipeline Delta Live Tables. Consulte Criar um pipeline.

Introdução ao bloco de anotações Python Delta Live Tables

Obter o bloco de notas

Introdução ao bloco de anotações SQL Delta Live Tables

Obter o bloco de notas

Exemplo de blocos de anotações de código-fonte para espaços de trabalho sem o Unity Catalog

Você pode importar esses blocos de anotações para um espaço de trabalho do Azure Databricks sem o Unity Catalog habilitado e usá-los para implantar um pipeline Delta Live Tables. Consulte Criar um pipeline.

Introdução ao bloco de anotações Python Delta Live Tables

Obter o bloco de notas

Introdução ao bloco de anotações SQL Delta Live Tables

Obter o bloco de notas