Compartilhar via


Executar pipelines em um fluxo de trabalho

Você pode executar um pipeline como parte de um fluxo de trabalho de processamento de dados com Jobs do Lakeflow, Apache Airflow ou Azure Data Factory.

Jobs

Você pode orquestrar várias tarefas em um trabalho do Databricks para implementar um fluxo de trabalho de processamento de dados. Para incluir um pipeline em uma tarefa, utilize a tarefa Pipeline ao criar uma tarefa. Consulte Tarefa de pipeline para trabalhos.

Fluxo de ar do Apache

O Apache Airflow é uma solução de software livre para gerenciar e agendar fluxos de trabalho de dados. Airflow representa fluxos de trabalho como grafos acíclicos direcionados (DAGs) de operações. Você define um fluxo de trabalho em um arquivo Python e o Airflow gerencia o agendamento e a execução. Para obter informações sobre como instalar e usar o Airflow com o Azure Databricks, consulte Orchestrate Lakeflow Jobs com o Apache Airflow.

Para executar um pipeline como parte de um workflow do Airflow, use o DatabricksSubmitRunOperator.

Requirements

Para utilizar o suporte do Airflow para pipelines declarativos do Lakeflow Spark, os seguintes são necessários:

Example

O exemplo a seguir cria um DAG de Airflow que aciona uma atualização para o pipeline com o identificador 8279d543-063c-4d63-9926-dae38e35ce8b:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('ldp',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

Substitua CONNECTION_ID pelo identificador de uma conexão Airflow no seu workspace.

Salve este exemplo no diretório airflow/dags e use a UI do Airflow para visualizar e acionar o DAG. Utilize a interface do usuário da linha de produção para ver os detalhes da atualização da linha de produção.

Azure Data Factory

Observação

O Lakeflow Spark Declarative Pipelines e o Azure Data Factory incluem opções para configurar o número de novas tentativas quando ocorre uma falha. Se os valores de repetição estiverem configurados no pipeline e na atividade do Azure Data Factory que chama o pipeline, o número de novas tentativas será o valor de repetição do Azure Data Factory multiplicado pelo valor de repetição do pipeline.

Por exemplo, se uma atualização de pipeline falhar, o Lakeflow Spark Declarative Pipelines repetirá a atualização até cinco vezes por padrão. Se o Azure Data Factory estiver configurado para três tentativas e seu pipeline usar o padrão de cinco tentativas, o pipeline com falha poderá ter até quinze novas tentativas. Para evitar tentativas excessivas de repetição quando as atualizações de pipeline falharem, o Databricks recomenda limitar o número de tentativas ao configurar o pipeline ou a atividade do Azure Data Factory que chama o pipeline.

Para alterar a configuração de repetição do pipeline, use pipelines.numUpdateRetryAttempts ao configurar o pipeline.

O Azure Data Factory é um serviço ETL baseado em nuvem que permite orquestrar fluxos de trabalho de integração e transformação de dados. O Azure Data Factory dá suporte diretamente à execução de tarefas do Azure Databricks em um fluxo de trabalho, incluindo notebooks, tarefas JAR e scripts Python. Você também pode incluir um pipeline em um fluxo de trabalho chamando a API REST do pipeline de uma atividade Web do Azure Data Factory. Por exemplo, para disparar uma atualização de pipeline do Azure Data Factory:

  1. Crie um data factory ou abra um data factory existente.

  2. Quando a criação for concluída, abra a página da sua fábrica de dados e clique no botão Abrir o Azure Data Factory Studio. A interface do usuário do Azure Data Factory é exibida.

  3. Crie um novo pipeline no Azure Data Factory selecionando Pipeline no menu suspenso Novo na interface do usuário do Azure Data Factory Studio.

  4. Na caixa de ferramentas de Atividades, expanda Geral e arraste a atividade Web para a tela do pipeline. Clique na guia Configurações e insira os seguintes valores:

    Observação

    Como prática recomendada de segurança, quando você se autentica com ferramentas automatizadas, sistemas, scripts e aplicativos, o Databricks recomenda que você use tokens de acesso pessoal pertencentes a entidades de serviço em vez de usuários do workspace. Para criar tokens para entidades de serviço, consulte Gerenciar tokens para uma entidade de serviço.

    • URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      Substitua <get-workspace-instance>.

      Substitua <pipeline-id> pelo identificador de pipeline.

    • Método: selecione POST no menu suspenso.

    • Cabeçalhos: Clique + Novo. Na caixa de texto Nome, insira Authorization. Na caixa de texto Valor , insira Bearer <personal-access-token>.

      Substitua <personal-access-token> por um token de acesso pessoal do Azure Databricks.

    • Corpo: para passar parâmetros de solicitação adicionais, insira um documento JSON contendo os parâmetros. Por exemplo, para iniciar uma atualização e reprocessar todos os dados do pipeline: {"full_refresh": "true"}. Se não houver parâmetros de solicitação adicionais, insira chaves vazias ({}).

Para testar a atividade da Web, clique em Depurar na barra de ferramentas de pipeline na interface do Data Factory. A saída e o status da execução, incluindo erros, são exibidos na guia Saída do pipeline do Azure Data Factory. Use a interface do usuário de pipelines para exibir os detalhes da atualização do pipeline.

Dica

Um requisito comum de fluxo de trabalho é iniciar uma tarefa após a conclusão de uma tarefa anterior. Como a solicitação de pipeline updates é assíncrona — a solicitação retorna após iniciar a atualização, mas antes que a atualização seja concluída — as tarefas no pipeline do Azure Data Factory com uma dependência na atualização do pipeline devem aguardar a conclusão da atualização. Uma opção para aguardar a conclusão da atualização é adicionar uma atividade Until seguindo a atividade Web que dispara a atualização dos Pipelines Declarativos do Lakeflow Spark. Na atividade 'Until':

  1. Adicione uma atividade de espera para aguardar um número configurado de segundos para a conclusão da atualização.
  2. Adicione uma atividade Web após a atividade Wait, que usa a solicitação de detalhes da atualização do pipeline para obter o status da atualização. O state campo na resposta retorna o estado atual da atualização, inclusive se ela tiver sido concluída.
  3. Use o valor do state campo para definir a condição de término da atividade Until. Você também pode usar uma atividade Definir Variável para adicionar uma variável de pipeline com base no valor de state e usar essa variável como condição de término.