Mentransformasi data di cloud menggunakan aktivitas Spark di Azure Data Factory
BERLAKU UNTUK: Azure Data Factory
Azure Synapse Analytics
Tip
Cobalah Data Factory di Microsoft Fabric, solusi analitik all-in-one untuk perusahaan. Microsoft Fabric mencakup semuanya mulai dari pergerakan data hingga ilmu data, analitik real time, kecerdasan bisnis, dan pelaporan. Pelajari cara memulai uji coba baru secara gratis!
Dalam tutorial ini, Anda menggunakan Azure PowerShell untuk membuat akur Pabrik Data yang mengubah data menggunakan Aktivitas Spark dan layanan tertaut HDInsight sesuai permintaan. Anda akan melakukan langkah-langkah berikut dalam tutorial ini:
- Membuat pabrik data.
- Menulis dan menyebarkan layanan tertaut.
- Menulis dan menyebarkan alur.
- Memulai eksekusi alur.
- Pantau eksekusi alur.
Jika Anda tidak memiliki langganan Azure, buat akun gratis sebelum Anda memulai.
Prasyarat
Catatan
Sebaiknya Anda menggunakan modul Azure Az PowerShell untuk berinteraksi dengan Azure. Untuk memulai, lihat Menginstal Azure PowerShell. Untuk mempelajari cara bermigrasi ke modul Az PowerShell, lihat Memigrasikan Azure PowerShell dari AzureRM ke Az.
- Akun Azure Storage. Anda membuat skrip Python dan file input, serta Anda mengunggahnya ke Azure Storage. Output dari program Spark disimpan di akun penyimpanan ini. Kluster Spark sesuai permintaan menggunakan akun penyimpanan yang sama dengan penyimpanan utamanya.
- Azure PowerShell. Ikuti instruksi di Cara menginstal dan mengonfigurasi Azure PowerShell.
Unggah skrip Python ke akun Blob Storage Anda
Buat file Python bernama WordCount_Spark.py dengan konten berikut:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
Ganti <storageAccountName> dengan nama akun Azure Storage Anda. Lalu simpan file.
Di Azure Blob Storage Anda, buat kontainer bernama adftutorial jika tidak ada.
Buat folder bernama spark.
Buat subfolder bernama script di dalam folder spark.
Unggah file WordCount_Spark.py ke subfolder skrip.
Unggah file input
- Buat file bernama minecraftstory.txt dengan teks. Program spark menghitung jumlah kata dalam teks ini.
- Membuat subfolder bernama
inputfiles
dalam folderspark
. - Unggah
minecraftstory.txt
ke subfolderinputfiles
.
Layanan tertaut penulis
Anda menulis dua layanan tertaut di bagian ini:
- Layanan tertaut Azure Storage yang menautkan akun penyimpanan Azure Anda ke pabrik data. Penyimpanan ini digunakan oleh kluster Microsoft Azure HDInsight sesuai permintaan. Ini juga berisi skrip Spark yang akan dijalankan.
- Layanan Tertaut Microsoft Azure HDInsight Sesuai Permintaan. Azure Data Factory secara otomatis membuat kluster HDInsight, menjalankan program Spark, lalu menghapus kluster HDInsight setelah tidak aktif untuk waktu yang telah dikonfigurasi sebelumnya.
Layanan tertaut Microsoft Azure Storage
Buat file JSON menggunakan editor pilihan Anda, salin definisi JSON berikut dari layanan tertaut Microsoft Azure Storage, lalu simpan file sebagai MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Perbarui <storageAccountName> dan <storageAccountKey> dengan nama dan kunci akun Azure Storage Anda.
Layanan tertaut Microsoft Azure HDInsight sesuai permintaan
Buat file JSON menggunakan editor pilihan Anda, salin definisi JSON berikut dari layanan tertaut Microsoft Azure HDInsight, lalu simpan file sebagai MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Perbarui nilai untuk properti berikut ini dalam definisi layanan tertaut:
- hostSubscriptionId. Ganti <subscriptionID> dengan ID langganan Azure Anda. Kluster HDInsight sesuai permintaan dibuat dalam langganan ini.
- penyewa. Ganti <tenantID> dengan ID penyewa Azure Anda.
- servicePrincipalId, servicePrincipalKey. Ganti <servicePrincipalID> dan <servicePrincipalKey> dengan ID dan kunci perwakilan layanan Anda di ID Microsoft Entra. Perwakilan layanan ini harus menjadi anggota dengan peran Kontributor dari langganan atau Grup sumber daya tempat kluster dibuat. Lihat membuat aplikasi Microsoft Entra dan perwakilan layanan untuk detailnya. ID perwakilan layanan sama dengan ID Aplikasi dan Kunci perwakilan layanan sama dengan nilai untuk Rahasia klien.
- clusterResourceGroup. Ganti <resourceGroupOfHDICluster> dengan nama grup sumber daya tempat kluster HDInsight perlu dibuat.
Catatan
Azure HDInsight memiliki batasan jumlah total inti yang dapat Anda gunakan di setiap wilayah Azure yang didukung. Untuk Layanan Tertaut HDInsight Sesuai Permintaan, kluster HDInsight akan dibuat di lokasi yang sama dengan Azure Storage yang digunakan sebagai penyimpanan utamanya. Pastikan Anda memiliki kuota inti yang cukup untuk kluster yang berhasil dibuat. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan kluster di Microsoft Azure HDInsight dengan Hadoop, Spark, Kafka, dan lainnya.
Menulis alur
Dalam langkah ini, Anda membuat alur baru dengan aktivitas Spark. Aktivitas menggunakan sampel hitungan kata. Unduh konten dari lokasi ini jika Anda belum melakukannya.
Buat file JSON di editor pilihan Anda, salin definisi JSON berikut dari definisi alur, dan simpan sebagai MySparkOnDemandPipeline.json.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Perhatikan poin berikut:
- rootPath menunjuk ke folder spark dari kontainer adftutorial.
- entryFilePath menunjuk ke file WordCount_Spark.py di sub folder skrip folder spark.
Membuat pabrik data
Anda telah pembuat definisi layanan dan alur tertaut dalam file JSON. Sekarang, mari kita buat pabrik data, dan terapkan file Service dan alur JSON yang ditautkan dengan menggunakan cmdlet PowerShell. Jalankan perintah PowerShell berikut satu per satu:
Atur variabel satu per satu.
Nama Grup Sumber Daya
$resourceGroupName = "ADFTutorialResourceGroup"
Nama Data Factory. Harus unik secara global
$dataFactoryName = "MyDataFactory09102017"
Nama alur
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
Luncurkan PowerShell. Biarkan Azure PowerShell terbuka hingga akhir panduan mulai cepat ini. Jika Anda menutup dan membuka kembali, Anda perlu menjalankan perintah lagi. Untuk daftar wilayah Azure tempat Data Factory saat ini tersedia, pilih wilayah yang menarik minat Anda pada halaman berikut, lalu perluas Analitik untuk menemukan Data Factory: Produk yang tersedia menurut wilayah. Penyimpanan data (Azure Storage, Azure SQL Database, dll.) dan komputasi (HDInsight, dll.) yang digunakan oleh pabrik data dapat berada di wilayah lain.
Jalankan perintah berikut dan masukkan nama pengguna serta kata sandi yang Anda gunakan untuk masuk ke portal Microsoft Azure:
Connect-AzAccount
Jalankan perintah berikut untuk menampilkan semua langganan untuk akun ini:
Get-AzSubscription
Jalankan perintah berikut untuk memilih langganan yang ingin Anda gunakan. Ganti SubscriptionId dengan ID langganan Azure Anda:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
Membuat grup sumber daya: ADFTutorialResourceGroup.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
Buat pabrik data.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
Jalankan perintah berikut untuk melihat output:
$df
Beralih ke folder tempat Anda membuat file JSON, dan jalankan perintah berikut untuk menyebarkan layanan tertaut Azure Storage:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
Jalankan perintah berikut untuk menyebarkan layanan tertaut Spark sesuai permintaan:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
Jalankan perintah berikut untuk menyebarkan alur:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Memulai dan memantau eksekusi alur
Memulai eksekusi alur. Ini juga menangkap ID eksekusi alur untuk pemantauan di masa depan.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
Jalankan skrip berikut ini untuk terus memeriksa status eksekusi alur hingga selesai.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
Berikut adalah output dari eksekusi sampel:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
Konfirmasikan bahwa folder bernama
outputfiles
dibuat di folderspark
kontainer adftutorial dengan output dari program spark.
Konten terkait
Alur dalam sampel ini menyalin data dari satu lokasi ke lokasi lain dalam penyimpanan blob Azure. Anda mempelajari cara untuk:
- Membuat pabrik data.
- Menulis dan menyebarkan layanan tertaut.
- Menulis dan menyebarkan alur.
- Memulai eksekusi alur.
- Pantau eksekusi alur.
Lanjutkan ke tutorial berikutnya untuk mempelajari cara mengubah data dengan menjalankan skrip Apache Hive pada klaster Azure HDInsight yang berada dalam jaringan virtual.
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk