Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
BERLAKU UNTUK:
Azure Data Factory
Azure Synapse Analytics
Tip
Cobalah Data Factory di Microsoft Fabric, solusi analitik all-in-one untuk perusahaan. Microsoft Fabric mencakup semuanya mulai dari pergerakan data hingga ilmu data, analitik real time, kecerdasan bisnis, dan pelaporan. Pelajari cara memulai uji coba baru secara gratis!
Dalam panduan memulai cepat ini, Anda membuat data fabrik dengan menggunakan Python. Alur dalam pabrik data ini menyalin data dari satu folder ke folder lain dalam penyimpanan Azure Blob.
Azure Data Factory adalah layanan integrasi data berbasis cloud yang memungkinkan Anda membuat alur kerja berbasis data untuk mengatur dan mengotomatiskan pemindahan data dan transformasi data. Dengan menggunakan Azure Data Factory, Anda dapat membuat dan menjadwalkan alur kerja berbasis data, yang disebut alur.
Alur dapat menyerap data dari penyimpanan data yang berbeda. Alur memproses atau mengubah data dengan menggunakan layanan komputasi seperti Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics, dan Azure Machine Learning. Jalur pemrosesan menerbitkan data output ke penyimpanan data seperti Azure Synapse Analytics untuk tujuan kecerdasan bisnis (BI).
Prasyarat
Akun Azure dengan langganan aktif. Buat akun gratis.
Azure Storage Explorer (opsional).
Sebuah aplikasi di Microsoft Entra ID. Buat aplikasi dengan mengikuti langkah-langkah di tautan ini, menggunakan Opsi Autentikasi 2 (rahasia aplikasi), dan tetapkan aplikasi ke peran Kontributor dengan mengikuti petunjuk di artikel yang sama. Catat nilai berikut seperti yang ditampilkan dalam artikel untuk digunakan di langkah selanjutnya: ID aplikasi (klien), nilai rahasia klien, dan ID tenant.
Membuat dan mengunggah berkas masukan
Luncurkan Notepad. Salin teks berikut dan simpan sebagai file input.txt di disk Anda.
John|Doe Jane|DoeGunakan alat seperti Azure Storage Explorer untuk membuat kontainer adfv2tutorial, dan folder input dalam kontainer. Lalu, unggah file input.txt ke folder input.
Menginstal paket Python
Buka terminal atau prompt perintah dengan hak administratif.
Pertama, instal paket Python untuk sumber daya pengelolaan Azure:
pip install azure-mgmt-resourceUntuk menginstal paket Python untuk Data Factory, jalankan perintah berikut:
pip install azure-mgmt-datafactoryPython SDK untuk Data Factory mendukung Python 2.7 dan 3.6+.
Untuk menginstal paket Python untuk autentikasi Azure Identity, jalankan perintah berikut:
pip install azure-identityCatatan
Paket "azure-identity" mungkin memiliki konflik dengan "azure-cli" pada beberapa dependensi umum. Jika Anda memenuhi masalah autentikasi apa pun, hapus "azure-cli" dan dependensinya, atau gunakan mesin bersih tanpa menginstal paket "azure-cli" untuk membuatnya berfungsi. Untuk awan Sovereign, Anda harus menggunakan konstanta khusus awan yang sesuai. Harap lihat Menghubungkan ke semua wilayah menggunakan pustaka Azure untuk Python Multi-cloud | Microsoft Docs untuk mendapatkan petunjuk guna terhubung dengan Python di Sovereign cloud.
Membuat klien Data Factory
Buat file bernama datafactory.py. Tambahkan pernyataan berikut untuk menambahkan referensi ke namespace.
from azure.identity import ClientSecretCredential from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.datafactory import DataFactoryManagementClient from azure.mgmt.datafactory.models import * from datetime import datetime, timedelta import timeTambahkan fungsi berikut yang mencetak informasi.
def print_item(group): """Print an Azure object instance.""" print("\tName: {}".format(group.name)) print("\tId: {}".format(group.id)) if hasattr(group, 'location'): print("\tLocation: {}".format(group.location)) if hasattr(group, 'tags'): print("\tTags: {}".format(group.tags)) if hasattr(group, 'properties'): print_properties(group.properties) def print_properties(props): """Print a ResourceGroup properties instance.""" if props and hasattr(props, 'provisioning_state') and props.provisioning_state: print("\tProperties:") print("\t\tProvisioning State: {}".format(props.provisioning_state)) print("\n\n") def print_activity_run_details(activity_run): """Print activity run details.""" print("\n\tActivity run details\n") print("\tActivity run status: {}".format(activity_run.status)) if activity_run.status == 'Succeeded': print("\tNumber of bytes read: {}".format(activity_run.output['dataRead'])) print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten'])) print("\tCopy duration: {}".format(activity_run.output['copyDuration'])) else: print("\tErrors: {}".format(activity_run.error['message']))Tambahkan kode berikut ke metode Utama yang membuat instans kelas DataFactoryManagementClient. Anda menggunakan objek ini untuk membuat pabrik data, layanan tertaut, himpunan data, dan alur. Anda juga menggunakan objek ini untuk memantau detail proses pipeline. Atur variabel subscription_id ke ID langganan Azure Anda. Untuk daftar wilayah Azure tempat Data Factory saat ini tersedia, pilih wilayah yang menarik minat Anda pada halaman berikut, lalu perluas Analitik untuk menemukan Data Factory: Produk yang tersedia menurut wilayah. Penyimpanan data (Azure Storage, Azure SQL Database, dll.) dan komputasi (HDInsight, dll.) yang digunakan oleh pabrik data dapat berada di wilayah lain.
def main(): # Azure subscription ID subscription_id = '<subscription ID>' # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group rg_name = '<resource group>' # The data factory name. It must be globally unique. df_name = '<factory name>' # Specify your Active Directory client ID, client secret, and tenant ID credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') # Specify following for Sovereign Clouds, import right cloud constant and then use it to connect. # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id) resource_client = ResourceManagementClient(credentials, subscription_id) adf_client = DataFactoryManagementClient(credentials, subscription_id) rg_params = {'location':'westus'} df_params = {'location':'westus'}
Membuat pabrik data
Tambahkan kode berikut ke metode Utama yang membuat pabrik data. Jika grup sumber daya Anda sudah ada, komentari pernyataan create_or_update pertama.
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
#Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
Membuat layanan terhubung
Tambahkan kode berikut ke metode Utama yang membuat layanan tertaut Azure Storage.
Anda membuat layanan tertaut di pabrik data untuk menautkan penyimpanan data dan layanan komputasi ke pabrik data. Dalam panduan memulai cepat ini, Anda hanya perlu membuat satu layanan tertaut Azure Storage sebagai sumber penyalinan dan target penyimpanan, bernama "AzureStorageLinkedService" dalam contoh. Ganti <storageaccountname> dan <storageaccountkey> dengan nama dan kunci akun Azure Storage Anda.
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
Membuat himpunan data
Di bagian ini, Anda membuat dua himpunan data: satu untuk sumber dan yang lain untuk sink.
Buat dataset untuk sumber Azure Blob
Tambahkan kode berikut ke metode Utama yang membuat himpunan data blob Azure. Untuk mengetahui informasi tentang properti himpunan data Azure Blob, lihat artikel Konektor blob Azure.
Anda menentukan himpunan data yang mewakili data sumber di Azure Blob. Himpunan data Blob ini mengacu pada layanan tertaut Azure Storage yang Anda buat di langkah sebelumnya.
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
Membuat himpunan data untuk Azure Blob sink
Tambahkan kode berikut ke metode Utama yang membuat himpunan data blob Azure. Untuk mengetahui informasi tentang properti himpunan data Azure Blob, lihat artikel Konektor blob Azure.
Anda menentukan himpunan data yang mewakili data sumber di Azure Blob. Himpunan data Blob ini mengacu pada layanan tertaut Azure Storage yang Anda buat di langkah sebelumnya.
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
Buat alur
Tambahkan kode berikut ke dalam metode Utama yang membuat jalur pemrosesan dengan aktivitas menyalin.
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)
#Create a pipeline with the copy activity
#Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
#Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
p_name = 'copyPipeline'
params_for_pipeline = {}
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
Menjalankan pipeline
Tambahkan kode berikut ke metode Utama yang memicu eksekusi alur.
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
Memantau jalannya pipeline
Untuk memantau eksekusi alur, tambahkan kode berikut ke metode Utama:
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
Sekarang, tambahkan pernyataan berikut untuk memanggil metode Utama saat program dijalankan:
# Start the main method
main()
Skrip lengkap
Berikut kode Python lengkapnya:
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time
def print_item(group):
"""Print an Azure object instance."""
print("\tName: {}".format(group.name))
print("\tId: {}".format(group.id))
if hasattr(group, 'location'):
print("\tLocation: {}".format(group.location))
if hasattr(group, 'tags'):
print("\tTags: {}".format(group.tags))
if hasattr(group, 'properties'):
print_properties(group.properties)
def print_properties(props):
"""Print a ResourceGroup properties instance."""
if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
print("\tProperties:")
print("\t\tProvisioning State: {}".format(props.provisioning_state))
print("\n\n")
def print_activity_run_details(activity_run):
"""Print activity run details."""
print("\n\tActivity run details\n")
print("\tActivity run status: {}".format(activity_run.status))
if activity_run.status == 'Succeeded':
print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
else:
print("\tErrors: {}".format(activity_run.error['message']))
def main():
# Azure subscription ID
subscription_id = '<subscription ID>'
# This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
rg_name = '<resource group>'
# The data factory name. It must be globally unique.
df_name = '<factory name>'
# Specify your Active Directory client ID, client secret, and tenant ID
credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>')
resource_client = ResourceManagementClient(credentials, subscription_id)
adf_client = DataFactoryManagementClient(credentials, subscription_id)
rg_params = {'location':'westus'}
df_params = {'location':'westus'}
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
# Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
dsOut_ref], source=blob_source, sink=blob_sink)
# Create a pipeline with the copy activity
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(
activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
# Start the main method
main()
Menjalankan kode
Buat dan mulai aplikasi, lalu verifikasi eksekusi alur.
Konsol mencetak kemajuan pembuatan fabrik data, layanan tertaut, himpunan data, alur, dan jalannya alur. Tunggu hingga Anda melihat rincian pelaksanaan aktivitas penyalinan, termasuk ukuran data yang dibaca/ditulis. Kemudian, gunakan alat seperti Azure Storage explorer untuk memeriksa blob disalin ke "outputBlobPath" dari "inputBlobPath" seperti yang Anda tentukan dalam variabel.
Berikut adalah output sampel:
Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}
Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService
Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in
Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out
Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline
Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.
Activity run details
Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4
Membersihkan sumber daya
Untuk menghapus pabrik data, tambahkan kode berikut ke program:
adf_client.factories.delete(rg_name, df_name)
Konten terkait
Alur dalam sampel ini menyalin data dari satu lokasi ke lokasi lain dalam penyimpanan blob Azure. Ikuti tutorial untuk mempelajari tentang penggunaan Data Factory dalam skenario lainnya.