How do we run Azure Data Factory PIPELINE from Azure Databricks notebook using Python or Scala

Mirza Azad Beg 6 Reputation points
2021-05-28T10:54:32.827+00:00

Hi Team,
Good morning.

I have a requirement to call an Azure Data Factory published pipeline from an Azure Databricks notebook using Python or Scala code. I tried to do this, but could not find a way to call it from an Azure Databricks (ADB) notebook.
Could you please help me achieve this requirement?

Thank you.

Regards,
Mirza Azad Beg

Azure Databricks

3 answers

Sort by: Most helpful
  1. PRADEEPCHEEKATLA-MSFT 88,471 Reputation points Microsoft Employee
    2021-05-28T12:48:33.51+00:00

    Hello anonymous user,

    Welcome to the Microsoft Q&A platform.

    Unfortunately, you cannot run ADF pipelines from an Azure Databricks notebook using Python or Scala.

    The other direction is possible: an ADF pipeline can run Azure Databricks notebooks using the Notebook, Python, or Jar activities.
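For reference, the supported direction (ADF invoking a Databricks notebook) is configured with a Databricks Notebook activity in the pipeline definition. A minimal sketch of such an activity, where the linked service name, notebook path, and parameter names are illustrative placeholders, not values from this thread:

```json
{
  "name": "RunNotebook",
  "type": "DatabricksNotebook",
  "linkedServiceName": {
    "referenceName": "AzureDatabricksLinkedService",
    "type": "LinkedServiceReference"
  },
  "typeProperties": {
    "notebookPath": "/Shared/my-notebook",
    "baseParameters": {
      "inputParam": "value1"
    }
  }
}
```

The `baseParameters` map is surfaced inside the notebook through `dbutils.widgets`.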

    I would suggest you to provide feedback on the same:

    https://feedback.azure.com/forums/270578-data-factory

    https://feedback.azure.com/forums/909463-azure-databricks

    All of the feedback you share in these forums will be monitored and reviewed by the Microsoft engineering teams responsible for building Azure.

    Hope this helps. Do let us know if you have any further queries.

    ---------------------------------------------------------------------------

    Please "Accept the answer" if the information helped you. This will help us and others in the community as well.


  2. MarkA 1 Reputation point
    2022-07-21T15:43:52.637+00:00

  3. Kranti Kumar 0 Reputation points
    2024-08-22T07:02:30.9133333+00:00

    I have a similar use case and searched across Google for a solution, but haven't found a perfectly working one so far. So I developed something of my own.

    import requests
    import time
    import logging
    import sys
    from datetime import datetime
    try:
        import adal
    except ImportError:
        import subprocess
        # pip.main() was removed in pip 10+; shell out to pip instead.
        # Note: adal is deprecated in favor of MSAL/azure-identity; kept here as in the original answer.
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'adal'])
        import adal
    
    def getAuthToken(tenantId, clientId, key, scope):
        authorityUrl = 'https://login.microsoftonline.com/'+tenantId
        context = adal.AuthenticationContext(authorityUrl)
        token = context.acquire_token_with_client_credentials(resource='https://management.azure.com/'
                                                          ,client_id=clientId
                                                          ,client_secret=dbutils.secrets.get(scope=scope, key=key)
                                                          )
        return token["accessToken"]
    
    def executePipeline(accessToken, subscriptionId, resourceGroup, dataFactory, pipelineName):
        headers = {'Authorization': f'Bearer {accessToken}'}
        url = f'https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroup}/providers/Microsoft.DataFactory/factories/{dataFactory}/pipelines/{pipelineName}/createRun?api-version=2018-06-01'
        response = requests.post(url, headers=headers)
        response.raise_for_status()  # fail fast on non-2xx instead of string-matching the response repr
        return response.json().get('runId')
    
    def getPipelineStatus(accessToken, subscriptionId, resourceGroup, dataFactory, pipelineRunId):
        headers = {'Authorization': f'Bearer {accessToken}'}
        url = f'https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroup}/providers/Microsoft.DataFactory/factories/{dataFactory}/pipelineruns/{pipelineRunId}?api-version=2018-06-01'
        response = requests.get(url, headers=headers)
        return response.json().get('status')
    
    def main():
        logging.basicConfig()
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        logger.info(str(datetime.now())+f': ExecuteADFPipeline Notebook started')
        dbutils.widgets.text("tenantId", "")
        tenantId = dbutils.widgets.get("tenantId")
    
        dbutils.widgets.text("clientId", "")
        clientId = dbutils.widgets.get("clientId")
    
        dbutils.widgets.text("key", "")
        key = dbutils.widgets.get("key")
    
        dbutils.widgets.text("scope", "")
        scope = dbutils.widgets.get("scope")
    
        dbutils.widgets.text("subscriptionId", "")
        subscriptionId = dbutils.widgets.get("subscriptionId")
    
        dbutils.widgets.text("resourceGroup", "")
        resourceGroup = dbutils.widgets.get("resourceGroup")
    
        dbutils.widgets.text("dataFactory", "")
        dataFactory = dbutils.widgets.get("dataFactory")
    
        dbutils.widgets.text("pipelineName", "")
        pipelineName = dbutils.widgets.get("pipelineName")
        logger.info(str(datetime.now())+f': widgets are declared and assigned to variables')
        accessToken = getAuthToken(tenantId, clientId, key, scope)
        logger.info(str(datetime.now())+f': getAuthToken is completed')
        pipelineRunId = executePipeline(accessToken, subscriptionId, resourceGroup, dataFactory, pipelineName)
        logger.info(str(datetime.now())+f': pipeline run {pipelineRunId} is triggered')
        reCheck = True
        while reCheck == True:
            time.sleep(120)
            pipelineStatus = getPipelineStatus(accessToken, subscriptionId, resourceGroup, dataFactory, pipelineRunId)
            logger.info(str(datetime.now())+f': pipeline status is {pipelineStatus}')
            if pipelineStatus in ['Cancelled', 'Failed']:
                reCheck = False
                logger.info(str(datetime.now())+f': Notebook is exiting because it is {pipelineStatus}')
                sys.exit(1)
            if pipelineStatus in ['Succeeded']:
                reCheck = False
                logger.info(str(datetime.now())+f': Notebook is completed successfully')
        return pipelineStatus
    
    pipelineStatus = main()    
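The loop above sleeps a fixed 120 seconds and relies on the run eventually reaching a terminal state, which makes it hard to unit-test outside Databricks and can hang forever on a stuck run. One way to sketch this more safely is a polling helper with a timeout and an injectable sleep; the helper name and defaults below are my own, not from the answer:

```python
import time

def poll_until_terminal(get_status, interval=120, timeout=4 * 3600, sleep=time.sleep):
    """Poll get_status() until the ADF run reaches a terminal state or timeout.

    get_status: zero-arg callable returning the ADF run status string.
    Terminal states per the ADF REST API: Succeeded, Failed, Cancelled.
    """
    terminal = {'Succeeded', 'Failed', 'Cancelled'}
    waited = 0
    while True:
        status = get_status()
        if status in terminal:
            return status
        if waited >= timeout:
            raise TimeoutError(f'pipeline still {status} after {timeout}s')
        sleep(interval)
        waited += interval
```

In the notebook, `poll_until_terminal(lambda: getPipelineStatus(accessToken, subscriptionId, resourceGroup, dataFactory, pipelineRunId))` would replace the `while reCheck` loop. Note that the access token itself can expire during very long runs, so re-acquiring it inside `get_status` may be necessary.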
    
    
