Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Tip
Data Factory di Microsoft Fabric adalah generasi Azure Data Factory berikutnya, dengan arsitektur yang lebih sederhana, AI bawaan, dan fitur baru. Jika Anda baru menggunakan integrasi data, mulailah dengan Fabric Data Factory. Beban kerja ADF yang ada dapat ditingkatkan ke Fabric untuk mengakses kemampuan baru di seluruh ilmu data, analitik real time, dan pelaporan.
Dalam panduan singkat ini, Anda membuat data factory dengan menggunakan Python. Alur di pabrik data ini menyalin data dari satu folder ke folder lain di penyimpanan blob Azure.
Azure Data Factory adalah layanan integrasi data berbasis cloud yang memungkinkan Anda membuat alur kerja berbasis data untuk mengatur dan mengotomatiskan pergerakan data dan transformasi data. Dengan menggunakan Azure Data Factory, Anda dapat membuat dan menjadwalkan alur kerja berbasis data, yang disebut alur.
Alur dapat menyerap data dari penyimpanan data yang berbeda. Alur memproses atau mengubah data dengan menggunakan layanan komputasi seperti Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics, dan Azure Machine Learning. Pipeline mempublikasikan output ke penyimpanan data seperti Azure Synapse Analytics untuk aplikasi kecerdasan bisnis (BI).
Prasyarat
Akun Azure dengan langganan aktif. Buat akun gratis.
Azure Storage Explorer (opsional).
Aplikasi di Microsoft Entra ID. Buat aplikasi dengan mengikuti langkah-langkah di tautan ini, menggunakan Opsi Autentikasi 2 (rahasia aplikasi), dan tetapkan aplikasi ke peran Kontributor dengan mengikuti petunjuk di artikel yang sama. Catat nilai berikut seperti yang ditampilkan dalam artikel untuk digunakan di langkah selanjutnya: ID aplikasi (klien), nilai rahasia klien, dan ID tenant.
Membuat dan mengunggah berkas masukan
Luncurkan Notepad. Salin teks berikut dan simpan sebagai file input.txt di disk Anda.
John|Doe Jane|DoeGunakan alat seperti Azure Storage Explorer untuk membuat kontainer adfv2tutorial, dan folder input dalam kontainer. Lalu, unggah file input.txt ke folder input.
Menginstal paket Python
Buka terminal atau prompt perintah dengan hak administratif.
Pertama, instal paket Python untuk sumber daya manajemen Azure:
pip install azure-mgmt-resourceUntuk menginstal paket Python untuk Data Factory, jalankan perintah berikut:
pip install azure-mgmt-datafactorySDK Python untuk Data Factory mendukung Python 2.7 dan 3.6+.
Untuk menginstal paket Python untuk autentikasi identitas Azure, jalankan perintah berikut:
pip install azure-identityCatatan
Paket "azure-identity" mungkin memiliki konflik dengan "azure-cli" pada beberapa dependensi umum. Jika Anda memenuhi masalah autentikasi apa pun, hapus "azure-cli" dan dependensinya, atau gunakan mesin bersih tanpa menginstal paket "azure-cli" untuk membuatnya berfungsi. Untuk awan Sovereign, Anda harus menggunakan konstanta khusus awan yang sesuai. Silakan lihat Hubungkan ke semua wilayah menggunakan pustaka Azure untuk Python Multi-cloud di Microsoft Docs untuk instruksi terhubung dengan Python di awan Sovereign.
Membuat klien Data Factory
Buat file bernama datafactory.py. Tambahkan pernyataan berikut untuk menambahkan referensi ke namespace.
from azure.identity import ClientSecretCredential from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.datafactory import DataFactoryManagementClient from azure.mgmt.datafactory.models import * from datetime import datetime, timedelta import timeTambahkan fungsi berikut yang mencetak informasi.
def print_item(group): """Print an Azure object instance.""" print("\tName: {}".format(group.name)) print("\tId: {}".format(group.id)) if hasattr(group, 'location'): print("\tLocation: {}".format(group.location)) if hasattr(group, 'tags'): print("\tTags: {}".format(group.tags)) if hasattr(group, 'properties'): print_properties(group.properties) def print_properties(props): """Print a ResourceGroup properties instance.""" if props and hasattr(props, 'provisioning_state') and props.provisioning_state: print("\tProperties:") print("\t\tProvisioning State: {}".format(props.provisioning_state)) print("\n\n") def print_activity_run_details(activity_run): """Print activity run details.""" print("\n\tActivity run details\n") print("\tActivity run status: {}".format(activity_run.status)) if activity_run.status == 'Succeeded': print("\tNumber of bytes read: {}".format(activity_run.output['dataRead'])) print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten'])) print("\tCopy duration: {}".format(activity_run.output['copyDuration'])) else: print("\tErrors: {}".format(activity_run.error['message']))Tambahkan kode berikut ke metode Utama yang membuat instans kelas DataFactoryManagementClient. Anda menggunakan objek ini untuk membuat pabrik data, layanan tertaut, himpunan data, dan alur. Anda juga menggunakan objek ini untuk memantau detail proses pipeline. Atur variabel subscription_id ke ID langganan Azure Anda. Untuk daftar wilayah Azure tempat Data Factory saat ini tersedia, pilih wilayah yang menarik minat Anda di halaman berikut, lalu perluas Analytics untuk menemukan Data Factory: Products yang tersedia menurut wilayah. Penyimpanan data (Azure Storage, Azure SQL Database, dll.) dan komputasi (HDInsight, dll.) yang digunakan oleh pabrik data dapat berada di wilayah lain.
def main(): # Azure subscription ID subscription_id = '<subscription ID>' # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group rg_name = '<resource group>' # The data factory name. It must be globally unique. df_name = '<factory name>' # Specify your Active Directory client ID, client secret, and tenant ID credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') # Specify following for Sovereign Clouds, import right cloud constant and then use it to connect. # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id) resource_client = ResourceManagementClient(credentials, subscription_id) adf_client = DataFactoryManagementClient(credentials, subscription_id) rg_params = {'location':'westus'} df_params = {'location':'westus'}
Membuat pabrik data
Tambahkan kode berikut ke metode Utama yang membuat pabrik data. Jika grup sumber daya Anda sudah ada, komentari pernyataan create_or_update pertama.
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
#Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
Membuat layanan terhubung
Tambahkan kode berikut ke metode Main yang membuat layanan tertaut Azure Storage.
Anda membuat layanan tertaut di pabrik data untuk menautkan penyimpanan data dan layanan komputasi ke pabrik data. Dalam panduan cepat ini, Anda hanya perlu membuat satu layanan tertaut Azure Storage sebagai sumber dan penyimpanan tujuan untuk salinan, yang diberi nama "AzureStorageLinkedService" dalam sampel. Ganti <storageaccountname> dan <storageaccountkey> dengan nama dan kunci akun Azure Storage Anda.
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
Membuat himpunan data
Di bagian ini, Anda membuat dua himpunan data: satu untuk sumber dan yang lain untuk sink.
Membuat dataset untuk sumber Azure Blob
Tambahkan kode berikut ke metode Utama yang membuat himpunan data blob Azure. Untuk informasi tentang properti himpunan data Blob Azure, lihat artikel konektor Blob Azure.
Anda menentukan himpunan data yang mewakili data sumber di blob Azure. Himpunan data Blob ini mengacu pada layanan tertaut Azure Storage yang Anda buat di langkah sebelumnya.
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
Membuat himpunan data untuk sink Azure Blob
Tambahkan kode berikut ke metode Utama yang membuat himpunan data blob Azure. Untuk informasi tentang properti himpunan data Blob Azure, lihat artikel konektor Blob Azure.
Anda menentukan himpunan data yang mewakili data sumber di blob Azure. Himpunan data Blob ini mengacu pada layanan tertaut Azure Storage yang Anda buat di langkah sebelumnya.
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
Buat alur
Tambahkan kode berikut ke dalam metode Utama yang membuat jalur pemrosesan dengan aktivitas menyalin.
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)
#Create a pipeline with the copy activity
#Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
#Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
p_name = 'copyPipeline'
params_for_pipeline = {}
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
Menjalankan pipeline
Tambahkan kode berikut ke metode Utama yang memicu eksekusi alur.
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
Memantau jalannya pipeline
Untuk memantau eksekusi alur, tambahkan kode berikut ke metode Utama:
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
Sekarang, tambahkan pernyataan berikut untuk memanggil metode Utama saat program dijalankan:
# Start the main method
main()
Skrip lengkap
Berikut adalah kode Python lengkap:
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time
def print_item(group):
"""Print an Azure object instance."""
print("\tName: {}".format(group.name))
print("\tId: {}".format(group.id))
if hasattr(group, 'location'):
print("\tLocation: {}".format(group.location))
if hasattr(group, 'tags'):
print("\tTags: {}".format(group.tags))
if hasattr(group, 'properties'):
print_properties(group.properties)
def print_properties(props):
"""Print a ResourceGroup properties instance."""
if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
print("\tProperties:")
print("\t\tProvisioning State: {}".format(props.provisioning_state))
print("\n\n")
def print_activity_run_details(activity_run):
"""Print activity run details."""
print("\n\tActivity run details\n")
print("\tActivity run status: {}".format(activity_run.status))
if activity_run.status == 'Succeeded':
print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
else:
print("\tErrors: {}".format(activity_run.error['message']))
def main():
# Azure subscription ID
subscription_id = '<subscription ID>'
# This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
rg_name = '<resource group>'
# The data factory name. It must be globally unique.
df_name = '<factory name>'
# Specify your Active Directory client ID, client secret, and tenant ID
credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>')
resource_client = ResourceManagementClient(credentials, subscription_id)
adf_client = DataFactoryManagementClient(credentials, subscription_id)
rg_params = {'location':'westus'}
df_params = {'location':'westus'}
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
# Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
dsOut_ref], source=blob_source, sink=blob_sink)
# Create a pipeline with the copy activity
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(
activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
# Start the main method
main()
Menjalankan kode
Buat dan mulai aplikasi, lalu verifikasi eksekusi alur.
Konsol mencetak kemajuan pembuatan fabrik data, layanan tertaut, himpunan data, alur, dan jalannya alur. Tunggu hingga Anda melihat rincian pelaksanaan aktivitas penyalinan, termasuk ukuran data yang dibaca/ditulis. Kemudian, gunakan alat seperti Azure Storage Explorer untuk memeriksa bahwa blob telah disalin ke "outputBlobPath" dari "inputBlobPath", sesuai dengan yang Anda tentukan dalam variabel.
Berikut adalah output sampel:
Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}
Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService
Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in
Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out
Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline
Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.
Activity run details
Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4
Membersihkan sumber daya
Untuk menghapus pabrik data, tambahkan kode berikut ke program:
adf_client.factories.delete(rg_name, df_name)
Konten terkait
Alur dalam sampel ini menyalin data dari satu lokasi ke lokasi lain dalam penyimpanan blob Azure. Ikuti tutorial untuk mempelajari tentang penggunaan Data Factory dalam skenario lainnya.