Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
ŞUNLARA UYGULANIR:
Azure Data Factory
Azure Synapse Analytics
İpucu
Microsoft Fabric'daki
Bu öğreticide spark etkinliğini ve isteğe bağlı HDInsight bağlı hizmetini kullanarak verileri dönüştüren bir Data Factory işlem hattı oluşturmak için Azure PowerShell kullanacaksınız. Bu öğreticide aşağıdaki adımları gerçekleştireceksiniz:
- Veri fabrikası oluşturma.
- Bağlantılı hizmetleri oluştur ve dağıt.
- Bir işlem hattı tasarlayın ve dağıtın.
- Bir işlem hattı çalıştırmasını başlat.
- İşlem hattı çalışmasını izleyin.
Azure aboneliğiniz yoksa başlamadan önce bir free hesabı oluşturun.
Önkoşullar
Not
Azure ile etkileşime geçmek için Azure Az PowerShell modülünü kullanmanızı öneririz. Başlamak için bkz. Azure PowerShell yükleme. Az PowerShell modülüne nasıl geçiş yapılacağını öğrenmek için bkz. AzureRM'den Az Azure PowerShell dağıtma.
- Azure Storage account. bir Python betiği ve giriş dosyası oluşturup bunları Azure depolama alanına yüklersiniz. Spark programının çıktısı bu depolama hesabında depolanır. İsteğe bağlı Spark kümesi, birincil depolama alanıyla aynı depolama hesabını kullanır.
- Azure PowerShell. Azure PowerShell yükleme ve yapılandırma başlığındaki yönergeleri izleyin.
Python betiğini Blob Storage hesabınıza yükleme
aşağıdaki içeriğe sahip WordCount_Spark.py adlı bir Python dosyası oluşturun:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()<storageAccountName> değerini Azure Storage hesabınızın adıyla değiştirin. Ardından dosyayı kaydedin.
Azure Blob Storage'nizde, adftutorial adlı bir kapsayıcı yoksa oluşturun.
Spark adlı bir klasör oluşturun.
Spark klasörünün altında script adlı bir alt klasör oluşturun.
WordCount_Spark.py dosyasını script alt klasörüne yükleyin.
Girdi dosyasını yükleme
- Bazı metinlerle minecraftstory.txt adlı bir dosya oluşturun. Spark programı bu metindeki sözcükleri sayar.
-
inputfilesklasöründesparkadlı bir alt klasör oluşturun. -
minecraftstory.txtdosyasınıinputfilesalt klasörüne yükleyin.
Yazarla bağlantılı hizmetler
Bu bölümde iki Bağlı Hizmet oluşturacaksınız:
- Bir Azure Storage hesabını veri fabrikasına bağlayan Azure Storage Bağlı Hizmet. Bu depolama alanı, isteğe bağlı HDInsight kümesi tarafından kullanılır. Ayrıca, yürütülecek Spark betiğini içerir.
- Talep Üzerine HDInsight Bağlantılı Hizmeti. Azure Data Factory otomatik olarak bir HDInsight kümesi oluşturur, Spark programını çalıştırır ve önceden yapılandırılmış bir süre boşta kaldığında HDInsight kümesini siler.
Azure Storage bağlı hizmeti
Tercih ettiğiniz düzenleyiciyi kullanarak bir JSON dosyası oluşturun, Azure Storage bağlı hizmetin aşağıdaki JSON tanımını kopyalayın ve dosyayı MyStorageLinkedService.json olarak kaydedin.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
<storageAccountName> ve <storageAccountKey> Azure Storage hesabınızın adı ve anahtarıyla güncelleştirin.
Talep üzerine HDInsight bağlantılı hizmet
Tercih ettiğiniz düzenleyiciyi kullanarak bir JSON dosyası oluşturun, Azure HDInsight bağlı hizmetin aşağıdaki JSON tanımını kopyalayın ve dosyayı MyOnDemandSparkLinkedService.json olarak kaydedin.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Bağlı hizmet tanımında aşağıdaki özelliklerin değerlerini güncelleştirin:
- hostSubscriptionId. <subscriptionID> değerini Azure aboneliğinizin kimliğiyle değiştirin. İsteğe bağlı HDInsight kümesi bu abonelikte oluşturulur.
- kiracı <tenantID> değerini Azure kiracınızın kimliğiyle değiştirin.
- servicePrincipalId, servicePrincipalKey. <servicePrincipalID> ve <servicePrincipalKey> Microsoft Entra ID hizmet sorumlunuzun kimliği ve anahtarıyla değiştirin. Bu hizmet sorumlusu, abonelikte ya da kümenin oluşturulduğu kaynak grubunda Katkıda Bulunan rolünün bir üyesi olmalıdır. Ayrıntılar için bkz. Microsoft Entra uygulama ve hizmet sorumlusu oluşturma. Hizmet sorumlusu kimliğiUygulama Kimliği ile eşdeğerdir ve bir Hizmet sorumlusu anahtarı, bir İstemci gizli anahtarı değerine eşdeğerdir.
- clusterResourceGroup. <resourceGroupOfHDICluster> değerini HDInsight kümesinin oluşturulması gereken kaynak grubunun adıyla değiştirin.
Not
Azure HDInsight, desteklediği her Azure bölgede kullanabileceğiniz toplam çekirdek sayısıyla ilgili sınırlamaya sahiptir. İsteğe Bağlı HDInsight Bağlantılı Hizmeti için, HDInsight kümesi, birincil depolama alanı olarak kullanılan Azure Depolama ile aynı konumda oluşturulacaktır. Kümenin başarıyla oluşturulabilmesi için yeterince çekirdek kotanızın olduğundan emin olun. Daha fazla bilgi için bkz. HDInsight’ta Hadoop, Spark, Kafka ve daha fazlası ile küme ayarlama.
İşlem hattı oluşturma
Bu adımda, bir Spark aktivitesi ile yeni bir işlem hattı oluşturacaksınız. Etkinlik, sözcük sayısı örneğini kullanır. Henüz yapmadıysanız bu konumdan içeriği indirin.
Tercih ettiğiniz düzenleyicide bir JSON dosyası oluşturun, aşağıdaki işlem hattı JSON tanımını kopyalayın ve MySparkOnDemandPipeline.json olarak kaydedin.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Aaşağıdaki noktaları unutmayın:
- rootPath, adftutorial kapsayıcısının spark klasörünü işaret eder.
- entryFilePath, spark klasörünün script alt klasöründeki WordCount_Spark.py dosyasını işaret eder.
Veri fabrikası oluşturma
JSON dosyalarında bağlı hizmet ve işlem hattı tanımları oluşturdunuz. Şimdi bir veri fabrikası oluşturalım ve PowerShell cmdlet'lerini kullanarak bağlı Hizmet ve işlem hattı JSON dosyalarını dağıtalım. Aşağıdaki PowerShell komutlarını tek tek çalıştırın:
Değişkenleri tek tek ayarlayın.
Kaynak Grup Adı
$resourceGroupName = "ADFTutorialResourceGroup"Data Factory Adı. Genel olarak benzersiz olması gerekir
$dataFactoryName = "MyDataFactory09102017"İşlem hattı adı
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipelinePowerShell’i başlatın. Bu hızlı başlangıcın sonuna kadar Azure PowerShell açık tutun. Kapatıp yeniden açarsanız komutları yeniden çalıştırmanız gerekir. Data Factory'nin şu anda kullanılabilir olduğu Azure bölgelerin listesi için aşağıdaki sayfada ilginizi çekebilecek bölgeleri seçin ve ardından Analytics'yi genişleterek Data Factory: Products by region öğesini bulun. Veri fabrikası tarafından kullanılan veri depoları (Azure Storage, Azure SQL Database vb.) ve işlem (HDInsight vb.) diğer bölgelerde olabilir.
Aşağıdaki komutu çalıştırın ve Azure portalında oturum açmak için kullandığınız kullanıcı adını ve parolayı girin:
Connect-AzAccountBu hesapla ilgili tüm abonelikleri görmek için aşağıdaki komutu çalıştırın:
Get-AzSubscriptionÇalışmak istediğiniz aboneliği seçmek için aşağıdaki komutu çalıştırın. SubscriptionId değerini Azure aboneliğinizin kimliğiyle değiştirin:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"ADFTutorialResourceGroup kaynak grubunu oluşturun.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"Veri fabrikasını oluşturun.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupNameÇıktıyı görmek için aşağıdaki komutu yürütün:
$dfJSON dosyalarını oluşturduğunuz klasöre geçin ve Azure Storage bağlı hizmeti dağıtmak için aşağıdaki komutu çalıştırın:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"Talep üzerine Spark bağlantılı hizmetini dağıtmak için aşağıdaki komutu çalıştırın.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"Bir işlem hattını dağıtmak için aşağıdaki komutu çalıştırın:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Bir pipeline çalıştırmasını başlat ve izle
Bir işlem hattı çalıştırmasını başlat. Ayrıca, gelecekte izlemek üzere işlem hattı çalıştırma kimliğini yakalar.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineNameİşlem hattı durumunu tamamlanıncaya kadar durmadan kontrol etmek için aşağıdaki betiği çalıştırın.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"Örnek çalıştırmanın çıktısı aşağıdaki gibidir:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"Spark programının çıktısı ile adftutorial kapsayıcısının
outputfilesklasöründesparkadlı bir klasörün oluşturulduğunu onaylayın.
İlgili içerik
Bu örnekteki işlem hattı verileri bir konumdan Azure blob depolama alanındaki başka bir konuma kopyalar. Şunları öğrendiniz:
- Veri fabrikası oluşturma.
- Bağlantılı hizmetleri oluştur ve dağıt.
- Bir işlem hattı tasarlayın ve dağıtın.
- Bir işlem hattı çalıştırmasını başlat.
- İşlem hattı çalışmasını izleyin.
Sanal ağdaki bir Azure HDInsight kümesinde Hive betiğini çalıştırarak verileri dönüştürmeyi öğrenmek için sonraki öğreticiye ilerleyin.
Tutorial: Azure Virtual Network içinde Hive kullanarak verileri dönüştürün.