Azure Data Factory'de Spark etkinliğini kullanarak verileri bulutta dönüştürme

ŞUNLARA UYGULANIR: Azure Data Factory Azure Synapse Analytics

İpucu

Microsoft Fabric'daki Data Factory, daha basit bir mimariye, yerleşik yapay zekaya ve yeni özelliklere sahip yeni nesil Azure Data Factory. Veri tümleştirmeyi yeni kullanmaya başladıysanız Fabric Data Factory ile başlayın. Mevcut ADF iş yükleri veri bilimi, gerçek zamanlı analiz ve raporlama genelinde yeni özelliklere erişmek için Fabric yükseltebilir.

Bu öğreticide spark etkinliğini ve isteğe bağlı HDInsight bağlı hizmetini kullanarak verileri dönüştüren bir Data Factory işlem hattı oluşturmak için Azure PowerShell kullanacaksınız. Bu öğreticide aşağıdaki adımları gerçekleştireceksiniz:

  • Veri fabrikası oluşturma.
  • Bağlantılı hizmetleri oluştur ve dağıt.
  • Bir işlem hattı tasarlayın ve dağıtın.
  • Bir işlem hattı çalıştırmasını başlat.
  • İşlem hattı çalışmasını izleyin.

Azure aboneliğiniz yoksa başlamadan önce bir free hesabı oluşturun.

Önkoşullar

Not

Azure ile etkileşime geçmek için Azure Az PowerShell modülünü kullanmanızı öneririz. Başlamak için bkz. Azure PowerShell yükleme. Az PowerShell modülüne nasıl geçiş yapılacağını öğrenmek için bkz. AzureRM'den Az Azure PowerShell dağıtma.

  • Azure Storage account. bir Python betiği ve giriş dosyası oluşturup bunları Azure depolama alanına yüklersiniz. Spark programının çıktısı bu depolama hesabında depolanır. İsteğe bağlı Spark kümesi, birincil depolama alanıyla aynı depolama hesabını kullanır.
  • Azure PowerShell. Azure PowerShell yükleme ve yapılandırma başlığındaki yönergeleri izleyin.

Python betiğini Blob Storage hesabınıza yükleme

  1. aşağıdaki içeriğe sahip WordCount_Spark.py adlı bir Python dosyası oluşturun:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. <storageAccountName> değerini Azure Storage hesabınızın adıyla değiştirin. Ardından dosyayı kaydedin.

  3. Azure Blob Storage'nizde, adftutorial adlı bir kapsayıcı yoksa oluşturun.

  4. Spark adlı bir klasör oluşturun.

  5. Spark klasörünün altında script adlı bir alt klasör oluşturun.

  6. WordCount_Spark.py dosyasını script alt klasörüne yükleyin.

Girdi dosyasını yükleme

  1. Bazı metinlerle minecraftstory.txt adlı bir dosya oluşturun. Spark programı bu metindeki sözcükleri sayar.
  2. inputfiles klasöründe spark adlı bir alt klasör oluşturun.
  3. minecraftstory.txt dosyasını inputfiles alt klasörüne yükleyin.

Yazarla bağlantılı hizmetler

Bu bölümde iki Bağlı Hizmet oluşturacaksınız:

  • Bir Azure Storage hesabını veri fabrikasına bağlayan Azure Storage Bağlı Hizmet. Bu depolama alanı, isteğe bağlı HDInsight kümesi tarafından kullanılır. Ayrıca, yürütülecek Spark betiğini içerir.
  • Talep Üzerine HDInsight Bağlantılı Hizmeti. Azure Data Factory otomatik olarak bir HDInsight kümesi oluşturur, Spark programını çalıştırır ve önceden yapılandırılmış bir süre boşta kaldığında HDInsight kümesini siler.

Azure Storage bağlı hizmeti

Tercih ettiğiniz düzenleyiciyi kullanarak bir JSON dosyası oluşturun, Azure Storage bağlı hizmetin aşağıdaki JSON tanımını kopyalayın ve dosyayı MyStorageLinkedService.json olarak kaydedin.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

<storageAccountName> ve <storageAccountKey> Azure Storage hesabınızın adı ve anahtarıyla güncelleştirin.

Talep üzerine HDInsight bağlantılı hizmet

Tercih ettiğiniz düzenleyiciyi kullanarak bir JSON dosyası oluşturun, Azure HDInsight bağlı hizmetin aşağıdaki JSON tanımını kopyalayın ve dosyayı MyOnDemandSparkLinkedService.json olarak kaydedin.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Bağlı hizmet tanımında aşağıdaki özelliklerin değerlerini güncelleştirin:

  • hostSubscriptionId. <subscriptionID> değerini Azure aboneliğinizin kimliğiyle değiştirin. İsteğe bağlı HDInsight kümesi bu abonelikte oluşturulur.
  • kiracı <tenantID> değerini Azure kiracınızın kimliğiyle değiştirin.
  • servicePrincipalId, servicePrincipalKey. <servicePrincipalID> ve <servicePrincipalKey> Microsoft Entra ID hizmet sorumlunuzun kimliği ve anahtarıyla değiştirin. Bu hizmet sorumlusu, abonelikte ya da kümenin oluşturulduğu kaynak grubunda Katkıda Bulunan rolünün bir üyesi olmalıdır. Ayrıntılar için bkz. Microsoft Entra uygulama ve hizmet sorumlusu oluşturma. Hizmet sorumlusu kimliğiUygulama Kimliği ile eşdeğerdir ve bir Hizmet sorumlusu anahtarı, bir İstemci gizli anahtarı değerine eşdeğerdir.
  • clusterResourceGroup. <resourceGroupOfHDICluster> değerini HDInsight kümesinin oluşturulması gereken kaynak grubunun adıyla değiştirin.

Not

Azure HDInsight, desteklediği her Azure bölgede kullanabileceğiniz toplam çekirdek sayısıyla ilgili sınırlamaya sahiptir. İsteğe Bağlı HDInsight Bağlantılı Hizmeti için, HDInsight kümesi, birincil depolama alanı olarak kullanılan Azure Depolama ile aynı konumda oluşturulacaktır. Kümenin başarıyla oluşturulabilmesi için yeterince çekirdek kotanızın olduğundan emin olun. Daha fazla bilgi için bkz. HDInsight’ta Hadoop, Spark, Kafka ve daha fazlası ile küme ayarlama.

İşlem hattı oluşturma

Bu adımda, bir Spark aktivitesi ile yeni bir işlem hattı oluşturacaksınız. Etkinlik, sözcük sayısı örneğini kullanır. Henüz yapmadıysanız bu konumdan içeriği indirin.

Tercih ettiğiniz düzenleyicide bir JSON dosyası oluşturun, aşağıdaki işlem hattı JSON tanımını kopyalayın ve MySparkOnDemandPipeline.json olarak kaydedin.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Aaşağıdaki noktaları unutmayın:

  • rootPath, adftutorial kapsayıcısının spark klasörünü işaret eder.
  • entryFilePath, spark klasörünün script alt klasöründeki WordCount_Spark.py dosyasını işaret eder.

Veri fabrikası oluşturma

JSON dosyalarında bağlı hizmet ve işlem hattı tanımları oluşturdunuz. Şimdi bir veri fabrikası oluşturalım ve PowerShell cmdlet'lerini kullanarak bağlı Hizmet ve işlem hattı JSON dosyalarını dağıtalım. Aşağıdaki PowerShell komutlarını tek tek çalıştırın:

  1. Değişkenleri tek tek ayarlayın.

    Kaynak Grup Adı

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Data Factory Adı. Genel olarak benzersiz olması gerekir

    $dataFactoryName = "MyDataFactory09102017"
    

    İşlem hattı adı

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. PowerShell’i başlatın. Bu hızlı başlangıcın sonuna kadar Azure PowerShell açık tutun. Kapatıp yeniden açarsanız komutları yeniden çalıştırmanız gerekir. Data Factory'nin şu anda kullanılabilir olduğu Azure bölgelerin listesi için aşağıdaki sayfada ilginizi çekebilecek bölgeleri seçin ve ardından Analytics'yi genişleterek Data Factory: Products by region öğesini bulun. Veri fabrikası tarafından kullanılan veri depoları (Azure Storage, Azure SQL Database vb.) ve işlem (HDInsight vb.) diğer bölgelerde olabilir.

    Aşağıdaki komutu çalıştırın ve Azure portalında oturum açmak için kullandığınız kullanıcı adını ve parolayı girin:

    Connect-AzAccount
    

    Bu hesapla ilgili tüm abonelikleri görmek için aşağıdaki komutu çalıştırın:

    Get-AzSubscription
    

    Çalışmak istediğiniz aboneliği seçmek için aşağıdaki komutu çalıştırın. SubscriptionId değerini Azure aboneliğinizin kimliğiyle değiştirin:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. ADFTutorialResourceGroup kaynak grubunu oluşturun.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Veri fabrikasını oluşturun.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Çıktıyı görmek için aşağıdaki komutu yürütün:

    $df
    
  5. JSON dosyalarını oluşturduğunuz klasöre geçin ve Azure Storage bağlı hizmeti dağıtmak için aşağıdaki komutu çalıştırın:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Talep üzerine Spark bağlantılı hizmetini dağıtmak için aşağıdaki komutu çalıştırın.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Bir işlem hattını dağıtmak için aşağıdaki komutu çalıştırın:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Bir pipeline çalıştırmasını başlat ve izle

  1. Bir işlem hattı çalıştırmasını başlat. Ayrıca, gelecekte izlemek üzere işlem hattı çalıştırma kimliğini yakalar.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. İşlem hattı durumunu tamamlanıncaya kadar durmadan kontrol etmek için aşağıdaki betiği çalıştırın.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Örnek çalıştırmanın çıktısı aşağıdaki gibidir:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Spark programının çıktısı ile adftutorial kapsayıcısının outputfiles klasöründe spark adlı bir klasörün oluşturulduğunu onaylayın.

Bu örnekteki işlem hattı verileri bir konumdan Azure blob depolama alanındaki başka bir konuma kopyalar. Şunları öğrendiniz:

  • Veri fabrikası oluşturma.
  • Bağlantılı hizmetleri oluştur ve dağıt.
  • Bir işlem hattı tasarlayın ve dağıtın.
  • Bir işlem hattı çalıştırmasını başlat.
  • İşlem hattı çalışmasını izleyin.

Sanal ağdaki bir Azure HDInsight kümesinde Hive betiğini çalıştırarak verileri dönüştürmeyi öğrenmek için sonraki öğreticiye ilerleyin.

Tutorial: Azure Virtual Network içinde Hive kullanarak verileri dönüştürün.