Megosztás a következőn keresztül:


Rövid útmutató: Adatgyár és folyamatvonal létrehozása Python segítségével

Vonatkozik: Azure Data Factory Azure Synapse Analytics

Tip

Data Factory a Microsoft Fabric a Azure Data Factory következő generációja, egyszerűbb architektúrával, beépített AI-vel és új funkciókkal. Ha még nem ismerkedik az adatintegrációval, kezdje a Fabric Data Factoryvel. A meglévő ADF-számítási feladatok frissíthetők Fabric használatával, hogy elérjék az adatkutatás, a valós idejű elemzés és a jelentéskészítés új képességeit.

Ebben a rövid útmutatóban egy adatgyárat hoz létre Python használatával. Ebben az adat-előállítóban a folyamat adatokat másol az egyik mappából a Azure Blob Storage egy másik mappájába.

Azure Data Factory egy felhőalapú adatintegrációs szolgáltatás, amely lehetővé teszi adatvezérelt munkafolyamatok létrehozását az adatáthelyezés és adatátalakítás vezényléséhez és automatizálásához. A Azure Data Factory használatával adatvezérelt munkafolyamatokat, úgynevezett folyamatokat hozhat létre és ütemezhet.

Az adatfeldolgozási csatornák különböző adattárakból képesek adatokat fogadni. A folyamatok olyan számítási szolgáltatások használatával dolgozzák fel vagy alakítják át az adatokat, mint például Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics és Azure Machine Learning. A csővezetékek kimeneti adatokat tárolnak adattárakban, mint az Azure Synapse Analytics, üzletiintelligencia-alkalmazásokhoz.

Előfeltételek

  • Aktív előfizetéssel rendelkező Azure fiók. Hozzon létre egyet ingyen.

  • Python 3,6+.

  • A Azure Storage fiók.

  • Azure Storage Explorer (nem kötelező).

  • Egy alkalmazás a Microsoft Entra ID-ben. Hozza létre az alkalmazást a hivatkozás lépéseinek követésével a 2. hitelesítési lehetőség (alkalmazáskulcs) használatával, és rendelje hozzá az alkalmazást a közreműködői szerepkörhöz az ugyanabban a cikkben található utasításokat követve. Jegyezze fel a következő értékeket a cikkben látható módon, amelyet a későbbi lépésekben használhat: alkalmazás-(ügyfél-) azonosító, titkos ügyfélkód és bérlőazonosító.

Bemeneti fájl létrehozása és feltöltése

  1. Indítsa el a Jegyzettömbet. Másolja be az alábbi szöveget, és mentse egy input.txt nevű fájlként a lemezen.

    John|Doe
    Jane|Doe
    
  2. A adfv2tutorial tároló és input mappa létrehozásához használja az olyan eszközöket, mint a Azure Storage Explorer. Ezután töltse fel az input.txt fájlt az input mappába.

A Python csomag telepítése

  1. Nyisson meg egy terminált vagy parancssort rendszergazdai jogosultságokkal. 

  2. Először telepítse a Python csomagot Azure felügyeleti erőforrásokhoz:

    pip install azure-mgmt-resource
    
  3. A Data Factory Python csomagjának telepítéséhez futtassa a következő parancsot:

    pip install azure-mgmt-datafactory
    

    A Data Factory Python SDK támogatja a 2.7-es és a 3.6-os Python.

  4. Az Azure identitáshitelesítéshez szükséges Python csomag telepítéséhez futtassa a következő parancsot:

    pip install azure-identity
    

    Note

    Az "azure-identity" csomag egyes gyakori függőségek esetében ütközhet az "azure-cli"-vel. Ha bármilyen hitelesítési problémát tapasztal, távolítsa el az "azure-cli" és annak függőségeit, vagy használjon tiszta gépet az "azure-cli" csomag telepítése nélkül, hogy működjön. Szuverén felhők esetén a megfelelő felhőspecifikus állandókat kell használnia. Kérjük, tekintse meg a Csatlakozás minden régióhoz Azure könyvtárak használatával Python többfelhős környezetekben | a Microsoft Docs a szuverén felhőkön való Python kapcsolódásra vonatkozó utasításaiért.

Adat-előállító ügyfél létrehozása

  1. Hozzon létre egy datafactory.py nevű fájlt. Adja hozzá az alábbi utasításokat, hogy a névterekre mutató hivatkozásokat tudjon felvenni.

    from azure.identity import ClientSecretCredential 
    from azure.mgmt.resource import ResourceManagementClient
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import *
    from datetime import datetime, timedelta
    import time
    
  2. Adja hozzá az alábbi függvényeket az adatok megjelenítéséhez.

    def print_item(group):
        """Print an Azure object instance."""
        print("\tName: {}".format(group.name))
        print("\tId: {}".format(group.id))
        if hasattr(group, 'location'):
            print("\tLocation: {}".format(group.location))
        if hasattr(group, 'tags'):
            print("\tTags: {}".format(group.tags))
        if hasattr(group, 'properties'):
            print_properties(group.properties)
    
    def print_properties(props):
        """Print a ResourceGroup properties instance."""
        if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
            print("\tProperties:")
            print("\t\tProvisioning State: {}".format(props.provisioning_state))
        print("\n\n")
    
    def print_activity_run_details(activity_run):
        """Print activity run details."""
        print("\n\tActivity run details\n")
        print("\tActivity run status: {}".format(activity_run.status))
        if activity_run.status == 'Succeeded':
            print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
            print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
            print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
        else:
            print("\tErrors: {}".format(activity_run.error['message']))
    
  3. Adja hozzá a következő kódot a Main metódushoz, amely létrehoz egy DataPipelineManagementClient osztályú példányt. Ezzel az objektummal adat-előállítót, társított szolgáltatást, adatkészleteket és folyamatot hozhat létre. Ezenfelül ez az objektum a folyamat futása részleteinek monitorozására is használható. Állítsa subscription_id változót a Azure-előfizetés azonosítójára. Azoknak a Azure régióknak a listájához, amelyekben a Data Factory jelenleg elérhető, jelölje ki az Önt érdeklő régiókat az alábbi lapon, majd bontsa ki a Analytics elemet a Data Factory: Régiók szerint elérhető termékek. A data factory által használt adattárak (Azure Storage, Azure SQL Database stb.) és a data factory által használt számítások (HDInsight stb.) más régiókban is lehetnek.

    def main():
    
        # Azure subscription ID
        subscription_id = '<subscription ID>'
    
        # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
        rg_name = '<resource group>'
    
        # The data factory name. It must be globally unique.
        df_name = '<factory name>'
    
        # Specify your Active Directory client ID, client secret, and tenant ID
        credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') 
    
        # Specify following for Sovereign Clouds, import right cloud constant and then use it to connect.
        # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD
        # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id)
    
        resource_client = ResourceManagementClient(credentials, subscription_id)
        adf_client = DataFactoryManagementClient(credentials, subscription_id)
    
        rg_params = {'location':'westus'}
        df_params = {'location':'westus'}
    

Adat-előállító létrehozása

Adja hozzá a következő kódot a Main metódushoz, amely létrehozza az adat-előállítót. Ha az erőforráscsoportja már létezik, tegye megjegyzésbe az első create_or_update utasítást.

    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    #Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

Társított szolgáltatás létrehozása

Adja hozzá a következő kódot a Main metódushoz, amely létrehoz egy Azure Storage társított szolgáltatást.

Társított szolgáltatásokat hoz létre egy adat-előállítóban az adattárak és a számítási szolgáltatások adat-előállítóval történő társításához. Ebben a gyorsútmutatóban csak egy Azure Storage-hez társított szolgáltatást kell létrehoznia, amely egyaránt szolgál másolási forrásként és célként, "AzureStorageLinkedService" néven a példában. Cserélje le <storageaccountname> és <storageaccountkey> az Azure Storage-fiók nevével és kulcsával.

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

Adatkészletek létrehozása

Ebben a szakaszban két adatkészletet hoz létre: egyet a forráshoz, egyet a nyelőhöz.

Adatkészlet létrehozása a forrás Azure Blobhoz

Adja hozzá a következő kódot a fő metódushoz, amely létrehoz egy Azure blobadatkészletet. Az Azure Blob-adathalmaz tulajdonságairól a Azure blob-összekötő cikkben talál további információt.

Definiálhat egy adatkészletet, amely a Azure Blob forrásadatait jelöli. Ez a Blob-adatkészlet az előző lépésben létrehozott Azure Storage társított szolgáltatásra hivatkozik.

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename)) 
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

Adatkészlet létrehozása fogadó Azure Blobhoz

Adja hozzá a következő kódot a fő metódushoz, amely létrehoz egy Azure blobadatkészletet. Az Azure Blob-adathalmaz tulajdonságairól a Azure blob-összekötő cikkben talál további információt.

Definiálhat egy adatkészletet, amely a Azure Blob forrásadatait jelöli. Ez a Blob-adatkészlet az előző lépésben létrehozott Azure Storage társított szolgáltatásra hivatkozik.

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

Folyamat létrehozása

Adja hozzá a következő kódot a Main metódushoz, amely létrehozza a másolási tevékenységet tartalmazó folyamatot.

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)

    #Create a pipeline with the copy activity
    
    #Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
    #Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
    
    p_name = 'copyPipeline'
    params_for_pipeline = {}

    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

Folyamatfuttatás létrehozása

Adja hozzá a következő kódot a Main metódushoz, amely elindítja a folyamat futását.

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

Pipeline futtatás figyelése

A folyamat futtatásának monitorozásához adja hozzá a következő kódot a Main metódushoz:

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])

Most adja meg az alábbi utasítást, hogy a rendszer meghívja a main metódust, amikor a program fut:

# Start the main method
main()

Teljes szkript

Íme a teljes Python kód:

from azure.identity import ClientSecretCredential 
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time

def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)

def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n\n")

def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


def main():

    # Azure subscription ID
    subscription_id = '<subscription ID>'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
    rg_name = '<resource group>'

    # The data factory name. It must be globally unique.
    df_name = '<factory name>'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>') 
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location':'westus'}
    df_params = {'location':'westus'}
 
    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
                                 dsOut_ref], source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(
        activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])


# Start the main method
main()

A kód futtatása

Állítsa össze és indítsa el az alkalmazást, majd ellenőrizze a folyamat-végrehajtást.

A konzol megjeleníti a Data Factory, a társított szolgáltatás, az adatkészletek, az adatfolyamat és a folyamatfuttatás készülési folyamatát. Várjon, amíg megjelennek a másolási tevékenység futásának részletei, beleértve az olvasott és írt adatok méretét. Ezután használjon olyan eszközöket, mint a Azure Storage explorer annak ellenőrzéséhez, hogy a blob(ok) a változókban megadott "outputBlobPath" fájlból az "outputBlobPath" fájlba lesznek-e másolva.

Itt látható a minta kimenete:

Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}

Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService

Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in

Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out

Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline

Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.

Activity run details

Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4

Az erőforrások takarítása

Az adat-előállító törléséhez adja hozzá a következő kódot a programhoz:

adf_client.factories.delete(rg_name, df_name)

A mintafolyamat adatokat másol egy Azure blobtároló egyik helyére. A Data Factory más forgatókönyvekben való használatát ismertető további információkért tekintse meg az oktatóanyagokat.