Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Saiba como criar e implantar um pipeline de ETL (extrair, transformar e carregar) com CDC (captura de dados de alteração) usando o Lakeflow Spark Declarative Pipelines (SDP) para orquestração de dados e Carregador Automático. Um pipeline de ETL implementa as etapas para ler dados de sistemas de origem, transformar esses dados com base em requisitos, como verificações de qualidade de dados e eliminação de duplicação de registros e gravar os dados em um sistema de destino, como um data warehouse ou um data lake.
Neste tutorial, você usará dados de uma customers tabela em um banco de dados MySQL para:
- Extraia as alterações de um banco de dados transacional usando o Debezium ou outra ferramenta e salve-as no armazenamento de objetos de nuvem (S3, ADLS ou GCS). Neste tutorial, você ignora a configuração de um sistema CDC externo e, em vez disso, gera dados falsos para simplificar o tutorial.
- Use o Carregador Automático para carregar incrementalmente as mensagens do armazenamento de objetos de nuvem e armazenar as mensagens brutas na
customers_cdctabela. O Carregador Automático infere o esquema e manipula a evolução do esquema. - Crie a
customers_cdc_cleantabela para verificar a qualidade dos dados usando as expectativas. Por exemplo, oidnunca deve sernullporque ele é usado para executar operações upsert. - Realize
AUTO CDC ... INTOnos dados CDC limpos para fazer alterações upsert na tabelacustomersfinal. - Mostrar como um pipeline pode criar uma tabela SCD2 (dimensão de alteração lenta) do tipo 2 para acompanhar todas as alterações.
O objetivo é ingerir os dados brutos quase em tempo real e criar uma tabela para sua equipe de analistas, garantindo a qualidade dos dados.
O tutorial utiliza a arquitetura Lakehouse de medalhão, onde ingere dados brutos através da camada bronze, limpa e valida dados com a camada prata e aplica modelagem dimensional e agregação usando a camada ouro. Veja O que é a arquitetura medalhão lakehouse? Para obter mais informações.
O fluxo implementado tem esta aparência:
Para obter mais informações sobre pipeline, Carregador Automático e CDC, consulte Pipelines Declarativos do Lakeflow Spark, O que é o Carregador Automático? e O que é captura de dados de alteração (CDC)?
Requirements
Para concluir este tutorial, você deve atender aos seguintes requisitos:
- Entre em um workspace do Azure Databricks.
- Habilite o Catálogo do Unity para seu workspace.
- Tenha a computação sem servidor habilitada para sua conta. Pipelines Declarativas do Spark sem servidor Lakeflow não estão disponíveis em todas as regiões de workspace. Veja Recursos com disponibilidade regional limitada para saber as regiões disponíveis. Se a computação sem servidor não estiver habilitada para sua conta, as etapas deverão funcionar com a computação padrão para seu workspace.
- Tenha permissão para criar um recurso de computação ou acesso a um recurso de computação.
- Tenha permissões para criar um novo esquema em um catálogo. As permissões necessárias são
ALL PRIVILEGESouUSE CATALOG.CREATE SCHEMA - Tenha permissões para criar um novo volume em um esquema existente. As permissões necessárias são
ALL PRIVILEGESouUSE SCHEMA.CREATE VOLUME
Alterar a captura de dados em um pipeline de ETL
A CDC (captura de dados de alteração) é o processo que captura alterações nos registros feitas em um banco de dados transacional (por exemplo, MySQL ou PostgreSQL) ou em um data warehouse. O CDC captura operações como exclusões de dados, acréscimos e atualizações, normalmente como um fluxo para materializar novamente tabelas em sistemas externos. O CDC habilita o carregamento incremental, eliminando a necessidade de atualizações de cargas em massa.
Observação
Para simplificar este tutorial, ignore a configuração de um sistema CDC externo. Suponha que esteja executando e salvando dados CDC como arquivos JSON no armazenamento de objetos de nuvem (S3, ADLS ou GCS). Este tutorial usa a Faker biblioteca para gerar os dados usados no tutorial.
Capturando CDC
Uma variedade de ferramentas CDC encontra-se disponível. Uma das principais soluções de software livre é o Debezium, mas existem outras implementações que simplificam as fontes de dados, como Fivetran, Qlik Replicate, StreamSets, Talend, Oracle GoldenGate e AWS DMS.
Neste tutorial, você usará dados CDC de um sistema externo como Debezium ou DMS. O Debezium captura todas as linhas que foram alteradas. Normalmente, ele envia o histórico de alterações de dados para tópicos do Kafka ou os salva como arquivos.
Você deve ingerir as informações de CDC da tabela customers (em formato JSON), verificar se estão corretas e então gerar a tabela de clientes no Lakehouse.
Entrada CDC do Debezium
Para cada alteração, você recebe uma mensagem JSON contendo todos os campos da linha que está sendo atualizada (id, , firstname, lastname, email, ). address A mensagem também inclui metadados adicionais:
-
operation: um código de operação, normalmente (DELETE, ,APPEND).UPDATE -
operation_date: a data e o carimbo de data/hora do registro de cada ação de operação.
Ferramentas como o Debezium podem produzir uma saída mais avançada, como o valor da linha antes da alteração, mas este tutorial as omite para simplificar.
Etapa 1: Criar um pipeline
Crie um novo pipeline de ETL para consultar sua fonte de dados CDC e gerar tabelas em seu espaço de trabalho.
No seu espaço de trabalho, clique no
Novo localizado no canto superior esquerdo.ícone de Adicionar. Clique em ETL Pipeline.
Altere o título do pipeline para
Pipelines with CDC tutorialou um nome que você preferir.Sob o título, escolha um catálogo e um esquema para o qual você tem permissões de escrita.
Esse catálogo e esquema são usados por padrão, se você não especificar um catálogo ou esquema em seu código. Seu código pode escrever em qualquer catálogo ou esquema ao especificar o caminho completo. Este tutorial usa os padrões especificados aqui.
Nas opções Avançadas, selecione Iniciar com um arquivo vazio.
Escolha uma pasta para seu código. Você pode selecionar Procurar para navegar na lista de pastas no workspace. Você pode escolher qualquer pasta para a qual tenha permissões de gravação.
Para usar o controle de versão, selecione uma pasta git. Se você precisar criar uma nova pasta, selecione o
Escolha Python ou SQL para o idioma do arquivo, com base no idioma que você deseja usar para o tutorial.
Clique em Selecionar para criar o pipeline com essas configurações e abrir o Editor do Lakeflow Pipelines.
Agora, você tem um pipeline em branco, com um catálogo e esquema padrão. Em seguida, configure os dados de exemplo a serem importados no tutorial.
Etapa 2: Criar os dados de exemplo a serem importados neste tutorial
Essa etapa não será necessária se você estiver importando seus próprios dados de uma fonte existente. Para este tutorial, gere dados falsos como exemplo para o tutorial. Crie um notebook para executar o script de geração de dados do Python. Esse código só precisa ser executado uma vez para gerar os dados de exemplo, portanto, crie-os dentro da pasta do explorations pipeline, que não é executada como parte de uma atualização de pipeline.
Observação
Esse código usa o Faker para gerar os dados CDC de exemplo. O Faker está disponível para instalação automática, portanto, o tutorial usa %pip install faker. Você também pode definir uma dependência do faker para o bloco de anotações. Consulte Adicionar dependências ao notebook.
No Editor do Lakeflow Pipelines, na barra lateral do navegador de recursos localizada à esquerda, clique no
Adicionar, e escolha Exploração.
Dê a ele um nome, como
Setup data, selecione Python. Você pode deixar a pasta de destino padrão, que é uma novaexplorationspasta.Clique em Criar. Isso cria um bloco de anotações na nova pasta.
Insira o código a seguir na primeira célula. Você deve alterar a definição de
<my_catalog>e<my_schema>para corresponder ao catálogo e esquema padrão que você selecionou no procedimento anterior.%pip install faker # Update these to match the catalog and schema # that you used for the pipeline in step 1. catalog = "<my_catalog>" schema = dbName = db = "<my_schema>" spark.sql(f'USE CATALOG `{catalog}`') spark.sql(f'USE SCHEMA `{schema}`') spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`') volume_folder = f"/Volumes/{catalog}/{db}/raw_data" try: dbutils.fs.ls(volume_folder+"/customers") except: print(f"folder doesn't exist, generating the data under {volume_folder}...") from pyspark.sql import functions as F from faker import Faker from collections import OrderedDict import uuid fake = Faker() import random fake_firstname = F.udf(fake.first_name) fake_lastname = F.udf(fake.last_name) fake_email = F.udf(fake.ascii_company_email) fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S")) fake_address = F.udf(fake.address) operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)]) fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0]) fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None) df = spark.range(0, 100000).repartition(100) df = df.withColumn("id", fake_id()) df = df.withColumn("firstname", fake_firstname()) df = df.withColumn("lastname", fake_lastname()) df = df.withColumn("email", fake_email()) df = df.withColumn("address", fake_address()) df = df.withColumn("operation", fake_operation()) df_customers = df.withColumn("operation_date", fake_date()) df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")Para gerar o conjunto de dados usado no tutorial, digite Shift + Enter para executar o código:
Optional. Para visualizar os dados usados neste tutorial, insira o código a seguir na próxima célula e execute o código. Atualize o catálogo e o esquema para corresponder ao caminho do código anterior.
# Update these to match the catalog and schema # that you used for the pipeline in step 1. catalog = "<my_catalog>" schema = "<my_schema>" display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))
Isso gera um conjunto de dados grande (com dados CDC falsos) que você pode usar no restante do tutorial. Na próxima etapa, ingerir os dados usando o Carregador Automático.
Etapa 3: ingerir dados incrementalmente com o Carregador Automático
A próxima etapa é carregar os dados brutos do armazenamento em nuvem (falso) em uma camada bronze.
Isso pode ser desafiador por vários motivos, pois é necessário:
- Opere em escala, potencialmente ingerindo milhões de arquivos pequenos.
- Inferir o esquema e o tipo JSON.
- Lide com registros incorretos com esquema JSON errado.
- Cuide da evolução da estrutura (por exemplo, uma nova coluna na tabela de clientes).
O Carregador Automático simplifica essa ingestão, incluindo inferência de esquema e evolução do esquema, ao mesmo tempo em que é dimensionado para milhões de arquivos de entrada. O Carregador Automático está disponível no Python usando cloudFiles e no SQL usando o SELECT * FROM STREAM read_files(...) e pode ser usado com uma variedade de formatos (JSON, CSV, Apache Avro etc.):
Definir a tabela como uma tabela de streaming garante que você consuma apenas novos dados de entrada. Se você não defini-la como uma tabela de streaming, ela examinará e ingerirá todos os dados disponíveis. Consulte as tabelas de streaming para obter mais informações.
Para ingerir os dados CDC de entrada usando o Auto Loader, copie e cole o seguinte código no arquivo de código que foi criado com sua pipeline (chamado
my_transformation.py). Você pode usar Python ou SQL, com base no idioma escolhido ao criar o pipeline. Certifique-se de substituir o<catalog>e o<schema>pelos que você configurou como padrão para o pipeline.Python
from pyspark import pipelines as dp from pyspark.sql.functions import * # Replace with the catalog and schema name that # you are using: path = "/Volumes/<catalog>/<schema>/raw_data/customers" # Create the target bronze table dp.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone") # Create an Append Flow to ingest the raw data into the bronze table @dp.append_flow( target = "customers_cdc_bronze", name = "customers_bronze_ingest_flow" ) def customers_bronze_ingest_flow(): return ( spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(f"{path}") )SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_bronze_ingest_flow AS INSERT INTO customers_cdc_bronze BY NAME SELECT * FROM STREAM read_files( -- replace with the catalog/schema you are using: "/Volumes/<catalog>/<schema>/raw_data/customers", format => "json", inferColumnTypes => "true" )Clique no
Execute o arquivo ou execute o pipeline para iniciar uma atualização para o pipeline conectado. Com apenas um arquivo de origem em seu pipeline, eles são funcionalmente equivalentes.
Quando a atualização for concluída, o editor será atualizado com informações sobre seu pipeline.
- O DAG (pipeline graph), na barra lateral à direita do código, mostra uma única tabela.
customers_cdc_bronze - Um resumo da atualização é mostrado na parte superior do navegador de ativos de pipeline.
- Os detalhes da tabela gerada são mostrados no painel inferior e você pode procurar dados da tabela selecionando-os.
Esses são os dados brutos da camada bronze importados do armazenamento em nuvem. Na próxima etapa, limpe os dados para criar uma tabela de camadas de prata.
Etapa 4: Limpeza e expectativas para acompanhar a qualidade dos dados
Depois que a camada bronze for definida, crie a camada de prata adicionando expectativas para controlar a qualidade dos dados. Verifique as seguintes condições:
- A ID nunca deve ser
null. - O tipo de operação CDC deve ser válido.
- O JSON deve ser lido corretamente pelo Carregador Automático.
As linhas que não atendem a essas condições são descartadas.
Consulte Gerenciar a qualidade dos dados com expectativas de pipeline para obter mais informações.
Na barra lateral do navegador de ativos de pipeline, clique no
Adicione e, em seguida, Transformação.
Insira um Nome e escolha um idioma (Python ou SQL) para o arquivo de código-fonte. Você pode combinar idiomas dentro de um pipeline, podendo assim escolher qualquer um dos dois para esta etapa.
Para criar uma camada de prata com uma tabela limpa e impor restrições, copie e cole o código a seguir no novo arquivo (escolha Python ou SQL com base no idioma do arquivo).
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * dp.create_streaming_table( name = "customers_cdc_clean", expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"} ) @dp.append_flow( target = "customers_cdc_clean", name = "customers_cdc_clean_flow" ) def customers_cdc_clean_flow(): return ( spark.readStream.table("customers_cdc_bronze") .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data") )SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_clean ( CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW ) COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_cdc_clean_flow AS INSERT INTO customers_cdc_clean BY NAME SELECT * FROM STREAM customers_cdc_bronze;Clique no
Execute o arquivo ou execute o pipeline para iniciar uma atualização para o pipeline conectado.
Como agora há dois arquivos de origem, eles não fazem a mesma coisa, mas, nesse caso, a saída é a mesma.
- Execute o pipeline executa todo o pipeline, incluindo o código da etapa 3. Se seus dados de entrada estivessem sendo atualizados, isso capturaria todas as alterações dessa fonte de dados para a camada de bronze. Isso não executa o código da etapa de configuração de dados, pois isso está na pasta explorações e não faz parte da origem do pipeline.
- Executar o arquivo executa apenas o arquivo de origem atual. Nesse caso, sem que os dados de entrada sejam atualizados, isso gera os dados prata da tabela bronze armazenada em cache. Seria útil executar apenas esse arquivo para iteração mais rápida ao criar ou editar seu código de pipeline.
Quando a atualização for concluída, você poderá ver que o gráfico de pipeline agora mostra duas tabelas (com a camada de prata dependendo da camada bronze) e o painel inferior mostra detalhes para ambas as tabelas. O topo do navegador de ativos de pipeline agora mostra tempos de execução de várias execuções, mas apenas detalhes para a execução mais recente.
Em seguida, crie a versão final da camada de ouro da customers tabela.
Etapa 5: Materializar a tabela de clientes com um fluxo AUTO CDC
Até este ponto, as tabelas vinham apenas repassando os dados do CDC a cada etapa. Agora, crie a customers tabela que contenha a vista mais atualizada e seja uma réplica da tabela original, e não a lista de operações CDC que a criaram.
Isso não é trivial para implementar manualmente. Você deve considerar aspectos como a eliminação de duplicação de dados para manter a linha mais recente.
No entanto, o Lakeflow Spark Declarative Pipelines resolve esses desafios com a AUTO CDC operação.
Na barra lateral do navegador de ativos de pipeline, clique no
Adicionar e transformação.
Insira um Nome e escolha um idioma (Python ou SQL) para o novo arquivo de código-fonte. Você pode escolher novamente qualquer idioma para esta etapa, mas usar o código correto, abaixo.
Para processar os dados CDC usando
AUTO CDCno Lakeflow Spark Declarative Pipelines, copie e cole o código a seguir no novo arquivo.Python
from pyspark import pipelines as dp from pyspark.sql.functions import * dp.create_streaming_table(name="customers", comment="Clean, materialized customers") dp.create_auto_cdc_flow( target="customers", # The customer table being materialized source="customers_cdc_clean", # the incoming CDC keys=["id"], # what we'll be using to match the rows to upsert sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition except_column_list=["operation", "operation_date", "_rescued_data"], )SQL
CREATE OR REFRESH STREAMING TABLE customers; CREATE FLOW customers_cdc_flow AS AUTO CDC INTO customers FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 1;Clique no
Execute o arquivo para iniciar uma atualização para o pipeline conectado.
Quando a atualização for concluída, você poderá ver que o grafo de pipeline mostra três tabelas, progredindo de bronze para prata para ouro.
Etapa 6: Acompanhar o histórico de atualizações com dimensão de alteração lenta tipo 2 (SCD2)
Geralmente, é necessário criar uma tabela acompanhando todas as alterações resultantes de APPEND, UPDATEe DELETE:
- Histórico: você deseja manter um histórico de todas as alterações na tabela.
- Rastreabilidade: você deseja ver qual operação ocorreu.
SCD2 com o SDP do Lakeflow
O Delta dá suporte a CDF (fluxo de dados de alteração) e table_change pode consultar modificações de tabela no SQL e no Python. No entanto, o principal caso de uso do CDF é capturar alterações em um pipeline, e não criar uma visão completa das alterações na tabela desde o início.
As coisas ficam especialmente complexas de implementar se você tiver eventos fora de ordem. Se você precisar sequenciar suas alterações por um carimbo de data/hora e receber uma modificação que aconteceu no passado, será necessário acrescentar uma nova entrada na tabela SCD e atualizar as entradas anteriores.
O Lakeflow SDP remove essa complexidade e permite que você crie uma tabela separada que contenha todas as modificações desde o início do tempo. Essa tabela pode ser usada em escala, com partições específicas ou colunas ZORDER, se necessário. Os campos fora de ordem são tratados automaticamente com base no _sequence_by.
Para criar uma tabela SCD2, use a opção STORED AS SCD TYPE 2 no SQL ou stored_as_scd_type="2" no Python.
Observação
Você também pode limitar quais colunas o recurso rastreia usando a opção: TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}
Na barra lateral do navegador de ativos de pipeline, clique no
Adicionar e transformação.
Insira um Nome e escolha um idioma (Python ou SQL) para o novo arquivo de código-fonte.
Copie e cole o código a seguir no novo arquivo.
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * # create the table dp.create_streaming_table( name="customers_history", comment="Slowly Changing Dimension Type 2 for customers" ) # store all changes as SCD2 dp.create_auto_cdc_flow( target="customers_history", source="customers_cdc_clean", keys=["id"], sequence_by=col("operation_date"), ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), except_column_list=["operation", "operation_date", "_rescued_data"], stored_as_scd_type="2", ) # Enable SCD2 and store individual updatesSQL
CREATE OR REFRESH STREAMING TABLE customers_history; CREATE FLOW customers_history_cdc AS AUTO CDC INTO customers_history FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 2;Clique no
Execute o arquivo para iniciar uma atualização para o pipeline conectado.
Quando a atualização for concluída, o gráfico de pipeline inclui a nova tabela customers_history, dependente também da tabela de camada prata, e o painel inferior mostra os detalhes de todas as 4 tabelas.
Etapa 7: Criar uma exibição materializada que acompanha mais quem alterou suas informações
A tabela customers_history contém todas as alterações históricas que um usuário fez às suas informações. Crie uma visão materializada simples na camada de ouro que acompanhe quem mais alterou suas informações. Isso pode ser usado para análise de detecção de fraudes ou recomendações de usuário em um cenário real. Além disso, a aplicação de alterações com SCD2 já removeu duplicatas, para que você possa contar diretamente as linhas por ID de usuário.
Na barra lateral do navegador de ativos de pipeline, clique no
Adicionar e transformação.
Insira um Nome e escolha um idioma (Python ou SQL) para o novo arquivo de código-fonte.
Copie e cole o código a seguir no novo arquivo de origem.
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * @dp.table( name = "customers_history_agg", comment = "Aggregated customer history" ) def customers_history_agg(): return ( spark.read.table("customers_history") .groupBy("id") .agg( count("address").alias("address_count"), count("email").alias("email_count"), count("firstname").alias("firstname_count"), count("lastname").alias("lastname_count") ) )SQL
CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS SELECT id, count("address") as address_count, count("email") AS email_count, count("firstname") AS firstname_count, count("lastname") AS lastname_count FROM customers_history GROUP BY idClique no
Execute o arquivo para iniciar uma atualização para o pipeline conectado.
Após a conclusão da atualização, há uma nova tabela no gráfico de pipeline que depende da tabela customers_history, e pode ser visualizada no painel inferior. O seu pipeline está agora concluído. Você pode testá-lo executando um pipeline de execução completo. As únicas etapas restantes são agendar o pipeline para que ele seja atualizado regularmente.
Etapa 8: Criar um trabalho para executar o pipeline de ETL
Em seguida, crie um fluxo de trabalho para automatizar as etapas de ingestão, processamento e análise de dados em seu pipeline usando um trabalho do Databricks.
- Na parte superior do editor, escolha o botão Agendar .
- Se a caixa de diálogo Agendas for exibida, escolha Adicionar agendamento.
- Isso abre a caixa de diálogo Nova agenda, na qual você pode criar uma tarefa para executar o pipeline em um horário programado.
- Opcionalmente, dê um nome ao trabalho.
- Por padrão, o agendamento é definido para ser executado uma vez por dia. Você pode aceitar esse padrão ou definir sua própria agenda. Escolher Avançado oferece a opção de definir uma hora específica em que o trabalho será executado. Selecionar Mais opções permite criar notificações quando o trabalho é executado.
- Selecione Criar para aplicar as alterações e criar o trabalho.
Agora, a tarefa será executada diariamente para manter seu pipeline sempre atualizado. Você pode escolher Agendar novamente para exibir a lista de agendas. Você pode gerenciar agendas para o pipeline a partir dessa caixa de diálogo, incluindo adicionar, editar ou remover agendas.
Clicar no nome do cronograma (ou da tarefa) leva você para a página da tarefa na lista Tarefas e pipelines. A partir daí, você pode exibir detalhes sobre execuções de trabalho, incluindo o histórico de execuções ou executar o trabalho imediatamente com o botão Executar agora .
Consulte Monitoramento e observabilidade para Tarefas do Lakeflow para obter mais informações sobre execuções de tarefas.
Recursos adicionais
- Pipelines Declarativos Lakeflow Spark
- Tutorial: Criar um pipeline ETL com Pipelines Declarativos do Lakeflow Spark
- O que é a CDC (captura de dados de alteração)?
- As APIs AUTO CDC: simplificam a captura de dados de mudanças com pipelines
- Converter um pipeline em um projeto do Pacote de Ativos Databricks
- O que é o Carregador Automático?