Bagikan melalui


Mentransformasi data di cloud menggunakan aktivitas Spark di Azure Data Factory

BERLAKU UNTUK: Azure Data Factory Azure Synapse Analytics

Tip

Cobalah Data Factory di Microsoft Fabric, solusi analitik all-in-one untuk perusahaan. Microsoft Fabric mencakup semuanya mulai dari pergerakan data hingga ilmu data, analitik real time, kecerdasan bisnis, dan pelaporan. Pelajari cara memulai uji coba baru secara gratis!

Dalam tutorial ini, Anda menggunakan Azure PowerShell untuk membuat akur Pabrik Data yang mengubah data menggunakan Aktivitas Spark dan layanan tertaut HDInsight sesuai permintaan. Anda akan melakukan langkah-langkah berikut dalam tutorial ini:

  • Membuat pabrik data.
  • Menulis dan menyebarkan layanan tertaut.
  • Menulis dan menyebarkan alur.
  • Memulai eksekusi alur.
  • Pantau eksekusi alur.

Jika Anda tidak memiliki langganan Azure, buat akun gratis sebelum Anda memulai.

Prasyarat

Catatan

Sebaiknya Anda menggunakan modul Azure Az PowerShell untuk berinteraksi dengan Azure. Untuk memulai, lihat Menginstal Azure PowerShell. Untuk mempelajari cara bermigrasi ke modul Az PowerShell, lihat Memigrasikan Azure PowerShell dari AzureRM ke Az.

  • Akun Azure Storage. Anda membuat skrip Python dan file input, serta Anda mengunggahnya ke Azure Storage. Output dari program Spark disimpan di akun penyimpanan ini. Kluster Spark sesuai permintaan menggunakan akun penyimpanan yang sama dengan penyimpanan utamanya.
  • Azure PowerShell. Ikuti instruksi di Cara menginstal dan mengonfigurasi Azure PowerShell.

Unggah skrip Python ke akun Blob Storage Anda

  1. Buat file Python bernama WordCount_Spark.py dengan konten berikut:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Ganti <storageAccountName> dengan nama akun Azure Storage Anda. Lalu simpan file.

  3. Di Azure Blob Storage Anda, buat kontainer bernama adftutorial jika tidak ada.

  4. Buat folder bernama spark.

  5. Buat subfolder bernama script di dalam folder spark.

  6. Unggah file WordCount_Spark.py ke subfolder skrip.

Unggah file input

  1. Buat file bernama minecraftstory.txt dengan teks. Program spark menghitung jumlah kata dalam teks ini.
  2. Membuat subfolder bernama inputfiles dalam folder spark.
  3. Unggah minecraftstory.txt ke subfolder inputfiles.

Layanan tertaut penulis

Anda menulis dua layanan tertaut di bagian ini:

  • Layanan tertaut Azure Storage yang menautkan akun penyimpanan Azure Anda ke pabrik data. Penyimpanan ini digunakan oleh kluster Microsoft Azure HDInsight sesuai permintaan. Ini juga berisi skrip Spark yang akan dijalankan.
  • Layanan Tertaut Microsoft Azure HDInsight Sesuai Permintaan. Azure Data Factory secara otomatis membuat kluster HDInsight, menjalankan program Spark, lalu menghapus kluster HDInsight setelah tidak aktif untuk waktu yang telah dikonfigurasi sebelumnya.

Layanan tertaut Microsoft Azure Storage

Buat file JSON menggunakan editor pilihan Anda, salin definisi JSON berikut dari layanan tertaut Microsoft Azure Storage, lalu simpan file sebagai MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

Perbarui <storageAccountName> dan <storageAccountKey> dengan nama dan kunci akun Azure Storage Anda.

Layanan tertaut Microsoft Azure HDInsight sesuai permintaan

Buat file JSON menggunakan editor pilihan Anda, salin definisi JSON berikut dari layanan tertaut Microsoft Azure HDInsight, lalu simpan file sebagai MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Perbarui nilai untuk properti berikut ini dalam definisi layanan tertaut:

  • hostSubscriptionId. Ganti <subscriptionID> dengan ID langganan Azure Anda. Kluster HDInsight sesuai permintaan dibuat dalam langganan ini.
  • penyewa. Ganti <tenantID> dengan ID penyewa Azure Anda.
  • servicePrincipalId, servicePrincipalKey. Ganti <servicePrincipalID> dan <servicePrincipalKey> dengan ID dan kunci perwakilan layanan Anda di ID Microsoft Entra. Perwakilan layanan ini harus menjadi anggota dengan peran Kontributor dari langganan atau Grup sumber daya tempat kluster dibuat. Lihat membuat aplikasi Microsoft Entra dan perwakilan layanan untuk detailnya. ID perwakilan layanan sama dengan ID Aplikasi dan Kunci perwakilan layanan sama dengan nilai untuk Rahasia klien.
  • clusterResourceGroup. Ganti <resourceGroupOfHDICluster> dengan nama grup sumber daya tempat kluster HDInsight perlu dibuat.

Catatan

Azure HDInsight memiliki batasan jumlah total inti yang dapat Anda gunakan di setiap wilayah Azure yang didukung. Untuk Layanan Tertaut HDInsight Sesuai Permintaan, kluster HDInsight akan dibuat di lokasi yang sama dengan Azure Storage yang digunakan sebagai penyimpanan utamanya. Pastikan Anda memiliki kuota inti yang cukup untuk kluster yang berhasil dibuat. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan kluster di Microsoft Azure HDInsight dengan Hadoop, Spark, Kafka, dan lainnya.

Menulis alur

Dalam langkah ini, Anda membuat alur baru dengan aktivitas Spark. Aktivitas menggunakan sampel hitungan kata. Unduh konten dari lokasi ini jika Anda belum melakukannya.

Buat file JSON di editor pilihan Anda, salin definisi JSON berikut dari definisi alur, dan simpan sebagai MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Perhatikan poin berikut:

  • rootPath menunjuk ke folder spark dari kontainer adftutorial.
  • entryFilePath menunjuk ke file WordCount_Spark.py di sub folder skrip folder spark.

Membuat pabrik data

Anda telah pembuat definisi layanan dan alur tertaut dalam file JSON. Sekarang, mari kita buat pabrik data, dan terapkan file Service dan alur JSON yang ditautkan dengan menggunakan cmdlet PowerShell. Jalankan perintah PowerShell berikut satu per satu:

  1. Atur variabel satu per satu.

    Nama Grup Sumber Daya

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Nama Data Factory. Harus unik secara global

    $dataFactoryName = "MyDataFactory09102017"
    

    Nama alur

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Luncurkan PowerShell. Biarkan Azure PowerShell terbuka hingga akhir panduan mulai cepat ini. Jika Anda menutup dan membuka kembali, Anda perlu menjalankan perintah lagi. Untuk daftar wilayah Azure tempat Data Factory saat ini tersedia, pilih wilayah yang menarik minat Anda pada halaman berikut, lalu perluas Analitik untuk menemukan Data Factory: Produk yang tersedia menurut wilayah. Penyimpanan data (Azure Storage, Azure SQL Database, dll.) dan komputasi (HDInsight, dll.) yang digunakan oleh pabrik data dapat berada di wilayah lain.

    Jalankan perintah berikut dan masukkan nama pengguna serta kata sandi yang Anda gunakan untuk masuk ke portal Microsoft Azure:

    Connect-AzAccount
    

    Jalankan perintah berikut untuk menampilkan semua langganan untuk akun ini:

    Get-AzSubscription
    

    Jalankan perintah berikut untuk memilih langganan yang ingin Anda gunakan. Ganti SubscriptionId dengan ID langganan Azure Anda:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Membuat grup sumber daya: ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Buat pabrik data.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Jalankan perintah berikut untuk melihat output:

    $df
    
  5. Beralih ke folder tempat Anda membuat file JSON, dan jalankan perintah berikut untuk menyebarkan layanan tertaut Azure Storage:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Jalankan perintah berikut untuk menyebarkan layanan tertaut Spark sesuai permintaan:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Jalankan perintah berikut untuk menyebarkan alur:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Memulai dan memantau eksekusi alur

  1. Memulai eksekusi alur. Ini juga menangkap ID eksekusi alur untuk pemantauan di masa depan.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Jalankan skrip berikut ini untuk terus memeriksa status eksekusi alur hingga selesai.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Berikut adalah output dari eksekusi sampel:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Konfirmasikan bahwa folder bernama outputfiles dibuat di folder spark kontainer adftutorial dengan output dari program spark.

Alur dalam sampel ini menyalin data dari satu lokasi ke lokasi lain dalam penyimpanan blob Azure. Anda mempelajari cara untuk:

  • Membuat pabrik data.
  • Menulis dan menyebarkan layanan tertaut.
  • Menulis dan menyebarkan alur.
  • Memulai eksekusi alur.
  • Pantau eksekusi alur.

Lanjutkan ke tutorial berikutnya untuk mempelajari cara mengubah data dengan menjalankan skrip Apache Hive pada klaster Azure HDInsight yang berada dalam jaringan virtual.