@Amira Bedhiafi I have tried it as you advised; however, I am getting the following import error. Please find the code below as well.
"ImportError: cannot import name 'DataFlowActivity' from 'azure.mgmt.datafactory.models' "
I am trying to build a pipeline using the Python SDK. The code will dynamically create a new pipeline based on the tollgateinfo_table in SQL (which holds the source, destination, metadata, and other information).
The dynamic pipeline is as follows:
- Data Flow: extracts the tollgate info from the database and supplies the required config details to the Copy Activity
- Copy Activity: connects to the source database, extracts the data, and lands it in the data lake
The goal is that any business user can create tollgate info from a UI app, which then inserts it into the SQL tollgateinfo table. We also have another UI for Git, where the business user commits the code to the dev branch.
A DevOps pipeline will then pick up the new file, use the existing Python code, and generate a new ADF pipeline. Once the ADF pipeline is created, the business user is notified and can use another UI to schedule and run it. That is the overall motive. A rough sketch of the generation loop I have in mind follows.
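This is only a sketch: the table name, column names, the pl_tollgate_* naming, and the use of pyodbc are placeholders, and build_activities stands in for the Data Flow + Copy Activity construction shown in the script further down.

```python
# Rough sketch only: tollgateinfo_table, its columns, and pl_tollgate_* naming are placeholders.
import pyodbc
from azure.mgmt.datafactory.models import PipelineResource


def load_tollgate_rows(conn_str):
    """Read one row per pipeline to generate from the tollgateinfo table in SQL."""
    conn = pyodbc.connect(conn_str)
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT tollgate_id, source_name, destination_path FROM tollgateinfo_table")
        return cursor.fetchall()
    finally:
        conn.close()


def generate_pipelines(adf_client, rg_name, df_name, rows, build_activities):
    """Create or update one ADF pipeline per tollgate row.

    build_activities is a callable that turns a row into the list of
    activities (the Data Flow + Copy Activity pair described above).
    """
    for row in rows:
        pipeline = PipelineResource(activities=build_activities(row))
        adf_client.pipelines.create_or_update(
            rg_name, df_name, f"pl_tollgate_{row.tollgate_id}", pipeline)
```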
My code (thanks @Amira Bedhiafi):
# This is a sample Python script.
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time

# Press Shift+F10 to execute it or replace it with your code.
# Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.
from azure.mgmt.datafactory.models import PipelineResource, CopyActivity, DataFlowActivity, ActivityDependency


def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)


def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n\n")


def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


if __name__ == '__main__':
    # Azure subscription ID
    subscription_id = 'XXX'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
    rg_name = 'rg-advancedanalytics-dev-aljm-uaen'

    # The data factory name. It must be globally unique.
    df_name = 'adf-code-driven-demo'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ClientSecretCredential(client_id='', client_secret='', tenant_id='')

    # Specify following for Soverign Clouds, import right cloud constant and then use it to connect.
    # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD
    # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id)

    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location': 'uaenorth'}
    df_params = {'location': 'uaenorth'}

    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    df_resource = Factory(location='uaenorth')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    if df_name == 'adf-code-driven-demo':
        print('Already Available', df)
    else:
        while df.provisioning_state != 'Succeeded':
            df = adf_client.factories.get(rg_name, df_name)
            time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'ls_adf_code_driven_demo'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='')

    ls_azure_storage = LinkedServiceResource(
        properties=AzureStorageLinkedService(type='LinkedServiceResource', connection_string=storage_string))
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_adf_code_driven_demo_in'
    ds_ls = LinkedServiceReference(type="LinkedServiceReference", reference_name=ls_name)
    blob_path = 'adfcodedrivendemo/adf_code_driven_demo'
    blob_filename = 'adf_demo_input_date.xlsx'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_adf_code_driven_demo_out'
    output_blobpath = 'adfcodedrivendemo/adf_code_driven_demo/output'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'adf_code_driven_copy_activity'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(type='DatasetReference', reference_name=ds_name)
    dsOut_ref = DatasetReference(type='DatasetReference', reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[dsOut_ref],
                                 source=blob_source, sink=blob_sink)

    # Create dataflow
    response = adf_client.datasets.create_or_update(
        resource_group_name=rg_name,
        factory_name=df_name,
        dataset_name="ds_out_Blob",
        dataset={
            "properties": {
                "linkedServiceName": {"referenceName": "ls_adf_code_driven_demo", "type": "LinkedServiceReference"},
                "type": "DelimitedText",
                "typeProperties": {
                    "location": {
                        "type": "AzureBlobStorageLocation",
                        "fileName": "adf_demo_input_date.xlsx",
                        "folderPath": "output",
                        "container": "adfcodedrivendemo/adf_code_driven_demo"
                    },
                    "columnDelimiter": ",",
                    "escapeChar": "\\",
                    "firstRowAsHeader": "true",
                    "quoteChar": "\""
                },
                "schema": []
            }
        },
    )
    response = adf_client.datasets.create_or_update(
        resource_group_name=rg_name,
        factory_name=df_name,
        dataset_name="ds_in_Blob",
        dataset={
            "properties": {
                "linkedServiceName": {"referenceName": "ls_adf_code_driven_demo", "type": "LinkedServiceReference"},
                "type": "DelimitedText",
                "typeProperties": {
                    "location": {
                        "type": "AzureBlobStorageLocation",
                        "fileName": "adf_demo_input_date.xlsx",
                        "container": "adfcodedrivendemo/adf_code_driven_demo"
                    },
                    "columnDelimiter": ",",
                    "escapeChar": "\\",
                    "quoteChar": "\""
                },
                "schema": [
                    {"type": "String"},
                    {"type": "String"},
                    {"type": "String"},
                    {"type": "String"},
                    {"type": "String"},
                    {"type": "String"}
                ]
            }
        },
    )
    response = adf_client.data_flows.create_or_update(
        resource_group_name=rg_name,
        factory_name=df_name,
        data_flow_name="adf_code_driven_dataflow",
        data_flow={
            "properties": {
                "description": "Sample demo data flow to convert currencies showing usage of union, derive and conditional split transformation.",
                "type": "MappingDataFlow",
                "typeProperties": {
                    "scriptLines": [
                        "source(allowSchemaDrift: true,",
                        "     validateSchema: false,",
                        "     ignoreNoFilesFound: false) ~> source1",
                        "source1 sink(allowSchemaDrift: true,",
                        "     validateSchema: false,",
                        "     skipDuplicateMapInputs: true,",
                        "     skipDuplicateMapOutputs: true) ~> sink1"
                    ],
                    "sinks": [
                        {
                            "dataset": {
                                "referenceName": "ds_adf_code_driven_demo_out",
                                "type": "DatasetReference"
                            },
                            "name": "sink1"
                        }
                    ],
                    "sources": [
                        {
                            "dataset": {
                                "referenceName": "ds_adf_code_driven_demo_out",
                                "type": "DatasetReference"
                            },
                            "name": "source1"
                        },
                    ],
                },
            }
        },
    )
    print(response)

    # Define the DataFlow Activity
    dataflow_activity = DataFlowActivity(
        name="run_adf_code_driven_dataflow_activity",
        data_flow_name="adf_code_driven_dataflow",
        # ... Other necessary configurations
    )

    # Define the Copy Activity
    copy_activity = CopyActivity(
        name="adf_code_driven_copy_activity",
        # inputs=[...],  # This should take input from the DataFlow's output dataset
        # outputs=[...],
        # ... Other necessary configurations
    )

    # Define Dependency (Ensure Copy Activity runs after DataFlow Activity)
    copy_activity.depends_on = [ActivityDependency(activity="run_adf_code_driven_dataflow_activity")]

    # Create the Pipeline with DataFlow and Copy Activity linked
    pipeline = PipelineResource(
        activities=[dataflow_activity, copy_activity],
        description="Your pipeline description"
    )

    # Create or update the pipeline in Azure Data Factory
    adf_client.pipelines.create_or_update(rg_name, df_name, "adf_code_driven_copypipeline", pipeline)

    # Run the pipeline