Compartilhar via


Executar Pipelines Declarativos do Lakeflow dentro de um fluxo de trabalho

Você pode executar Pipelines Declarativas do Lakeflow como parte de um fluxo de trabalho de processamento de dados com Lakeflow Jobs, Apache Airflow ou Azure Data Factory.

Trabalhos

Você pode orquestrar várias tarefas em um trabalho do Databricks para implementar um fluxo de trabalho de processamento de dados. Para incluir um pipeline em um trabalho, use a tarefa Pipeline ao criar um trabalho. Consulte a tarefa Pipeline para trabalhos.

Fluxo de ar do Apache

O Apache Airflow é uma solução de software livre para gerenciar e agendar fluxos de trabalho de dados. O Airflow representa fluxos de trabalho como grafos direcionados e acíclicos (DAGs) de operações. Você define um fluxo de trabalho em um arquivo Python e o Airflow gerencia o agendamento e a execução. Para obter informações sobre como instalar e usar o Airflow com o Azure Databricks, consulte Orchestrate Lakeflow Jobs com o Apache Airflow.

Para executar um pipeline como parte de um fluxo de trabalho do Airflow, use o DatabricksSubmitRunOperator.

Requisitos

A seguir, é necessário usar o suporte de fluxo de ar para Pipelines Declarativos do Lakeflow:

Exemplo

O exemplo a seguir cria um DAG do Airflow que aciona uma atualização para o pipeline com o identificador 8279d543-063c-4d63-9926-dae38e35ce8b:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

Substitua CONNECTION_ID pelo identificador de uma Conexão do Airflow com seu workspace.

Salve este exemplo no airflow/dags diretório e use a interface do Airflow para exibir e disparar o DAG. Use a interface do usuário do Lakeflow Declarative Pipelines para visualizar os detalhes da atualização do pipeline.

Fábrica de dados do Azure

Observação

Os Pipelines Declarativos do Lakeflow e o Azure Data Factory incluem opções para configurar o número de novas tentativas quando ocorre uma falha. Se os valores de repetição estiverem configurados no pipeline e na atividade do Azure Data Factory que chama o pipeline, o número de repetições será o valor de repetição do Azure Data Factory multiplicado pelo valor de repetição dos Pipelines Declarativos do Lakeflow.

Por exemplo, se uma atualização de pipeline falhar, o Lakeflow Declarative Pipelines repetirá a atualização até cinco vezes por padrão. Se a tentativa de repetição do Azure Data Factory estiver definida como três e o seu pipeline usar o padrão de cinco tentativas, o pipeline com falha pode ser tentado novamente até quinze vezes. Para evitar tentativas excessivas de repetição quando as atualizações de pipeline falharem, o Databricks recomenda limitar o número de tentativas ao configurar o pipeline ou a atividade do Azure Data Factory que chama o pipeline.

Para alterar a configuração de repetição do pipeline, use a configuração pipelines.numUpdateRetryAttempts ao configurá-lo.

O Azure Data Factory é um serviço ETL baseado em nuvem que permite orquestrar fluxos de trabalho de integração e transformação de dados. O Azure Data Factory dá suporte diretamente à execução de tarefas do Azure Databricks em um fluxo de trabalho, incluindo notebooks, tarefas JAR e scripts Python. Você também pode incluir um pipeline em um fluxo de trabalho chamando a API de Pipelines Declarativos do Lakeflow de uma atividade Web do Azure Data Factory. Por exemplo, para disparar uma atualização de pipeline do Azure Data Factory:

  1. Crie um data factory ou abra um data factory existente.

  2. Quando a criação for concluída, abra a página da fábrica de dados e clique no bloco Abrir o Azure Data Factory Studio. A interface do usuário do Azure Data Factory é exibida.

  3. Crie um pipeline do Azure Data Factory selecionando Pipeline no menu suspenso Novo na interface do usuário do Azure Data Factory Studio.

  4. Na caixa de ferramentas Atividades, expanda Geral e arraste a atividade da Web para a tela do pipeline. Clique na guia Configurações e insira os seguintes valores:

    Observação

    Como prática recomendada de segurança, quando você se autentica com ferramentas automatizadas, sistemas, scripts e aplicativos, o Databricks recomenda que você use tokens de acesso pessoal pertencentes a entidades de serviço em vez de usuários do workspace. Para criar tokens para entidades de serviço, consulte Gerenciar tokens para uma entidade de serviço.

    • URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      Substitua <get-workspace-instance>.

      Substitua <pipeline-id> pelo identificador de pipeline.

    • Método: selecione POST no menu suspenso.

    • Cabeçalhos: clique em + Novo. Na caixa de texto Nome , insira Authorization. Na caixa de texto Valor , insira Bearer <personal-access-token>.

      Substitua <personal-access-token> por um token de acesso pessoal do Azure Databricks.

    • Corpo: para passar parâmetros de solicitação adicionais, insira um documento JSON contendo os parâmetros. Por exemplo, para iniciar uma atualização e reprocessar todos os dados da pipeline: {"full_refresh": "true"}. Se não houver parâmetros de solicitação adicionais, insira chaves vazias ({}).

Para testar a atividade Web, clique em Depurar na barra de ferramentas do pipeline na interface do usuário do Data Factory. A saída e o status da execução, incluindo erros, são exibidos na guia Saída do pipeline do Azure Data Factory. Use a interface do usuário do Lakeflow Declarative Pipelines para visualizar os detalhes da atualização do pipeline.

Dica

Um requisito comum de fluxo de trabalho é iniciar uma tarefa após a conclusão de uma tarefa anterior. Como a solicitação de Pipelines Declarativos do Lakeflow é assíncrona updates — a solicitação retorna após começar a atualização, mas antes que ela seja concluída — as tarefas no pipeline do Azure Data Factory, que dependem da atualização dos Pipelines Declarativos do Lakeflow, devem aguardar a conclusão da atualização. Uma opção para aguardar a conclusão da atualização é adicionar uma atividade Until após a atividade da Web que dispara a atualização dos Pipelines Declarativos do Lakeflow. Na atividade Until:

  1. Adicione uma atividade de espera para aguardar um número configurado de segundos para a conclusão da atualização.
  2. Adicione uma atividade Web após a atividade de espera que usa a solicitação de atualização de detalhes dos Lakeflow Declarative Pipelines para verificar o status da atualização. O state campo na resposta retorna o estado atual da atualização, inclusive se ela tiver sido concluída.
  3. Use o valor do state campo para definir a condição de término da atividade Until. Você também pode usar uma atividade 'Definir Variável' para adicionar uma variável de pipeline com base no valor de state e usar essa variável para a condição de término.