Rövid útmutató: Adat-előállító és folyamat létrehozása Python használatával
A következőkre vonatkozik: Azure Data Factory Azure Synapse Analytics
Tipp.
Próbálja ki a Data Factoryt a Microsoft Fabricben, amely egy teljes körű elemzési megoldás a nagyvállalatok számára. A Microsoft Fabric az adattovábbítástól az adatelemzésig, a valós idejű elemzésig, az üzleti intelligenciáig és a jelentéskészítésig mindent lefed. Ismerje meg, hogyan indíthat új próbaverziót ingyenesen!
Ebben a rövid útmutatóban létrehoz egy adat-előállítót a Python használatával. A data factory folyamata adatokat másol az egyik mappából egy másikba az Azure Blob Storage-ban.
Az Azure Data Factory egy felhőalapú adatintegrációs szolgáltatás, amellyel adatvezérelt munkafolyamatokat hozhat létre az adatáthelyezés és adatátalakítás vezényléséhez és automatizálásához. Az Azure Data Factory használatával adatvezérelt munkafolyamatokat, úgynevezett folyamatokat hozhat létre és ütemezhet.
A folyamatok különböző adattárakból is betölthetnek adatokat. A folyamatok olyan számítási szolgáltatások használatával dolgozzák fel vagy alakítják át az adatokat, mint az Azure HDInsight Hadoop, a Spark, az Azure Data Lake Analytics és az Azure Machine Learning. A folyamatok kimeneti adatokat tesznek közzé az adattárakban, például az Azure Synapse Analytics üzletiintelligencia-alkalmazásokban.
Előfeltételek
Egy Azure-fiók, aktív előfizetéssel. Hozzon létre egyet ingyen.
Azure Storage Explorer (nem kötelező).
Alkalmazás a Microsoft Entra-azonosítóban. Hozza létre az alkalmazást a hivatkozás lépéseinek követésével a 2. hitelesítési lehetőség (alkalmazáskulcs) használatával, és rendelje hozzá az alkalmazást a közreműködői szerepkörhöz az ugyanabban a cikkben található utasításokat követve. Jegyezze fel a következő értékeket a cikkben látható módon, amelyet a későbbi lépésekben használhat: alkalmazás-(ügyfél-) azonosító, titkos ügyfélkód és bérlőazonosító.
Bemeneti fájl létrehozása és feltöltése
Indítsa el a Jegyzettömböt. Másolja be az alábbi szöveget, és mentse egy input.txt nevű fájlként a lemezen.
John|Doe Jane|Doe
Az Azure Storage Explorerrel vagy egy hozzá hasonló eszközzel hozza létre az adfv2tutorial tárolót, és a tárolóban az input mappát. Ezután töltse fel az input.txt fájlt az input mappába.
A(z) Python-csomag telepítése
Nyisson meg egy terminált vagy parancssort rendszergazdai jogosultságokkal.
Először telepítse a Python-csomagot az Azure felügyeleti erőforrásokhoz:
pip install azure-mgmt-resource
A Python-csomag a Data Factoryhoz történő telepítéséhez futtassa az alábbi parancsot:
pip install azure-mgmt-datafactory
A Python SDK for Data Factory támogatja a Python 2.7-et és a 3.6+-ot.
Az Azure Identity-hitelesítéshez készült Python-csomag telepítéséhez futtassa a következő parancsot:
pip install azure-identity
Feljegyzés
Az "azure-identity" csomag egyes gyakori függőségek esetében ütközhet az "azure-cli"-vel. Ha bármilyen hitelesítési problémát tapasztal, távolítsa el az "azure-cli" és annak függőségeit, vagy használjon tiszta gépet az "azure-cli" csomag telepítése nélkül, hogy működjön. Szuverén felhők esetén a megfelelő felhőspecifikus állandókat kell használnia. Tekintse meg a Többfelhős Pythonhoz készült Azure-kódtárak használatával az összes régióhoz való csatlakozást | A Microsoft Docs útmutatást ad a Pythonhoz való csatlakozáshoz szuverén felhőkben.
Adat-előállító ügyfél létrehozása
Hozzon létre egy datafactory.py nevű fájlt. Adja hozzá az alábbi utasításokat, hogy a névterekre mutató hivatkozásokat tudjon felvenni.
from azure.identity import ClientSecretCredential from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.datafactory import DataFactoryManagementClient from azure.mgmt.datafactory.models import * from datetime import datetime, timedelta import time
Adja hozzá az alábbi függvényeket az adatok megjelenítéséhez.
def print_item(group): """Print an Azure object instance.""" print("\tName: {}".format(group.name)) print("\tId: {}".format(group.id)) if hasattr(group, 'location'): print("\tLocation: {}".format(group.location)) if hasattr(group, 'tags'): print("\tTags: {}".format(group.tags)) if hasattr(group, 'properties'): print_properties(group.properties) def print_properties(props): """Print a ResourceGroup properties instance.""" if props and hasattr(props, 'provisioning_state') and props.provisioning_state: print("\tProperties:") print("\t\tProvisioning State: {}".format(props.provisioning_state)) print("\n\n") def print_activity_run_details(activity_run): """Print activity run details.""" print("\n\tActivity run details\n") print("\tActivity run status: {}".format(activity_run.status)) if activity_run.status == 'Succeeded': print("\tNumber of bytes read: {}".format(activity_run.output['dataRead'])) print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten'])) print("\tCopy duration: {}".format(activity_run.output['copyDuration'])) else: print("\tErrors: {}".format(activity_run.error['message']))
Adja hozzá a következő kódot a Main metódushoz, amely létrehoz egy DataPipelineManagementClient osztályú példányt. Ezzel az objektummal adat-előállítót, társított szolgáltatást, adatkészleteket és folyamatot hozhat létre. Ezenfelül ez az objektum a folyamat futása részleteinek monitorozására is használható. Állítsa be a subscription_id változót az Azure-előfizetés azonosítójaként. Azon Azure-régiók megtekintéséhez, amelyekben jelenleg elérhető a Data Factory, a következő lapon válassza ki az Önt érdeklő régiókat, majd bontsa ki az Elemzés részt, és keresse meg a Data Factory: Elérhető termékek régiók szerint szakaszt. Az adat-előállítók által használt adattárak (Azure Storage, Azure SQL Database stb.) és számítási erőforrások (HDInsight stb.) más régiókban is lehetnek.
def main(): # Azure subscription ID subscription_id = '<subscription ID>' # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group rg_name = '<resource group>' # The data factory name. It must be globally unique. df_name = '<factory name>' # Specify your Active Directory client ID, client secret, and tenant ID credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') # Specify following for Soverign Clouds, import right cloud constant and then use it to connect. # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id) resource_client = ResourceManagementClient(credentials, subscription_id) adf_client = DataFactoryManagementClient(credentials, subscription_id) rg_params = {'location':'westus'} df_params = {'location':'westus'}
Adat-előállító létrehozása
Adja hozzá a következő kódot a Main metódushoz, amely létrehozza az adat-előállítót. Ha az erőforráscsoportja már létezik, tegye megjegyzésbe az első create_or_update
utasítást.
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
#Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
Társított szolgáltatás létrehozása
Adja hozzá a következő kódot a Main metódushoz, amely létrehoz egy Azure Storage-beli társított szolgáltatást.
Társított szolgáltatásokat hoz létre egy adat-előállítóban az adattárak és a számítási szolgáltatások adat-előállítóval történő társításához. Ebben a rövid útmutatóban csak egy Azure Storage-beli társított szolgáltatást kell létrehoznia a másolás forrásaként és fogadó adattárként. A példában ennek a neve: AzureStorageLinkedService. A <storageaccountname>
és a <storageaccountkey>
kifejezés helyére írja be Azure Storage-fiókja nevét, illetve kulcsát.
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
Adatkészletek létrehozása
Ebben a részben két adatkészletet hoz létre: egyet a forráshoz és egyet a fogadóhoz.
Adatkészlet létrehozása a forrás Azure Blobhoz
Adja hozzá a következő kódot a Main metódushoz, amely létrehoz egy Azure Blob-adatkészletet. Az Azure Blob-adatkészlet tulajdonságaival kapcsolatos információkért tekintse meg az Azure Blob-összekötőt ismertető cikket.
Meghatároz egy adatkészletet, amely a forrásadatokat jelöli az Azure Blobban. Ez a blob-adatkészlet az előző lépésben létrehozott Azure Storage-beli társított szolgáltatásra vonatkozik.
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
Adatkészlet létrehozása a fogadó Azure Blobhoz
Adja hozzá a következő kódot a Main metódushoz, amely létrehoz egy Azure Blob-adatkészletet. Az Azure Blob-adatkészlet tulajdonságaival kapcsolatos információkért tekintse meg az Azure Blob-összekötőt ismertető cikket.
Meghatároz egy adatkészletet, amely a forrásadatokat jelöli az Azure Blobban. Ez a blob-adatkészlet az előző lépésben létrehozott Azure Storage-beli társított szolgáltatásra vonatkozik.
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
Folyamat létrehozása
Adja hozzá a következő kódot a Main metódushoz, amely létrehozza a másolási tevékenységet tartalmazó folyamatot.
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)
#Create a pipeline with the copy activity
#Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
#Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
p_name = 'copyPipeline'
params_for_pipeline = {}
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
Folyamat futásának létrehozása
Adja hozzá a következő kódot a Main metódushoz, amely elindítja a folyamat futását.
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
Folyamat futásának monitorozása
A folyamat futtatásának monitorozásához adja hozzá a következő kódot a Main metódushoz:
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
Most adja meg az alábbi utasítást, hogy a rendszer meghívja a main metódust, amikor a program fut:
# Start the main method
main()
Teljes szkript
Itt látható a teljes Python-kód:
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time
def print_item(group):
"""Print an Azure object instance."""
print("\tName: {}".format(group.name))
print("\tId: {}".format(group.id))
if hasattr(group, 'location'):
print("\tLocation: {}".format(group.location))
if hasattr(group, 'tags'):
print("\tTags: {}".format(group.tags))
if hasattr(group, 'properties'):
print_properties(group.properties)
def print_properties(props):
"""Print a ResourceGroup properties instance."""
if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
print("\tProperties:")
print("\t\tProvisioning State: {}".format(props.provisioning_state))
print("\n\n")
def print_activity_run_details(activity_run):
"""Print activity run details."""
print("\n\tActivity run details\n")
print("\tActivity run status: {}".format(activity_run.status))
if activity_run.status == 'Succeeded':
print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
else:
print("\tErrors: {}".format(activity_run.error['message']))
def main():
# Azure subscription ID
subscription_id = '<subscription ID>'
# This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
rg_name = '<resource group>'
# The data factory name. It must be globally unique.
df_name = '<factory name>'
# Specify your Active Directory client ID, client secret, and tenant ID
credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>')
resource_client = ResourceManagementClient(credentials, subscription_id)
adf_client = DataFactoryManagementClient(credentials, subscription_id)
rg_params = {'location':'westus'}
df_params = {'location':'westus'}
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
# Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
dsOut_ref], source=blob_source, sink=blob_sink)
# Create a pipeline with the copy activity
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(
activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
# Start the main method
main()
A kód futtatása
Állítsa össze és indítsa el az alkalmazást, majd ellenőrizze a folyamat-végrehajtást.
A konzol megjeleníti az adat-előállító, a társított szolgáltatás, az adatkészletek, a folyamat, valamint a folyamat futása létrehozási állapotát. Várjon, amíg megjelennek a másolási tevékenység futásának részletei, beleértve az olvasott és írt adatok méretét. Ezután például az Azure Storage Explorerhez hasonló eszközök használatával ellenőrizheti, hogy a blobok a változókban megadottak szerint át lettek-e másolva az outputBlobPath helyre az inputBlobPath helyről.
Itt látható a minta kimenete:
Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}
Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService
Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in
Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out
Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline
Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.
Activity run details
Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4
Az erőforrások eltávolítása
Az adat-előállító törléséhez adja hozzá az alábbi kódot a programhoz:
adf_client.factories.delete(rg_name, df_name)
Kapcsolódó tartalom
A példában szereplő folyamat adatokat másol az egyik helyről egy másikra egy Azure Blob Storage-ban. A Data Factory más forgatókönyvekben való használatát ismertető további információkért tekintse meg az oktatóanyagokat.