DAG runs stuck in running state using Azure Data Factory Managed Airflow (version 2.4.3)

Pete Trenkwalder 5 Reputation points
2023-02-28T03:13:18.7466667+00:00

I'm following Microsoft's tutorial "How does Managed Airflow work?" using the tutorial.py script referenced in the documentation (see the code block below). I've set up my Airflow environment in Azure Data Factory using the same configuration as in the documentation, with the exception of the Airflow version - I'm using version 2.4.3 as version 2.2.2 is no longer available in Data Factory.

Everything appears to be set up successfully. However, my DAG runs neither succeed nor fail; they just stay in the running state:

[Image: dag state]

I've tested this locally and the runs succeed, so I'm wondering whether Azure Data Factory Managed Airflow requires any additional configuration that isn't documented in the tutorial referenced above. Are there certain Airflow requirements or Airflow configuration overrides that need to be set when using Azure Data Factory's Managed Airflow service?

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(minutes=5),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
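    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this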
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]
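
One way to narrow down a run that stays in the running state is to look at the state of each task instance in that run. The sketch below is only an illustration: it assumes the task instances have already been fetched as JSON in the shape returned by the Airflow stable REST API (a `task_instances` list whose items carry `task_id` and `state`). Whether that API is reachable from a Data Factory Managed Airflow environment is an assumption, and the sample payload is hypothetical rather than taken from the environment described above. The same information can be read from the Grid view in the Airflow UI.

```python
from collections import Counter

# Task instance states that Airflow treats as finished.
# Assumption: this set mirrors Airflow 2.x and is kept short for illustration.
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed", "removed"}


def summarize_task_states(payload):
    """Return (count per state, task ids that have not reached a finished state).

    `payload` is a dict with a "task_instances" list; a missing or null
    state is reported as "none" (the task was never scheduled).
    """
    instances = payload.get("task_instances", [])
    states = [(ti["task_id"], ti.get("state") or "none") for ti in instances]
    counts = Counter(state for _, state in states)
    unfinished = [task_id for task_id, state in states if state not in FINISHED_STATES]
    return counts, unfinished


# Hypothetical payload for one run of the "tutorial" DAG.
sample = {
    "task_instances": [
        {"task_id": "print_date", "state": "queued"},
        {"task_id": "sleep", "state": None},
        {"task_id": "templated", "state": None},
    ]
}

counts, unfinished = summarize_task_states(sample)
print(dict(counts))
print(unfinished)
```

If the first task sits in `queued` or `scheduled` and never reaches `running`, that would suggest the tasks are not being picked up by a worker, which points at the environment rather than at the DAG code.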
Azure Data Factory
An Azure service for ingesting, preparing, and transforming data at scale.

1 answer

  1. HimanshuSinha-msft 19,376 Reputation points Microsoft Employee
    2023-03-30T17:54:12.89+00:00

    Hello @Pete Trenkwalder, thanks for the question and for using the MS Q&A platform.
    When you say:

    "using the same configuration in the documentation with the exception of the airflow version - I'm using version 2.4.3 as version 2.2.2 is no longer available in data factory."

    please let me know how you are doing that, as I only see the version below in the ADF UI.

    [Image: user's screenshot of the ADF UI]

    Himanshu
