Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Saiba como criar e desenvolver um fluxo ETL (extrair, transformar e carregar) com captura de dados de alteração (CDC) utilizando o Lakeflow Spark Declarative Pipelines (SDP) para orquestração de dados e o Auto Loader. Um pipeline de ETL implementa as etapas para ler dados de sistemas de origem, transformar esses dados com base em requisitos, como verificações de qualidade de dados e eliminação de registos duplicados, e escrever os dados em um sistema de destino, como um data warehouse ou um data lake.
Neste tutorial, você usará dados de uma customers tabela em um banco de dados MySQL para:
- Extraia as alterações de um banco de dados transacional usando o Debezium ou outra ferramenta e salve-as no armazenamento de objetos na nuvem (S3, ADLS ou GCS). Neste tutorial, você ignora a configuração de um sistema CDC externo e, em vez disso, gera dados falsos para simplificar o tutorial.
- Use o Auto Loader para carregar incrementalmente as mensagens do armazenamento de objetos na nuvem e armazenar as mensagens brutas na
customers_cdctabela. Auto Loader infere o esquema e lida com a evolução do esquema. - Crie a tabela para verificar a qualidade dos
customers_cdc_cleandados usando expectativas. Por exemplo, oidnunca deve sernullporque é usado para executar operações de upsert. - Execute
AUTO CDC ... INTOsobre os dados CDC limpos para inserir ou atualizar alterações na tabela finalcustomers. - Mostrar como um pipeline pode criar uma tabela de dimensão de mudança lenta do tipo 2 (SCD2) para acompanhar todas as alterações.
O objetivo é ingerir os dados brutos quase em tempo real e construir uma tabela para sua equipe de analistas, garantindo a qualidade dos dados.
O tutorial usa a arquitetura medalhão Lakehouse, em que ingere dados brutos através da camada de bronze, limpa e valida os dados com a camada de prata, e aplica modelagem dimensional e agregação aos dados usando a camada de ouro. Consulte O que é a arquitetura de medallion lakehouse? para obter mais informações.
O fluxo implementado tem esta aparência:
Para obter mais informações sobre pipeline, Auto Loader e CDC, consulte Lakeflow Spark Declarative Pipelines, O que é Auto Loader? e O que é captura de dados de alteração (CDC)?
Requerimentos
Para concluir este tutorial, você deve atender aos seguintes requisitos:
- Esteja conectado a um espaço de trabalho do Azure Databricks.
- Tenha o Unity Catalog ativado para seu espaço de trabalho.
- Tenha a computação sem servidor habilitada para sua conta. Serverless Lakeflow Spark Declarative Pipelines não está disponível em todas as regiões do espaço de trabalho. Consulte Recursos com disponibilidade regional limitada para regiões disponíveis. Se a computação sem servidor não estiver habilitada para sua conta, as etapas deverão funcionar com a computação padrão para seu espaço de trabalho.
- Ter permissão para criar um recurso de computação ou acesso a um recurso de computação.
- Ter permissões para criar um novo esquema em um catálogo. As permissões necessárias são
ALL PRIVILEGESouUSE CATALOGeCREATE SCHEMA. - Ter permissões para criar um novo volume em um esquema existente. As permissões necessárias são
ALL PRIVILEGESouUSE SCHEMAeCREATE VOLUME.
Alterar a captura de dados em um pipeline de ETL
Change data capture (CDC) é o processo que captura alterações em registros feitas em um banco de dados transacional (por exemplo, MySQL ou PostgreSQL) ou um data warehouse. O CDC captura operações como exclusões de dados, acréscimos e atualizações, normalmente como um fluxo para rematerializar tabelas em sistemas externos. O CDC permite o carregamento incremental enquanto elimina a necessidade de atualizações de carga em massa.
Observação
Para simplificar este tutorial, ignore a configuração de um sistema CDC externo. Suponha que ele esteja executando e salvando dados CDC como arquivos JSON no armazenamento de objetos na nuvem (S3, ADLS ou GCS). Este tutorial usa a Faker biblioteca para gerar os dados usados no tutorial.
Capturando CDC
Uma variedade de ferramentas CDC estão disponíveis. Uma das principais soluções de código aberto é a Debezium, mas existem outras implementações que simplificam as fontes de dados, como Fivetran, Qlik Replicate, StreamSets, Talend, Oracle GoldenGate e AWS DMS.
Neste tutorial, você usa dados CDC de um sistema externo como Debezium ou DMS. Debezium captura cada linha alterada. Normalmente, envia o histórico de alterações de dados para tópicos do Kafka ou salva-os como arquivos.
Você deve ingerir as informações do CDC da tabela customers (formato JSON), verificar se estão corretas e, em seguida, criar a tabela de clientes no Lakehouse.
Entrada CDC de Debezium
Para cada alteração, você recebe uma mensagem JSON contendo todos os campos da linha que está sendo atualizada (id, firstname, lastname, email, address). A mensagem também inclui metadados adicionais:
-
operation: Um código de operação, normalmente (DELETE,APPEND,UPDATE). -
operation_date: A data e hora do registo para cada ação operativa.
Ferramentas como Debezium podem produzir saídas mais avançadas, como o valor da linha antes da alteração, mas este tutorial as omite para simplificar.
Etapa 1: Criar um pipeline
Crie um novo pipeline ETL para consultar sua fonte de dados CDC e gerar tabelas em seu espaço de trabalho.
No espaço de trabalho, clique no
Novo no canto superior esquerdo.
Clique em Pipeline ETL.
Altere o título do pipeline para
Pipelines with CDC tutorialou um nome de sua preferência.Sob o título, escolha um catálogo e esquema para o qual tenhas permissões de escrita.
Esse catálogo e esquema são usados por padrão, se você não especificar um catálogo ou esquema em seu código. Seu código pode gravar em qualquer catálogo ou esquema especificando o caminho completo. Este tutorial usa os padrões especificados aqui.
Em Opções avançadas, selecione Iniciar com um arquivo vazio.
Escolha uma pasta para o seu código. Você pode selecionar Procurar para procurar a lista de pastas no espaço de trabalho. Você pode escolher qualquer pasta para a qual tenha permissões de gravação.
Para usar o controle de versão, selecione uma pasta Git. Se precisar criar uma nova pasta, selecione o
botão.
Escolha Python ou SQL para a linguagem do seu arquivo, com base na linguagem que você deseja usar para o tutorial.
Clique em Selecionar para criar o pipeline com essas configurações e abra o Lakeflow Pipelines Editor.
Agora você tem um pipeline em branco com um catálogo e esquema padrão. Em seguida, configure os dados de exemplo a serem importados no tutorial.
Etapa 2: Criar os dados de exemplo a serem importados neste tutorial
Esta etapa não é necessária se você estiver importando seus próprios dados de uma fonte existente. Para este tutorial, gere dados falsos como um exemplo para o tutorial. Crie um bloco de anotações para executar o script de geração de dados Python. Esse código só precisa ser executado uma vez para gerar os dados de exemplo, portanto, crie-o dentro da pasta do explorations pipeline, que não é executada como parte de uma atualização de pipeline.
Observação
Este código usa Faker para gerar os dados CDC de exemplo. Faker está disponível para instalar automaticamente, então o tutorial usa %pip install faker. Você também pode definir uma dependência no faker para o notebook. Consulte Adicionar dependências ao notebook.
No Lakeflow Pipelines Editor, na barra lateral do navegador de ativos à esquerda do editor, clique no
Adicionar e, em seguida, escolha Exploração.
Dê-lhe um Nome, como
Setup data, selecione Python. Você pode deixar a pasta de destino padrão, que é uma novaexplorationspasta.Clique em Criar. Isso cria um bloco de anotações na nova pasta.
Insira o seguinte código na primeira célula. Você deve alterar a definição de
<my_catalog>e<my_schema>para corresponder ao catálogo e esquema padrão que você selecionou no procedimento anterior:%pip install faker # Update these to match the catalog and schema # that you used for the pipeline in step 1. catalog = "<my_catalog>" schema = dbName = db = "<my_schema>" spark.sql(f'USE CATALOG `{catalog}`') spark.sql(f'USE SCHEMA `{schema}`') spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`') volume_folder = f"/Volumes/{catalog}/{db}/raw_data" try: dbutils.fs.ls(volume_folder+"/customers") except: print(f"folder doesn't exist, generating the data under {volume_folder}...") from pyspark.sql import functions as F from faker import Faker from collections import OrderedDict import uuid fake = Faker() import random fake_firstname = F.udf(fake.first_name) fake_lastname = F.udf(fake.last_name) fake_email = F.udf(fake.ascii_company_email) fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S")) fake_address = F.udf(fake.address) operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)]) fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0]) fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None) df = spark.range(0, 100000).repartition(100) df = df.withColumn("id", fake_id()) df = df.withColumn("firstname", fake_firstname()) df = df.withColumn("lastname", fake_lastname()) df = df.withColumn("email", fake_email()) df = df.withColumn("address", fake_address()) df = df.withColumn("operation", fake_operation()) df_customers = df.withColumn("operation_date", fake_date()) df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")Para gerar o conjunto de dados usado no tutorial, digite Shift + Enter para executar o código:
Opcional. Para visualizar os dados usados neste tutorial, insira o código a seguir na próxima célula e execute o código. Atualize o catálogo e o esquema para corresponder ao caminho do código anterior.
# Update these to match the catalog and schema # that you used for the pipeline in step 1. catalog = "<my_catalog>" schema = "<my_schema>" display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))
Isso gera um grande conjunto de dados (com dados CDC falsos) que você pode usar no resto do tutorial. Na próxima etapa, ingira os dados usando o Auto Loader.
Etapa 3: Ingerir dados incrementalmente com o Auto Loader
O próximo passo é ingerir os dados brutos do armazenamento em nuvem (falsificado) em uma camada de bronze.
Isso pode ser desafiador por vários motivos, pois você deve:
- Operar em larga escala, potencialmente processando milhões de pequenos ficheiros.
- Inferir esquema e tipo JSON.
- Lide com registos inadequados tendo um esquema JSON incorreto.
- Tenha em consideração a evolução do esquema (por exemplo, uma nova coluna na tabela de clientes).
O Auto Loader simplifica essa ingestão, incluindo inferência de esquema e evolução de esquema, enquanto dimensiona para milhões de arquivos recebidos. Auto Loader está disponível em Python usando cloudFiles e em SQL usando o SELECT * FROM STREAM read_files(...) e pode ser usado com uma variedade de formatos (JSON, CSV, Apache Avro, etc.):
Definir a tabela como uma tabela de streaming garante que você consuma apenas novos dados de entrada. Se você não defini-lo como uma tabela de streaming, ele verifica e ingere todos os dados disponíveis. Consulte Tabelas de streaming para obter mais informações.
Para ingerir os dados CDC de entrada usando o Auto Loader, copie e cole o código a seguir no arquivo de código que foi criado com o seu pipeline (chamado
my_transformation.py). Você pode usar Python ou SQL, com base na linguagem escolhida ao criar o pipeline. Certifique-se de substituir o<catalog>e<schema>pelos que o utilizador configurou como padrão no pipeline.Python
from pyspark import pipelines as dp from pyspark.sql.functions import * # Replace with the catalog and schema name that # you are using: path = "/Volumes/<catalog>/<schema>/raw_data/customers" # Create the target bronze table dp.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone") # Create an Append Flow to ingest the raw data into the bronze table @dp.append_flow( target = "customers_cdc_bronze", name = "customers_bronze_ingest_flow" ) def customers_bronze_ingest_flow(): return ( spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(f"{path}") )SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_bronze_ingest_flow AS INSERT INTO customers_cdc_bronze BY NAME SELECT * FROM STREAM read_files( -- replace with the catalog/schema you are using: "/Volumes/<catalog>/<schema>/raw_data/customers", format => "json", inferColumnTypes => "true" )Clique no
Executar arquivo ou Executar pipeline para iniciar uma atualização para o pipeline conectado. Com apenas um arquivo de origem em seu pipeline, eles são funcionalmente equivalentes.
Quando a atualização for concluída, o editor será atualizado com informações sobre o seu pipeline.
- O gráfico de pipeline (DAG), na barra lateral à direita do seu código, mostra uma única tabela,
customers_cdc_bronze. - Um resumo da atualização é mostrado na parte superior do navegador de ativos de pipeline.
- Os detalhes da tabela que foi gerada são mostrados no painel inferior e você pode procurar dados da tabela selecionando-a.
Estes são os dados brutos da camada bronze importados do armazenamento em nuvem. Na próxima etapa, limpe os dados para criar uma tabela de camada prateada.
Etapa 4: Limpeza e expectativas para acompanhar a qualidade dos dados
Depois que a camada de bronze for definida, crie a camada de prata adicionando expectativas para controlar a qualidade dos dados. Verifique as seguintes condições:
- ID nunca deve ser
null. - O tipo de operação CDC deve ser válido.
- JSON deve ser lido corretamente pelo Auto Loader.
As linhas que não satisfazem estas condições são descartadas.
Consulte Gerenciar a qualidade dos dados com as expectativas do pipeline para obter mais informações.
Na barra lateral do navegador de ativos de pipeline, clique no
Adicionar e, em seguida, Transformação.
Digite um Nome e escolha uma linguagem (Python ou SQL) para o arquivo de código-fonte. Você pode misturar idiomas dentro de um pipeline, podendo escolher um dos idiomas nesta etapa.
Para criar uma camada prateada com uma tabela limpa e impor restrições, copie e cole o código a seguir no novo arquivo (escolha Python ou SQL com base na linguagem do arquivo).
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * dp.create_streaming_table( name = "customers_cdc_clean", expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"} ) @dp.append_flow( target = "customers_cdc_clean", name = "customers_cdc_clean_flow" ) def customers_cdc_clean_flow(): return ( spark.readStream.table("customers_cdc_bronze") .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data") )SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_clean ( CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW ) COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_cdc_clean_flow AS INSERT INTO customers_cdc_clean BY NAME SELECT * FROM STREAM customers_cdc_bronze;Clique no
Executar arquivo ou Executar pipeline para iniciar uma atualização para o pipeline conectado.
Porque agora existem dois arquivos de origem, estes não fazem a mesma coisa, mas neste caso, a saída é a mesma.
- Executar pipeline executa toda a pipeline, incluindo o código da etapa 3. Se os seus dados de entrada estivessem sendo atualizados, isso integraria quaisquer alterações dessa fonte na sua camada de bronze. Isso não executa o código da etapa de configuração de dados, porque ele está na pasta de explorações e não faz parte da fonte do seu pipeline.
- Executar arquivo executa apenas o arquivo de origem atual. Nesse caso, sem que os seus dados de entrada sejam atualizados, isso gera os dados de prata a partir da tabela de bronze em cache. Seria útil executar apenas este arquivo para iteração mais rápida ao criar ou editar seu código de pipeline.
Quando a atualização for concluída, você poderá ver que o gráfico de pipeline agora mostra duas tabelas (com a camada prata dependendo da camada bronze) e o painel inferior mostra detalhes para ambas as tabelas. A parte superior do navegador de ativos de pipeline agora mostra as durações de várias execuções, mas apenas os detalhes da execução mais recente.
Em seguida, crie sua versão final da camada de ouro da customers tabela.
Etapa 5: Materializar a tabela de clientes com um fluxo AUTO CDC
Até este ponto, as tabelas apenas transmitiram os dados do CDC em cada etapa. Agora, crie a tabela customers para conter a visão mais atualizada e para ser uma réplica da tabela original, não a lista de operações CDC que a criaram.
Isso não é trivial para implementar manualmente. Você deve considerar coisas como a desduplicação de dados para manter a linha mais recente.
No entanto, Lakeflow Spark Declarative Pipelines resolve esses desafios com a operação AUTO CDC.
Na barra lateral do navegador de ativos de pipeline, clique no
Adicionar e Transformação.
Insira um Nome e escolha uma linguagem (Python ou SQL) para o novo arquivo de código-fonte. Você pode escolher novamente qualquer um dos idiomas para esta etapa, mas use o código correto, abaixo.
Para processar os dados CDC usando
AUTO CDCno Lakeflow Spark Declarative Pipelines, copie e cole o código a seguir no novo arquivo.Python
from pyspark import pipelines as dp from pyspark.sql.functions import * dp.create_streaming_table(name="customers", comment="Clean, materialized customers") dp.create_auto_cdc_flow( target="customers", # The customer table being materialized source="customers_cdc_clean", # the incoming CDC keys=["id"], # what we'll be using to match the rows to upsert sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition except_column_list=["operation", "operation_date", "_rescued_data"], )SQL
CREATE OR REFRESH STREAMING TABLE customers; CREATE FLOW customers_cdc_flow AS AUTO CDC INTO customers FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 1;Clique no
Execute o arquivo para iniciar uma atualização para o pipeline conectado.
Quando a atualização estiver concluída, poderá ver que o seu gráfico de pipeline mostra três tabelas, progredindo de bronze para prata para ouro.
Etapa 6: Acompanhe o histórico de atualizações com o tipo de dimensão 2 (SCD2) que muda lentamente
Muitas vezes, é necessário criar uma tabela que acompanhe todas as alterações resultantes de APPEND, UPDATEe DELETE:
- Histórico: Você deseja manter um histórico de todas as alterações na sua tabela.
- Rastreabilidade: você deseja ver qual operação ocorreu.
SCD2 com Lakeflow SDP
O Delta suporta fluxo de dados de alteração (CDF) e table_change pode consultar modificações de tabela em SQL e Python. No entanto, o principal caso de uso do CDF é capturar alterações num pipeline, não criar uma visão abrangente das alterações em tabelas desde o início.
As coisas ficam especialmente complexas de implementar se você tiver eventos fora de ordem. Se você precisar sequenciar suas alterações por um carimbo de data/hora e receber uma modificação que aconteceu no passado, deverá acrescentar uma nova entrada na tabela SCD e atualizar as entradas anteriores.
O Lakeflow SDP remove essa complexidade e permite criar uma tabela separada que contém todas as modificações desde o início do tempo. Esta tabela pode então ser usada em escala, com partições específicas ou colunas ZORDER, se necessário. Os campos fora de sequência são geridos automaticamente com base no _sequence_by.
Para criar uma tabela SCD2, use a opção STORED AS SCD TYPE 2 em SQL ou stored_as_scd_type="2" em Python.
Observação
Você também pode limitar quais colunas o recurso rastreia usando a opção: TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}
Na barra lateral do navegador de ativos de pipeline, clique no
Adicionar e Transformação.
Insira um Nome e escolha uma linguagem (Python ou SQL) para o novo arquivo de código-fonte.
Copie e cole o código a seguir no novo arquivo.
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * # create the table dp.create_streaming_table( name="customers_history", comment="Slowly Changing Dimension Type 2 for customers" ) # store all changes as SCD2 dp.create_auto_cdc_flow( target="customers_history", source="customers_cdc_clean", keys=["id"], sequence_by=col("operation_date"), ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), except_column_list=["operation", "operation_date", "_rescued_data"], stored_as_scd_type="2", ) # Enable SCD2 and store individual updatesSQL
CREATE OR REFRESH STREAMING TABLE customers_history; CREATE FLOW customers_history_cdc AS AUTO CDC INTO customers_history FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 2;Clique no
Execute o arquivo para iniciar uma atualização para o pipeline conectado.
Quando a atualização estiver concluída, o gráfico de pipeline inclui a nova tabela customers_history, que também depende da tabela da camada prata, e o painel inferior mostra os detalhes de todas as 4 tabelas.
Etapa 7: Crie uma exibição materializada que rastreie quem mais alterou suas informações
A tabela customers_history contém todas as alterações históricas que um usuário fez em suas informações. Crie uma visão materializada simples na camada de ouro que mantém o controle de quem mais mudou suas informações. Isso pode ser usado para análise de deteção de fraudes ou recomendações do usuário em um cenário do mundo real. Além disso, a aplicação de alterações com SCD2 já removeu duplicatas, para que você possa contar diretamente as linhas por ID de usuário.
Na barra lateral do navegador de ativos de pipeline, clique no
Adicionar e Transformação.
Insira um Nome e escolha uma linguagem (Python ou SQL) para o novo arquivo de código-fonte.
Copie e cole o código a seguir no novo arquivo de origem.
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * @dp.table( name = "customers_history_agg", comment = "Aggregated customer history" ) def customers_history_agg(): return ( spark.read.table("customers_history") .groupBy("id") .agg( count("address").alias("address_count"), count("email").alias("email_count"), count("firstname").alias("firstname_count"), count("lastname").alias("lastname_count") ) )SQL
CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS SELECT id, count("address") as address_count, count("email") AS email_count, count("firstname") AS firstname_count, count("lastname") AS lastname_count FROM customers_history GROUP BY idClique no
Execute o arquivo para iniciar uma atualização para o pipeline conectado.
Após a conclusão da atualização, há uma nova tabela no gráfico de pipeline que depende da customers_history tabela e você pode visualizá-la no painel inferior. O seu pipeline foi concluído. Você pode testá-lo executando um Run pipeline completo. As únicas etapas restantes são agendar o pipeline para atualizar regularmente.
Etapa 8: Criar um trabalho para executar o pipeline de ETL
Em seguida, crie um fluxo de trabalho para automatizar as etapas de ingestão, processamento e análise de dados em seu pipeline usando um trabalho Databricks.
- Na parte superior do editor, escolha o botão Agendar .
- Se a caixa de diálogo Agendas for exibida, escolha Adicionar agenda.
- Isso abre a caixa de diálogo Nova agenda, onde pode criar um trabalho para executar o pipeline num cronograma.
- Opcionalmente, dê um nome ao trabalho.
- Por padrão, a programação é definida para ser executada uma vez por dia. Você pode aceitar esse padrão ou definir sua própria agenda. Escolher Avançado dá-lhe a opção de definir uma hora específica em que o trabalho será executado. Selecionar Mais opções permite criar notificações quando o trabalho é executado.
- Selecione Criar para aplicar as alterações e criar o trabalho.
Agora, a tarefa será executada diariamente para manter o seu pipeline atualizado. Você pode escolher Agendar novamente para exibir a lista de agendas. Você pode gerenciar agendas para seu pipeline a partir dessa caixa de diálogo, incluindo adicionar, editar ou remover agendas.
Clicar no nome da agenda (ou trabalho) leva você à página do trabalho na lista Jobs & pipelines . A partir daí, você pode visualizar detalhes sobre execuções de trabalho, incluindo o histórico de execuções, ou executar o trabalho imediatamente com o botão Executar agora .
Consulte Monitorização e observabilidade de tarefas do Lakeflow para obter mais informações sobre execuções de tarefas.
Recursos adicionais
- Oleodutos declarativos Lakeflow Spark
- Tutorial: Construir um pipeline ETL com Lakeflow Spark Declarative Pipelines
- O que é captura de dados de mudança (CDC)?
- AUTO CDC APIs: Simplifique a captura de dados de mudanças através de pipelines
- Converter uma pipeline num projeto Databricks Asset Bundle
- O que é Auto Loader?