Hello @Subashri Vasudevan,
Thanks for reaching out on this forum; I checked on this query.
Did you try the following documentation?
Quickstart: Create a data factory and pipeline using Python
The sample below follows that quickstart: it creates an Azure Storage linked service, an input and an output delimited-text dataset, and a mapping data flow that copies from the input blob to the output blob.
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time
def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)
def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n\n")
def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))
def main():
    # Azure subscription ID
    subscription_id = '5917e'

    # This script assumes the resource group already exists; the code that
    # creates it is left commented out below.
    rg_name = 'MyRG'

    # The data factory name. It must be globally unique.
    df_name = 'msftCommunityspace'

    # Specify your Azure Active Directory client ID, client secret, and tenant ID
    credentials = ClientSecretCredential(client_id='1fba2e52', client_secret='j1R8WcrD', tenant_id='cd5218')
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location': 'eastus'}
    df_params = {'location': 'eastus'}

    # Create the resource group (uncomment if it doesn't exist yet)
    # resource_client.resource_groups.create_or_update(rg_name, rg_params)
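    # --- Hedged sketch (not in the original post): create the data factory ---
    # The script defines df_params but never creates the factory itself; if
    # 'msftCommunityspace' doesn't exist yet, something like the following
    # (taken from the same quickstart) would create it and wait for
    # provisioning to finish before the linked service is added.
    # df_resource = Factory(location=df_params['location'])
    # df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    # print_item(df)
    # while df.provisioning_state != 'Succeeded':
    #     df = adf_client.factories.get(rg_name, df_name)
    #     time.sleep(1)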
    # Create an Azure Storage linked service
    ls_name = 'ls_storageLinkedService823'

    # IMPORTANT: Specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=communiestorage;AccountKey=O1fk==;EndpointSuffix=core.windows.net')
    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)
    # Create the output dataset: a DelimitedText file in the 'output' folder of the 'adfcont' container
    response = adf_client.datasets.create_or_update(
        resource_group_name=rg_name,
        factory_name=df_name,
        dataset_name="ds_out_Blob",
        dataset={
            "properties": {
                "linkedServiceName": {"referenceName": "ls_storageLinkedService823", "type": "LinkedServiceReference"},
                "type": "DelimitedText",
                "typeProperties": {
                    "location": {
                        "type": "AzureBlobStorageLocation",
                        "fileName": "SalesheaderOut.csv",
                        "folderPath": "output",
                        "container": "adfcont"
                    },
                    "columnDelimiter": ",",
                    "escapeChar": "\\",
                    "firstRowAsHeader": True,
                    "quoteChar": "\""
                },
                "schema": []
            }
        },
    )
    # Create the input dataset: a DelimitedText file in the 'inbound' container
    response = adf_client.datasets.create_or_update(
        resource_group_name=rg_name,
        factory_name=df_name,
        dataset_name="ds_in_Blob",
        dataset={
            "properties": {
                "linkedServiceName": {"referenceName": "ls_storageLinkedService823", "type": "LinkedServiceReference"},
                "type": "DelimitedText",
                "typeProperties": {
                    "location": {
                        "type": "AzureBlobStorageLocation",
                        "fileName": "Salesheader.csv",
                        "container": "inbound"
                    },
                    "columnDelimiter": ",",
                    "escapeChar": "\\",
                    "quoteChar": "\""
                },
                # The source file has six string columns
                "schema": [
                    {"type": "String"},
                    {"type": "String"},
                    {"type": "String"},
                    {"type": "String"},
                    {"type": "String"},
                    {"type": "String"}
                ]
            }
        },
    )
    # Create a mapping data flow that copies from the input dataset to the output dataset
    response = adf_client.data_flows.create_or_update(
        resource_group_name=rg_name,
        factory_name=df_name,
        data_flow_name="exampleDataFlow1",
        data_flow={
            "properties": {
                "description": "Sample data flow that copies from a blob source to a blob sink.",
                "type": "MappingDataFlow",
                "typeProperties": {
                    "scriptLines": [
                        "source(allowSchemaDrift: true,",
                        " validateSchema: false,",
                        " ignoreNoFilesFound: false) ~> source1",
                        "source1 sink(allowSchemaDrift: true,",
                        " validateSchema: false,",
                        " skipDuplicateMapInputs: true,",
                        " skipDuplicateMapOutputs: true) ~> sink1"
                    ],
                    "sinks": [
                        {
                            "dataset": {"referenceName": "ds_out_Blob", "type": "DatasetReference"},
                            "name": "sink1"
                        }
                    ],
                    "sources": [
                        {
                            "dataset": {"referenceName": "ds_in_Blob", "type": "DatasetReference"},
                            "name": "source1"
                        }
                    ]
                }
            }
        },
    )
    print(response)
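    # --- Hedged sketch (not in the original post): run the data flow from a pipeline ---
    # The pipeline name 'pipeline1' and activity name 'RunDataFlow' are
    # illustrative. Executing a mapping data flow needs an integration runtime;
    # the default Azure IR is assumed here.
    p_name = 'pipeline1'
    response = adf_client.pipelines.create_or_update(
        resource_group_name=rg_name,
        factory_name=df_name,
        pipeline_name=p_name,
        pipeline={
            "properties": {
                "activities": [
                    {
                        "name": "RunDataFlow",
                        "type": "ExecuteDataFlow",
                        "typeProperties": {
                            "dataFlow": {"referenceName": "exampleDataFlow1", "type": "DataFlowReference"}
                        }
                    }
                ]
            }
        },
    )
    print_item(response)

    # Trigger the pipeline and poll once for its status.
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))

    # Query the activity runs for this pipeline run. print_activity_run_details
    # above assumes Copy-activity output fields (dataRead/dataWritten), so only
    # the status is printed here for the data flow activity.
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1),
        last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print("\tActivity run status: {}".format(query_response.value[0].status))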
# Start the main method
main()
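A note on the pipeline sketch inside main(): Execute Data Flow activities run on a Spark integration runtime, so the first run can take several minutes even for small files, and a single 30-second poll may still report the run as InProgress. Also, the subscription ID, credential values, and storage account key in the script are placeholders; substitute your own before running.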