How to create a data flow using the Azure SDK for Python?

Subashri Vasudevan 11,226 Reputation points
2023-06-03T16:28:48.69+00:00

Hi Team,

I am trying to create a simple data flow with a source and a sink using the Azure SDK, but I am unable to create the link between the two. Can you please provide a code snippet that does this? There are hardly any resources on it.

Thanks in advance.

Suba

Azure Data Factory
Azure Data Factory
An Azure service for ingesting, preparing, and transforming data at scale.
11,623 questions
{count} votes

Accepted answer
  1. QuantumCache 20,366 Reputation points Moderator
    2023-06-06T14:34:29.1433333+00:00

    Hello @Subashri Vasudevan,
    Thanks for reaching out on this forum. I am checking on this query!
    Did you try the following documentation?

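    Quickstart: Create a data factory and pipeline using Python

    The sample below adapts that quickstart to a mapping data flow. The source and sink are linked in the data flow script (`scriptLines`). The line ending in `~> source1` names the source's output stream, and the line `source1 sink(...) ~> sink1` feeds that stream into the sink. The `name` of each entry in `sources` and `sinks` must match the stream names used in the script.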

    from azure.identity import ClientSecretCredential
    from azure.mgmt.resource import ResourceManagementClient
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import *
    from datetime import datetime, timedelta
    import time
    
    def print_item(group):
        """Write a short summary of an Azure resource object to stdout.

        ``name`` and ``id`` are always printed. ``location`` and ``tags`` are
        printed only when the object has those attributes, and ``properties``
        (when present) is delegated to ``print_properties``.
        """
        print("\tName: {}".format(group.name))
        print("\tId: {}".format(group.id))
        optional_fields = (
            ('location', "\tLocation: {}"),
            ('tags', "\tTags: {}"),
        )
        for attr_name, template in optional_fields:
            if hasattr(group, attr_name):
                print(template.format(getattr(group, attr_name)))
        if hasattr(group, 'properties'):
            print_properties(group.properties)
    
    def print_properties(props):
        """Print a resource's provisioning state when one is available.

        Nothing but the trailing blank-line separator is printed when *props*
        is falsy or lacks a truthy ``provisioning_state``; the separator is
        always printed.
        """
        state = getattr(props, 'provisioning_state', None) if props else None
        if state:
            print("\tProperties:")
            print("\t\tProvisioning State: {}".format(state))
        print("\n\n")
    
    def print_activity_run_details(activity_run):
        """Print the status and headline metrics of a pipeline activity run.

        For a run whose ``status`` is ``'Succeeded'``, prints the copy metrics
        ``dataRead``, ``dataWritten`` and ``copyDuration`` from
        ``activity_run.output``. For any other status, prints ``message`` from
        ``activity_run.error``.

        A missing output/error payload (``None``) or a missing key prints as
        ``None`` instead of raising. Runs that are still in progress, or that
        are not copy activities, may not carry these fields.
        """
        print("\n\tActivity run details\n")
        print("\tActivity run status: {}".format(activity_run.status))
        if activity_run.status == 'Succeeded':
            output = activity_run.output or {}
            print("\tNumber of bytes read: {}".format(output.get('dataRead')))
            print("\tNumber of bytes written: {}".format(output.get('dataWritten')))
            print("\tCopy duration: {}".format(output.get('copyDuration')))
        else:
            error = activity_run.error or {}
            print("\tErrors: {}".format(error.get('message')))
    
    def _delimited_text_dataset(linked_service_name, container, file_name,
                                folder_path=None, first_row_as_header=None,
                                schema=None):
        """Return a DelimitedText (CSV) dataset definition on Azure Blob Storage.

        The returned dict is in the JSON shape passed as ``dataset=`` to
        ``adf_client.datasets.create_or_update``. ``folderPath`` and
        ``firstRowAsHeader`` are only included when a value is supplied.
        *schema* defaults to an empty list.
        """
        location = {
            "type": "AzureBlobStorageLocation",
            "fileName": file_name,
            "container": container,
        }
        if folder_path is not None:
            location["folderPath"] = folder_path
        type_properties = {
            "location": location,
            "columnDelimiter": ",",
            "escapeChar": "\\",
            "quoteChar": "\"",
        }
        if first_row_as_header is not None:
            type_properties["firstRowAsHeader"] = first_row_as_header
        return {
            "properties": {
                "linkedServiceName": {"referenceName": linked_service_name, "type": "LinkedServiceReference"},
                "type": "DelimitedText",
                "typeProperties": type_properties,
                "schema": [] if schema is None else schema,
            }
        }
    
    
    def main():
        """Create a storage linked service, two Blob datasets and a data flow linking them.

        Steps:
          1. Create/update an Azure Storage linked service.
          2. Create/update a DelimitedText sink dataset (``ds_out_Blob``) and
             source dataset (``ds_in_Blob``) on that linked service.
          3. Create/update a mapping data flow whose script feeds the source
             stream into the sink.

        Secrets are read from environment variables instead of being hard-coded
        in source: AZURE_SUBSCRIPTION_ID, AZURE_TENANT_ID, AZURE_CLIENT_ID,
        AZURE_CLIENT_SECRET and AZURE_STORAGE_CONNECTION_STRING.

        Raises:
            RuntimeError: if any of those environment variables is unset or empty.
        """
        import os
    
        required_env = (
            "AZURE_SUBSCRIPTION_ID",
            "AZURE_TENANT_ID",
            "AZURE_CLIENT_ID",
            "AZURE_CLIENT_SECRET",
            "AZURE_STORAGE_CONNECTION_STRING",
        )
        missing = [name for name in required_env if not os.environ.get(name)]
        if missing:
            raise RuntimeError(
                "Missing required environment variable(s): " + ", ".join(missing)
            )
    
        subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]
    
        # The resource group is expected to exist already. To have this script
        # create it, uncomment the resource_groups.create_or_update call below.
        rg_name = 'MyRG'
        rg_params = {'location': 'eastus'}
    
        # The data factory name. It must be globally unique, and the factory
        # must already exist in rg_name (this script does not create it).
        df_name = 'msftCommunityspace'
    
        # Service principal credentials for Azure Active Directory.
        credentials = ClientSecretCredential(
            client_id=os.environ["AZURE_CLIENT_ID"],
            client_secret=os.environ["AZURE_CLIENT_SECRET"],
            tenant_id=os.environ["AZURE_TENANT_ID"],
        )
        resource_client = ResourceManagementClient(credentials, subscription_id)
        adf_client = DataFactoryManagementClient(credentials, subscription_id)
    
        # resource_client.resource_groups.create_or_update(rg_name, rg_params)
    
        # Azure Storage linked service shared by both datasets below.
        ls_name = 'ls_storageLinkedService823'
        storage_string = SecureString(value=os.environ["AZURE_STORAGE_CONNECTION_STRING"])
        ls_azure_storage = LinkedServiceResource(
            properties=AzureStorageLinkedService(connection_string=storage_string)
        )
        ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
        print_item(ls)
    
        in_ds_name = "ds_in_Blob"
        out_ds_name = "ds_out_Blob"
    
        # Sink dataset: adfcont/output/SalesheaderOut.csv, written with a header row.
        adf_client.datasets.create_or_update(
            resource_group_name=rg_name,
            factory_name=df_name,
            dataset_name=out_ds_name,
            dataset=_delimited_text_dataset(
                ls_name,
                container="adfcont",
                file_name="SalesheaderOut.csv",
                folder_path="output",
                first_row_as_header=True,  # ADF types this as boolean, not "true"
            ),
        )
    
        # Source dataset: inbound/Salesheader.csv with six string columns.
        adf_client.datasets.create_or_update(
            resource_group_name=rg_name,
            factory_name=df_name,
            dataset_name=in_ds_name,
            dataset=_delimited_text_dataset(
                ls_name,
                container="inbound",
                file_name="Salesheader.csv",
                schema=[{"type": "String"} for _ in range(6)],
            ),
        )
    
        response = adf_client.data_flows.create_or_update(
            resource_group_name=rg_name,
            factory_name=df_name,
            data_flow_name="exampleDataFlow1",
            data_flow={
                "properties": {
                    "description": "Sample data flow that copies rows from the ds_in_Blob source to the ds_out_Blob sink.",
                    "type": "MappingDataFlow",
                    "typeProperties": {
                        # The script is what links the two: "~> source1" names the
                        # source's output stream, and the line beginning
                        # "source1 sink(" feeds that stream into the sink. These
                        # stream names must match the "name" fields of the
                        # sources/sinks entries below.
                        "scriptLines": [
                            "source(allowSchemaDrift: true,",
                            "     validateSchema: false,",
                            "     ignoreNoFilesFound: false) ~> source1",
                            "source1 sink(allowSchemaDrift: true,",
                            "     validateSchema: false,",
                            "     skipDuplicateMapInputs: true,",
                            "     skipDuplicateMapOutputs: true) ~> sink1",
                        ],
                        "sinks": [
                            {
                                "name": "sink1",
                                "dataset": {"referenceName": out_ds_name, "type": "DatasetReference"},
                            },
                        ],
                        "sources": [
                            {
                                "name": "source1",
                                "dataset": {"referenceName": in_ds_name, "type": "DatasetReference"},
                            },
                        ],
                    },
                }
            },
        )
        print(response)
    
    
    # Run main() only when executed as a script, so importing this module
    # (e.g. to reuse the helper functions) does not trigger Azure calls.
    if __name__ == "__main__":
        main()
    
    
    1 person found this answer helpful.

0 additional answers

Sort by: Most helpful

Your answer

Answers can be marked as Accepted Answers by the question author, which helps users to know the answer solved the author's problem.