Mengubah data di cloud dengan menggunakan aktivitas Spark di Azure Data Factory

Azure Data Factory Azure Synapse Analytics

Petunjuk

Data Factory di Microsoft Fabric adalah generasi Azure Data Factory berikutnya, dengan arsitektur yang lebih sederhana, AI bawaan, dan fitur baru. Jika Anda baru menggunakan integrasi data, mulailah dengan Fabric Data Factory. Beban kerja ADF yang ada dapat ditingkatkan ke Fabric untuk mengakses kemampuan baru di seluruh ilmu data, analitik real time, dan pelaporan.

Dalam tutorial ini, Anda menggunakan Azure PowerShell untuk membuat alur Data Factory yang mengubah data menggunakan Aktivitas Spark dan layanan tertaut HDInsight sesuai permintaan. Anda akan melakukan langkah-langkah berikut dalam tutorial ini:

  • Membuat pabrik data.
  • Membuat dan menerapkan layanan tertaut.
  • Membuat dan mengimplementasikan pipa.
  • Memulai eksekusi alur.
  • Pantau eksekusi alur.

Jika Anda tidak memiliki langganan Azure, buat akun free sebelum Memulai.

Prasyarat

Catatan

Kami menyarankan agar Anda menggunakan modul Az PowerShell Azure untuk berinteraksi dengan Azure. Untuk memulai, lihat Install Azure PowerShell. Untuk mempelajari cara bermigrasi ke modul Az PowerShell, lihat Migrasikan Azure PowerShell dari AzureRM ke Az.

  • akun Azure Storage. Anda membuat skrip Python dan file input, dan mengunggahnya ke penyimpanan Azure. Output dari program Spark disimpan di akun penyimpanan ini. Kluster Spark sesuai permintaan menggunakan akun penyimpanan yang sama dengan penyimpanan utamanya.
  • Azure PowerShell. Ikuti instruksi di Cara menginstal dan mengonfigurasi Azure PowerShell.

Mengunggah skrip Python ke akun Blob Storage Anda

  1. Buat file Python bernama WordCount_Spark.py dengan konten berikut:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Ganti <storageAccountName> dengan nama akun Azure Storage Anda. Lalu simpan file.

  3. Di Azure Blob Storage Anda, buat kontainer bernama adftutorial jika tidak ada.

  4. Buat folder bernama spark.

  5. Buat subfolder bernama script di dalam folder spark.

  6. Unggah file WordCount_Spark.py ke subfolder skrip.

Unggah input file

  1. Buat file bernama minecraftstory.txt dengan teks. Program spark menghitung jumlah kata dalam teks ini.
  2. Membuat subfolder bernama inputfiles dalam folder spark.
  3. Unggah minecraftstory.txt ke subfolder inputfiles.

Layanan tertaut penulis

Anda mengonfigurasi dua Layanan Terhubung di bagian ini:

  • Layanan Tertaut Azure Storage yang menautkan akun Azure Storage ke pabrik data. Penyimpanan ini digunakan oleh kluster Microsoft Azure HDInsight sesuai permintaan. Ini juga berisi skrip Spark yang akan dijalankan.
  • Layanan Tertaut Microsoft Azure HDInsight Sesuai Permintaan. Azure Data Factory secara otomatis membuat kluster HDInsight, menjalankan program Spark, lalu menghapus kluster HDInsight setelah diam selama waktu yang telah dikonfigurasi sebelumnya.

layanan tertaut Azure Storage

Buat file JSON menggunakan editor pilihan Anda, salin definisi JSON berikut dari layanan tertaut Azure Storage, lalu simpan file sebagai MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

Perbarui <storageAccountName> dan <storageAccountKey> dengan nama dan kunci akun Azure Storage Anda.

Layanan tertaut HDInsight sesuai permintaan

Buat file JSON menggunakan editor pilihan Anda, salin definisi JSON berikut dari layanan tertaut Azure HDInsight, dan simpan file sebagai MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Perbarui nilai untuk properti berikut ini dalam definisi layanan tertaut:

  • hostSubscriptionId. Ganti <subscriptionID> dengan ID langganan Azure Anda. Kluster HDInsight sesuai permintaan dibuat dalam langganan ini.
  • tenant. Ganti <tenantID> dengan ID penyewa Azure Anda.
  • servicePrincipalId, servicePrincipalKey. Ganti <servicePrincipalID> dan <servicePrincipalKey> dengan ID dan kunci perwakilan layanan Anda di Microsoft Entra ID. Perwakilan layanan ini harus menjadi anggota dengan peran Kontributor dari langganan atau Grup sumber daya tempat kluster dibuat. Lihat buat aplikasi Microsoft Entra dan perwakilan layanan untuk detailnya. ID perwakilan layanan sama dengan ID Aplikasi dan Kunci perwakilan layanan sama dengan nilai untuk Rahasia klien.
  • clusterResourceGroup. Ganti <resourceGroupOfHDICluster> dengan nama grup sumber daya tempat kluster HDInsight perlu dibuat.

Catatan

Azure HDInsight memiliki batasan jumlah total inti yang dapat Anda gunakan di setiap wilayah Azure yang didukungnya. Untuk Layanan Tertaut HDInsight Sesuai Permintaan, kluster HDInsight akan dibuat di lokasi yang sama dengan Azure Storage yang digunakan sebagai penyimpanan utamanya. Pastikan Anda memiliki kuota inti yang cukup untuk kluster yang berhasil dibuat. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan kluster di Microsoft Azure HDInsight dengan Hadoop, Spark, Kafka, dan lainnya.

Merancang pipeline

Dalam langkah ini, Anda membuat alur baru dengan aktivitas Spark. Aktivitas menggunakan contoh penghitungan kata. Unduh konten dari lokasi ini jika Anda belum melakukannya.

Buat file JSON di editor pilihan Anda, salin definisi JSON berikut dari definisi alur, dan simpan sebagai MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Perhatikan poin berikut:

  • "rootPath" menunjuk ke folder "spark" dari kontainer "adftutorial".
  • entryFilePath menunjuk ke file WordCount_Spark.py di sub folder skrip folder spark.

Membuat pabrik data

Anda telah membuat definisi layanan dan alur kerja tertaut dalam file JSON. Sekarang, mari kita buat pabrik data, dan terapkan file Service dan alur JSON yang ditautkan dengan menggunakan cmdlet PowerShell. Jalankan perintah PowerShell berikut satu per satu:

  1. Atur variabel satu per satu.

    Nama Grup Sumber Daya

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Nama Data Factory Harus unik secara global

    $dataFactoryName = "MyDataFactory09102017"
    

    Nama alur

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Luncurkan PowerShell. Tetap buka Azure PowerShell hingga menyelesaikan panduan cepat ini. Jika Anda menutup dan membuka kembali, Anda perlu menjalankan perintah lagi. Untuk daftar wilayah Azure tempat Data Factory saat ini tersedia, pilih wilayah yang menarik minat Anda di halaman berikut, lalu perluas Analytics untuk menemukan Data Factory: Products yang tersedia menurut wilayah. Penyimpanan data (Azure Storage, Azure SQL Database, dll.) dan komputasi (HDInsight, dll.) yang digunakan oleh pabrik data dapat berada di wilayah lain.

    Jalankan perintah berikut, dan masukkan nama pengguna dan kata sandi yang Anda gunakan untuk masuk ke portal Azure:

    Connect-AzAccount
    

    Jalankan perintah berikut untuk menampilkan semua langganan untuk akun ini:

    Get-AzSubscription
    

    Jalankan perintah berikut untuk memilih langganan yang ingin Anda gunakan. Ganti SubscriptionId dengan ID langganan Azure Anda:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Membuat grup sumber daya: ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Buat pabrik data.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Jalankan perintah berikut untuk melihat output:

    $df
    
  5. Beralih ke folder tempat Anda membuat file JSON, dan jalankan perintah berikut untuk menyebarkan layanan tertaut Azure Storage:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Jalankan perintah berikut untuk menyebarkan layanan tertaut Spark sesuai permintaan:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Jalankan perintah berikut untuk menyebarkan alur:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Memulai dan memantau pelaksanaan pipeline

  1. Memulai eksekusi alur. Ini juga menangkap ID eksekusi alur untuk pemantauan di masa depan.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Jalankan skrip berikut ini untuk terus memeriksa status eksekusi alur hingga selesai.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Berikut adalah output dari eksekusi sampel:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Konfirmasikan bahwa folder bernama outputfiles dibuat di folder spark kontainer adftutorial dengan output dari program spark.

Alur dalam sampel ini menyalin data dari satu lokasi ke lokasi lain dalam penyimpanan blob Azure. Anda mempelajari cara untuk:

  • Membuat pabrik data.
  • Membuat dan menerapkan layanan tertaut.
  • Membuat dan mengimplementasikan pipa.
  • Memulai eksekusi alur.
  • Pantau eksekusi alur.

Lanjutkan ke tutorial berikutnya untuk mempelajari cara mengubah data dengan menjalankan skrip Apache Hive pada kluster Azure HDInsight yang ada di jaringan virtual.