Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
APLICA-SE A:
Azure Data Factory
Azure Synapse Analytics
Gorjeta
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!
Importante
A partir de 1 de janeiro de 2026, já não poderá criar novas instâncias Airflow usando o Workflow Orchestration Manager do ADF. Recomendamos que migre todas as cargas de trabalho do Workflow Orchestration Manager (Apache Airflow no Azure Data Factory) para trabalhos Apache Airflow no Microsoft Fabric antes de 31 de dezembro de 2025.
Para mais informações ou apoio durante a sua migração para o Apache Airflow no Microsoft Fabric, contacte o Suporte da Microsoft.
Os pipelines do Data Factory fornecem 100+ conectores de fonte de dados que fornecem integração de dados/fluxos de dados escaláveis e confiáveis. Há cenários em que você gostaria de executar um pipeline de fábrica de dados existente a partir do seu Apache Airflow DAG. Este tutorial mostra-te como.
Pré-requisitos
- Subscrição do Azure. Se não tiver uma subscrição do Azure, crie uma conta do Azure gratuita antes de começar.
- Conta de armazenamento do Azure. Se não tiver uma conta de armazenamento, veja Criar uma conta de armazenamento do Azure para seguir os passos para criar uma. Certifique-se de que a conta de armazenamento permite o acesso apenas a partir de redes selecionadas.
- Pipeline do Azure Data Factory. Você pode seguir qualquer um dos tutoriais e criar um novo pipeline de data factory caso ainda não tenha um, ou criar um com uma seleção em Introdução e experimentar seu primeiro pipeline de data factory.
- Configure uma entidade de serviço. Você precisará criar uma nova entidade de serviço ou usar uma existente e conceder-lhe permissão para executar o pipeline (exemplo - função de colaborador na fábrica de dados onde os pipelines existentes existem), mesmo que o ambiente do Workflow Orchestration Manager e os pipelines existam no mesmo data factory. Você precisará obter a ID do Cliente e o Segredo do Cliente (Chave de API) da Entidade de Serviço.
Passos
Crie um novo arquivo Python adf.py com o conteúdo abaixo:
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensorVocê terá que criar a conexão usando o Workflow Orchestration Manager UI Admin -> Connections -> '+' -> Escolha 'Tipo de conexão' como 'Azure Data Factory' e, em seguida, preencha sua client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name e pipeline_name.
Carregue o arquivo adf.py para seu armazenamento de blob dentro de uma pasta chamada DAGS.
Importe a pasta DAGS para o ambiente do Workflow Orchestration Manager. Se você não tiver um, crie um novo