I had a similar use case and searched Google for a solution, but I have not found a fully working one so far, so I developed something of my own.
import requests
import time
import logging
import sys
from datetime import datetime
# Make sure the ADAL client library is importable, installing it on first use.
try:
    import adal
except ImportError:
    # pip.main() was removed from pip's importable API in pip 10, so the
    # in-process call fails on current pip versions. Running pip as a
    # subprocess of the current interpreter is the supported way to install
    # a package from code, and it targets the same environment we run in.
    import subprocess
    import sys

    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'adal'])
    import adal
def getAuthToken(tenantId, clientId, key, scope):
    """Acquire an access token for the Azure management API via ADAL.

    Uses ADAL's client-credentials call against the
    ``login.microsoftonline.com`` authority of the given tenant.

    Args:
        tenantId: Tenant identifier appended to the login authority URL.
        clientId: Client id handed to ADAL.
        key: Name of the secret that holds the client secret.
        scope: Secret scope that ``key`` is looked up in.

    Returns:
        The ``accessToken`` entry of the token response returned by ADAL.
    """
    auth_context = adal.AuthenticationContext(
        'https://login.microsoftonline.com/' + tenantId
    )
    # NOTE(review): ``dbutils`` is not imported in this file; presumably it is
    # the global injected by the Databricks notebook runtime - confirm.
    token_response = auth_context.acquire_token_with_client_credentials(
        resource='https://management.azure.com/',
        client_id=clientId,
        client_secret=dbutils.secrets.get(scope=scope, key=key),
    )
    return token_response["accessToken"]
def executePipeline(accessToken, subscriptionId, resourceGroup, dataFactory, pipelineName):
    """Start a run of a Data Factory pipeline and return its run id.

    Sends a POST to the pipeline's ``createRun`` endpoint on
    ``management.azure.com`` (api-version 2018-06-01).

    Args:
        accessToken: Bearer token placed in the ``Authorization`` header.
        subscriptionId: Subscription segment of the resource URL.
        resourceGroup: Resource group segment of the resource URL.
        dataFactory: Data factory name segment of the resource URL.
        pipelineName: Name of the pipeline to run.

    Returns:
        The ``runId`` field of the JSON response body.

    Raises:
        RuntimeError: If the service answers with anything other than
            HTTP 200. Previously this case fell through to an
            ``UnboundLocalError`` that hid the actual HTTP failure.
    """
    headers = {'Authorization': f'Bearer {accessToken}'}
    url = (
        'https://management.azure.com'
        f'/subscriptions/{subscriptionId}'
        f'/resourceGroups/{resourceGroup}'
        '/providers/Microsoft.DataFactory'
        f'/factories/{dataFactory}'
        f'/pipelines/{pipelineName}'
        '/createRun?api-version=2018-06-01'
    )
    response = requests.post(url, headers=headers)

    # Check the numeric status code rather than searching the response's
    # repr for the text '200', and fail loudly with the server's message.
    if response.status_code != 200:
        raise RuntimeError(
            f'createRun for pipeline {pipelineName} failed with HTTP '
            f'{response.status_code}: {response.text}'
        )
    return response.json().get('runId')
def getPipelineStatus(accessToken, subscriptionId, resourceGroup, dataFactory, pipelineRunId):
    """Fetch the current status of a Data Factory pipeline run.

    Sends a GET to the ``pipelineruns`` endpoint on ``management.azure.com``
    (api-version 2018-06-01).

    Args:
        accessToken: Bearer token placed in the ``Authorization`` header.
        subscriptionId: Subscription segment of the resource URL.
        resourceGroup: Resource group segment of the resource URL.
        dataFactory: Data factory name segment of the resource URL.
        pipelineRunId: Identifier of the run to look up.

    Returns:
        The ``status`` field of the JSON response, or ``None`` when the
        response body has no such field. The HTTP status code is not checked.
    """
    auth_header = {'Authorization': f'Bearer {accessToken}'}
    run_url = (
        'https://management.azure.com'
        f'/subscriptions/{subscriptionId}'
        f'/resourceGroups/{resourceGroup}'
        '/providers/Microsoft.DataFactory'
        f'/factories/{dataFactory}'
        f'/pipelineruns/{pipelineRunId}'
        '?api-version=2018-06-01'
    )
    reply = requests.get(run_url, headers=auth_header)
    payload = reply.json()
    return payload.get('status')
def main():
    """Trigger a Data Factory pipeline from widget inputs and wait for it.

    Declares and reads eight text widgets, acquires an access token, starts
    the pipeline, then checks the run status every 120 seconds until it is
    'Succeeded', 'Cancelled' or 'Failed'. Any other status keeps the loop
    polling.

    Returns:
        The final pipeline status; this is 'Succeeded' whenever the function
        returns normally.

    Raises:
        SystemExit: With exit code 1 when the run ends as 'Cancelled' or
            'Failed'.
    """
    logging.basicConfig()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    def stamped(text):
        # Prefix a log message with the current local date and time.
        return str(datetime.now()) + text

    logger.info(stamped(': ExecuteADFPipeline Notebook started'))

    # Declare each widget (empty default) and read it back right away, in
    # the same order as the parameters are consumed further down.
    # NOTE(review): ``dbutils`` is not imported in this file; presumably it is
    # the global injected by the Databricks notebook runtime - confirm.
    widget_names = (
        "tenantId",
        "clientId",
        "key",
        "scope",
        "subscriptionId",
        "resourceGroup",
        "dataFactory",
        "pipelineName",
    )
    params = {}
    for widget in widget_names:
        dbutils.widgets.text(widget, "")
        params[widget] = dbutils.widgets.get(widget)
    logger.info(stamped(': widgets are declared and assigned to variables'))

    token = getAuthToken(
        params["tenantId"], params["clientId"], params["key"], params["scope"]
    )
    logger.info(stamped(': getAuthToken is completed'))

    # The three values that identify the data factory in every REST call.
    factory_ref = (
        params["subscriptionId"],
        params["resourceGroup"],
        params["dataFactory"],
    )
    run_id = executePipeline(token, *factory_ref, params["pipelineName"])
    logger.info(stamped(': pipeline execution is completed'))

    while True:
        # Wait first, then poll: the first status check happens 120 s after
        # the run was requested.
        time.sleep(120)
        run_status = getPipelineStatus(token, *factory_ref, run_id)
        logger.info(stamped(f': pipeline status is {run_status}'))

        if run_status in ('Cancelled', 'Failed'):
            logger.info(stamped(f': Notebook is exiting because it is {run_status}'))
            sys.exit(1)
        if run_status == 'Succeeded':
            break

    logger.info(stamped(': Notebook is completed successfully'))
    return run_status
# Run the whole workflow as soon as this code executes and keep the final
# pipeline status in a top-level variable.
pipelineStatus = main()