Executar um pipeline existente com o Workflow Orchestration Manager
APLICA-SE A: Azure Data Factory Azure Synapse Analytics
Gorjeta
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!
Nota
O Workflow Orchestration Manager é alimentado pelo Apache Airflow.
Nota
O Workflow Orchestration Manager para Azure Data Factory depende do aplicativo Apache Airflow de código aberto. Documentação e mais tutoriais sobre o fluxo de ar podem ser encontrados nas páginas de documentação ou comunidade do Apache Airflow.
Os pipelines do Data Factory fornecem 100+ conectores de fonte de dados que fornecem integração de dados/fluxos de dados escaláveis e confiáveis. Há cenários em que você gostaria de executar um pipeline de fábrica de dados existente a partir do seu Apache Airflow DAG. Este tutorial mostra como fazer exatamente isso.
Pré-requisitos
- Subscrição do Azure. Se não tiver uma subscrição do Azure, crie uma conta do Azure gratuita antes de começar.
- Conta de armazenamento do Azure. Se não tiver uma conta de armazenamento, veja Criar uma conta de armazenamento do Azure para seguir os passos para criar uma. Certifique-se de que a conta de armazenamento permite o acesso apenas a partir de redes selecionadas.
- Pipeline do Azure Data Factory. Você pode seguir qualquer um dos tutoriais e criar um novo pipeline de data factory caso ainda não tenha um, ou criar um com uma seleção em Introdução e experimentar seu primeiro pipeline de data factory.
- Configure uma entidade de serviço. Você precisará criar uma nova entidade de serviço ou usar uma existente e conceder-lhe permissão para executar o pipeline (exemplo - função de colaborador na fábrica de dados onde os pipelines existentes existem), mesmo que o ambiente do Workflow Orchestration Manager e os pipelines existam no mesmo data factory. Você precisará obter a ID do Cliente e o Segredo do Cliente (Chave de API) da Entidade de Serviço.
Passos
Crie um novo arquivo Python adf.py com o conteúdo abaixo:
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensor
Você terá que criar a conexão usando o Workflow Orchestration Manager UI Admin -> Connections -> '+' -> Escolha 'Tipo de conexão' como 'Azure Data Factory' e, em seguida, preencha sua client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name e pipeline_name.
Carregue o arquivo adf.py para seu armazenamento de blob dentro de uma pasta chamada DAGS.
Importe a pasta DAGS para o ambiente do Workflow Orchestration Manager. Se você não tiver um, crie um novo