Orquestrar trabalhos do Azure Databricks com o Apache Airflow

Este artigo descreve o suporte do Apache Airflow para orquestrar pipelines de dados com o Azure Databricks, tem instruções para instalar e configurar o Airflow localmente e fornece um exemplo de implantação e execução de um fluxo de trabalho do Azure Databricks com o Airflow.

Orquestração de trabalho em um pipeline de dados

O desenvolvimento e a implantação de um pipeline de processamento de dados geralmente exigem o gerenciamento de dependências complexas entre tarefas. Por exemplo, um pipeline pode ler dados de uma fonte, limpar os dados, transformar os dados limpos e escrever os dados transformados em um destino. Você também precisa de suporte para testar, agendar e solucionar problemas de erros ao operacionalizar um pipeline.

Os sistemas de fluxo de trabalho abordam esses desafios permitindo que você defina dependências entre tarefas, agende o horário de execução dos pipelines e monitore os fluxos de trabalho. O Apache Airflow é uma solução de código aberto usada para gerenciar e agendar pipelines de dados. O Airflow representa os pipelines de dados como DAGs (grafos direcionados acíclicos) de operações. Você define um fluxo de trabalho em um arquivo Python, e o Airflow gerencia o agendamento e a execução. A conexão entre o Airflow e o Azure Databricks permite que você aproveite o mecanismo do Spark otimizado oferecido pelo Azure Databricks com os recursos de agendamento do Airflow.

Requisitos

  • A integração entre o Airflow e o Azure Databricks está disponível no Airflow versão 2.5.0 e posterior. Os exemplos deste artigo são testados com o Airflow versão 2.6.1.
  • O Airflow exige Python 3.8, 3.9, 3.10 ou 3.11. Os exemplos deste artigo são testados com o Python 3.8.
  • As instruções neste artigo para instalar e executar o Airflow exigem pipenv para criar um ambiente virtual do Python.

Operadores do Airflow para Databricks

Um DAG do Airflow é composto de tarefas, em que cada tarefa executa um Operador do Airflow. Os operadores do Airflow que dão suporte à integração ao Databricks são implementados no provedor do Databricks.

O provedor do Databricks inclui operadores para executar várias tarefas em um workspace do Azure Databricks, incluindo importar dados para uma tabela, executar consultas SQL e trabalhar com pastas Git do Databricks.

O provedor do Databricks implementa dois operadores para disparar trabalhos:

Para criar um novo trabalho do Azure Databricks ou redefinir um trabalho existente, o provedor do Databricks implementa o DatabricksCreateJobsOperator. O DatabricksCreateJobsOperator usa as solicitações de API POST /api/2.1/jobs/create e POST /api/2.1/jobs/reset. Você pode usar o DatabricksCreateJobsOperator com o DatabricksRunNowOperator para criar e executar um trabalho.

Observação

Usar os operadores do Databricks para disparar um trabalho requer o fornecimento de credenciais na configuração de conexão do Databricks. Consulte Criar um token de acesso pessoal do Azure Databricks para o Airflow.

O operador do Databricks Airflow grava a URL da página de execução de trabalho nos logs do Airflow a cada polling_period_seconds (o padrão é de 30 segundos). Para obter mais informações, confira a página do pacote apache-airflow-providers-databricks no site do Airflow.

Instalar a integração entre o Airflow e o Azure Databricks

Para instalar o Airflow e o provedor do Databricks localmente para teste e desenvolvimento, use as etapas a seguir. Para outras opções de instalação do Airflow, incluindo a criação de uma instalação de produção, consulte a seção instalação na documentação do Airflow.

Abra um terminal e execute os seguintes comandos:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Substitua <firstname>, <lastname> e <email> por seu nome de usuário e email. Você será solicitado a inserir uma senha para o usuário administrador. Salve essa senha porque ela é necessária para entrar na interface do usuário do Airflow.

O script executa as seguintes etapas:

  1. Cria um diretório chamado airflow e muda para esse diretório.
  2. Usa pipenv para criar e gerar um ambiente virtual do Python. O Databricks recomenda usar um ambiente virtual do Python para isolar as versões de pacote e as dependências de código nesse ambiente. Esse isolamento ajuda a reduzir incompatibilidades inesperadas de versões do pacote e colisões de dependência de código.
  3. Inicializa uma variável de ambiente chamada AIRFLOW_HOME definida para o caminho do diretório airflow.
  4. Instala o Airflow e os pacotes de provedor do Databricks no Airflow.
  5. Cria um diretório airflow/dags. O Airflow usa o diretório dags para armazenar definições de DAG.
  6. Inicializa um banco de dados SQLite usado pelo Airflow para acompanhar os metadados. Em uma implantação de produção do Airflow, o Airflow é configurado com um banco de dados padrão. O banco de dados SQLite e a configuração padrão para a implantação do Airflow são inicializados no diretório airflow.
  7. Cria um usuário administrador para o Airflow.

Dica

Para confirmar a instalação do provedor do Databricks, execute o seguinte comando no diretório de instalação do Airflow:

airflow providers list

Iniciar o agendador e o servidor Web do Airflow

O servidor Web do Airflow é necessário para exibir a interface do usuário do Airflow. Para iniciar o servidor Web, abra um terminal no diretório de instalação do Airflow e execute os seguintes comandos:

Observação

Se o servidor Web do Airflow não for iniciado devido a um conflito de porta, você poderá alterar a porta padrão na configuração do Airflow.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

O agendador é o componente do Airflow que agenda os DAGs. Para iniciar o agendador, abra um novo terminal no diretório de instalação do Airflow e execute os seguintes comandos:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Testar a instalação do Airflow

Para verificar a instalação do Airflow, você pode executar um dos DAGs de exemplo incluídos no Airflow:

  1. Em uma janela do navegador, abra http://localhost:8080/home. Entre na interface do usuário do Airflow com o nome de usuário e a senha que você criou ao instalar o Airflow. A página DAGs do Airflow é exibida.
  2. Clique na alternância Pausar/Retomar DAG para retomar um dos DAGs de exemplo, como o example_python_operator.
  3. Dispare o DAG de exemplo clicando no botão Disparar DAG.
  4. Clique no nome do DAG para ver detalhes, incluindo o status de execução do DAG.

Criar um token de acesso pessoal do Azure Databricks para o Airflow

O Airflow se conecta ao Databricks usando um token de acesso pessoal (PAT) do Azure Databricks. Para criar um PAT:

  1. No workspace do Azure Databricks, clique no nome de usuário do Azure Databricks na barra superior e selecione Configurações na lista suspensa.
  2. Clique em Desenvolvedor.
  3. Ao lado de Tokens de acesso, clique em Gerenciar.
  4. Clique em Gerar novo token.
  5. (Opcional) Insira um comentário que ajude você a identificar esse token no futuro e altere o tempo de vida padrão do token de 90 dias. Para criar um token sem tempo de vida (não recomendado), deixe a caixa Tempo de vida (dias) vazia (em branco).
  6. Clique em Gerar.
  7. Copie o token exibido para um local seguro e clique em Concluído.

Observação

Lembre-se de salvar o token copiado em um local seguro. Não compartilhe seu token copiado com outras pessoas. Se você perder o token copiado, não poderá regenerar exatamente aquele mesmo token. Em vez disso, será necessário repetir esse procedimento para criar um novo token. Caso você tenha perdido o token copiado ou acredite que ele tenha sido comprometido, o Databricks recomenda que você exclua imediatamente esse token do seu workspace clicando no ícone de lixeira (Revogar) ao lado do token na página de Tokens de acesso.

Se você não conseguir criar ou usar tokens em seu workspace, isso pode ocorrer porque o administrador do workspace desabilitou tokens ou não deu permissão para criar ou usar tokens. Veja o administrador do workspace ou o seguinte:

Observação

Como melhor prática de segurança, ao autenticar com ferramentas, sistemas, scripts e aplicativos automatizados, o Databricks recomenda que você use tokens de acesso pertencentes às entidades de serviço e não aos usuários do workspace. Para criar tokens para entidades de serviço, consulte Gerenciar tokens para uma entidade de serviço.

Você também pode se autenticar no Azure Databricks usando um token do Microsoft Entra ID (antigo Azure Active Directory). Consulte Conexão do Databricks na documentação do Airflow.

Configurar uma conexão do Azure Databricks

A instalação do Airflow contém uma conexão padrão com o Azure Databricks. Para atualizar a conexão e se conectar ao seu workspace usando o token de acesso pessoal que você criou acima:

  1. Em uma janela do navegador, abra http://localhost:8080/connection/list/. Se solicitado a entrar, insira o nome de usuário e a senha do administrador.
  2. Em ID da Conexão, localize databricks_default e clique no botão Editar registro.
  3. Substitua o valor no campo Host pelo nome da instância do workspace da sua implantação do Azure Databricks, por exemplo, https://adb-123456789.cloud.databricks.com.
  4. No campo Senha, insira o token de acesso pessoal do Azure Databricks.
  5. Clique em Save (Salvar).

Se você estiver usando um token do Microsoft Entra ID, consulte a seção Conexão do Databricks na documentação do Airflow para obter informações sobre como configurar a autenticação.

Exemplo: criar um DAG do Airflow para executar um trabalho do Azure Databricks

O exemplo a seguir demonstra como criar uma implantação simples do Airflow que é executada no computador local e implanta um DAG de exemplo para disparar as execuções no Azure Databricks. Neste exemplo, você vai:

  1. Criar um notebook e adicionar um código para imprimir uma saudação de acordo com um parâmetro configurado.
  2. Criar um trabalho do Azure Databricks com uma só tarefa que executa o notebook.
  3. Configurar uma conexão do Airflow com seu workspace do Azure Databricks.
  4. Criar um DAG do Airflow para disparar o trabalho do notebook. O DAG é definido em um script Python por meio de DatabricksRunNowOperator.
  5. Usar a interface do usuário do Airflow para disparar o DAG e ver o status de execução.

Criar um notebook

Este exemplo usa um notebook que contém duas células:

  • A primeira célula contém um widget de texto dos Utilitários do Databricks definindo uma variável chamada greeting definida como o valor padrão world.
  • A segunda célula imprime o valor da variável greeting prefixada por hello.

Para criar o notebook:

  1. Vá para o workspace do Azure Databricks, clique em Novo ícone Novo na barra lateral e selecione Notebook.

  2. Dê um nome ao notebook, como Hello Airflow, e verifique se a linguagem padrão está definida como Python.

  3. Copie o código Python a seguir e cole-o na primeira célula do notebook.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Adicione uma nova célula abaixo da primeira célula e copie e cole o seguinte código Python na nova célula:

    print("hello {}".format(greeting))
    

Criar um trabalho

  1. Clique no ícone TrabalhosFluxos de Trabalho na barra lateral.

  2. Clique no Botão Criar Trabalho.

    A guia Tarefas aparece com a caixa de diálogo de criação de tarefa.

    Criar caixa de diálogo da primeira tarefa

  3. Substitua Adicionar um nome ao trabalho… pelo nome do trabalho.

  4. No campo Nome da tarefa, insira um nome para a tarefa, por exemplo, greeting-task.

  5. No menu suspenso Tipo, selecione Notebook.

  6. No menu suspenso Origem, selecione Workspace.

  7. Clique na caixa de texto Caminho e use o pesquisador de arquivos para encontrar o notebook que você criou, clique no nome do navegador e clique em Confirmar.

  8. Clique em Adicionar em Parâmetros. No campo Chave, insira greeting. No campo Valor, insira Airflow user.

  9. Clique em Criar tarefa.

No painel Detalhes do trabalho, copie o valor da ID do trabalho. Esse valor é necessário para disparar o trabalho do Airflow.

Executar o trabalho

Para testar seu novo trabalho na interface do usuário dos Fluxos de Trabalho do Azure Databricks, clique em botão Executar agora no canto superior direito. Quando a execução for concluída, você poderá verificar a saída exibindo os detalhes da execução do trabalho.

Criar um novo DAG do Airflow

Um DAG do Airflow é definido em um arquivo Python. Para criar um DAG e disparar o exemplo de trabalho de notebook:

  1. Em um editor de texto ou um IDE, crie um arquivo chamado databricks_dag.py com o seguinte conteúdo:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Substitua JOB_ID pelo valor da ID do trabalho salvo anteriormente.

  2. Salve o arquivo no diretório airflow/dags. O Airflow lê e instala automaticamente os arquivos DAG armazenados em airflow/dags/.

Instalar e verificar o DAG no Airflow

Para disparar e verificar o DAG na interface do usuário do Airflow:

  1. Em uma janela do navegador, abra http://localhost:8080/home. A tela DAGs do Airflow será exibida.
  2. Localize databricks_dag e clique na alternância Pausar/Retomar DAG para retomar o DAG.
  3. Dispare o DAG clicando no botãoDisparar DAG.
  4. Clique em uma execução na coluna Execuções para ver o status e os detalhes da execução.