Execute um pipeline existente com o gerenciador de orquestração de fluxo de trabalho

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Dica

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange desde movimentação de dados até ciência de dados, análise em tempo real, business intelligence e relatórios. Saiba como iniciar uma avaliação gratuita!

Observação

O gerenciador de orquestração de fluxo de trabalho é desenvolvido com Apache Airflow.

Observação

O gerenciador de orquestração de fluxo de trabalho para Azure Data Factory depende do aplicativo Apache Airflow de código aberto. Encontre a documentação e mais tutoriais do Airflow nas páginas Documentação ou Comunidade do Apache Airflow.

Os pipelines do Data Factory fornecem mais de 100 conectores de fonte de dados que fornecem integração de dados/fluxos de dados escalonáveis e confiáveis. Há cenários em que você gostaria de executar um pipeline do data factory existente do DAG do Apache Airflow. Este tutorial mostra exatamente como fazer isso.

Pré-requisitos

  • Assinatura do Azure. Caso você não tenha uma assinatura do Azure, crie uma conta gratuita do Azure antes de começar.
  • Conta de Armazenamento do Azure. Se você não tiver uma conta de armazenamento, confira Criar uma conta de armazenamento do Azure a fim de conhecer as etapas para criar uma. Verifique se a conta de armazenamento permite acesso somente de redes selecionadas.
  • Pipeline do Azure Data Factory. Você pode seguir qualquer um dos tutoriais e criar um novo pipeline do data factory, se ainda não tiver um, ou criar um selecionando Começar e experimentar o primeiro pipeline do data factory.
  • Configurar uma Entidade de Serviço. Você precisará criar uma nova entidade de serviço ou usar uma existente e conceder-lhe permissão para executar o pipeline (exemplo - função de contribuidor no data factory onde existem os pipelines existentes), mesmo que o ambiente do gerenciador de orquestração de fluxo de trabalho e o existem pipelines no mesmo data factory. Será necessário obter a ID do Cliente e o Segredo do Cliente (chave de API) da Entidade de Serviço.

Etapas

  1. Crie um novo arquivo do Python adf.py com o conteúdo abaixo:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    Você terá que criar a conexão usando o administrador da interface do usuário do gerenciador de orquestração de fluxo de trabalho -> Conexões -> “+” -> Escolha “Tipo de conexão” como “Azure Data Factory” e preencha seu client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name, e pipeline_name.

  2. Carregue o arquivo adf.py no armazenamento de blobs em uma pasta nomeada DAGS.

  3. Importe a pasta DAGS para o ambiente do gerenciador de orquestração de fluxo de trabalho. Se não tiver um, crie um novo

    Captura de tela mostrando a guia de gerenciamento do data factory com a seção do Airflow selecionada.