Skript PowerShellu – Transformace dat v cloudu pomocí Azure Data Factory
Tento ukázkový skript PowerShellu vytvoří kanál, který transformuje data v cloudu spuštěním programu Spark v clusteru Azure HDInsight Spark.
Poznámka:
Při práci s Azure doporučujeme používat modul Azure Az PowerShellu. Pokud chcete začít, přečtěte si téma Instalace Azure PowerShellu. Informace o tom, jak migrovat na modul Az PowerShell, najdete v tématu Migrace Azure PowerShellu z AzureRM na Az.
Tato ukázka vyžaduje Azure PowerShell. Verzi zjistíte spuštěním příkazu Get-Module -ListAvailable Az
.
Pokud potřebujete instalaci nebo upgrade, přečtěte si téma Instalace modulu Azure PowerShell.
Spuštěním rutiny Připojení-AzAccount se připojte k Azure.
Požadavky
- Účet služby Azure Storage. Vytvořte skript Pythonu a vstupní soubor a nahrajte je do úložiště Azure. V tomto účtu úložiště se ukládá výstup z programu Sparku. Cluster Spark na vyžádání používá stejný účet úložiště jako primární úložiště.
Nahrání skriptu Pythonu do účtu služby Blob Storage
Vytvořte soubor Pythonu s názvem WordCount_Spark.py s následujícím obsahem:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
Nahraďte <storageAccountName> názvem vašeho účtu služby Azure Storage. Pak soubor uložte.
Ve službě Azure Blob Storage, vytvořte kontejner nazvaný adftutorial, pokud ještě neexistuje.
Vytvořte složku s názvem spark.
Ve složce spark vytvořte podsložku script.
Do podsložky script uložte soubor WordCount_Spark.py.
Nahrání vstupního souboru
- Vytvořte soubor minecraftstory.txt a nějakým textem. Program Sparku spočítá slova v tomto textu.
- Vytvořte podsložku s názvem
inputfiles
vespark
složce kontejneru objektů blob. - Do podsložky
inputfiles
uložte souborminecraftstory.txt
.
Ukázkový skript
Důležité
Tento skript vytvoří soubory JSON, které definují entity služby Data Factory (propojená služba, datová sada a kanál) na pevném disku ve složce c:\.
powershell Set-ExecutionPolicy Unrestricted -Scope CurrentUser
# Set variables with your own values
$resourceGroupName = "<Azure resource group name>"
$dataFactoryName = "<Data factory name. Must be globally unique.>"
$dataFactoryRegion = "East US"
$storageAccountName = "<Az.Storage account name> "
$storageAccountKey = "<Az.Storage account key>"
$subscriptionID = "<Azure subscription ID>"
$tenantID = "<tenant ID>"
$servicePrincipalID = "<Active directory service principal ID>"
$servicePrincipalKey = "<Active directory service principal key>"
$pipelineName = "SparkTransformPipeline"
# Create a resource group
New-AzResourceGroup -Name $resourceGroupName -Location $dataFactoryRegion
# Create a data factory
$df = Set-AzDataFactory -ResourceGroupName $resourceGroupName -Location $dataFactoryRegion -Name $dataFactoryName
# Create an Az.Storage linked service in the data factory
## JSON definition of the linked service.
$storageLinkedServiceDefinition = @"
{
"name": "AzureStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": {
"value": "DefaultEndpointsProtocol=https;AccountName=$storageAccountName;AccountKey=$storageAccountKey",
"type": "SecureString"
}
}
}
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryLinkedService command.
$storageLinkedServiceDefinition | Out-File c:\AzureStorageLinkedService.json
## Creates an Az.Storage linked service
Set-AzDataFactoryLinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File c:\AzureStorageLinkedService.json
# Create on-demand Spark linked service in the data factory
## JSON definition of the linked service.
$sparkLinkedServiceDefinition = @"
{
"name": "OnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "$subscriptionID",
"servicePrincipalId": "$servicePrincipalID",
"servicePrincipalKey": {
"value": "$servicePrincipalKey",
"type": "SecureString"
},
"tenant": "$tenantID",
"clusterResourceGroup": "$resourceGroupName",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "AzureStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryLinkedService command.
$sparkLinkedServiceDefinition | Out-File c:\OnDemandSparkLinkedService.json
# Creates an on-demand Spark linked service
Set-AzDataFactoryLinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "OnDemandSparkLinkedService" -File "C:\OnDemandSparkLinkedService.json"
# Create a pipeline in the data factory
## JSON definition of the pipeline
$pipelineDefinition = @"
{
"name": "SparkTransformPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "OnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "AzureStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryPipeline command.
$pipelineDefinition | Out-File c:\SparkTransformPipeline.json
## Create a pipeline with Spark Activity in the data factory
Set-AzDataFactoryPipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SparkTransformPipeline" -File "c:\SparkTransformPipeline.json"
# Create a pipeline run
## JSON definition for dummy pipeline parameters
$pipelineParameters = @"
{
"dummy": "b"
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Invoke-AzDataFactoryPipeline command.
$pipelineParameters | Out-File c:\PipelineParameters.json
# Create a pipeline run by using parameters
$runId = Invoke-AzDataFactoryPipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName -ParameterFile c:\PipelineParameters.json
# Check the pipeline run status until it finishes
Start-Sleep -Seconds 30
while ($True) {
$result = Get-AzDataFactoryActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
if (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
Start-Sleep -Seconds 300
}
else {
Write-Host "Pipeline $pipelineName run finished. Result:" -foregroundcolor "Yellow"
$result
break
}
}
# Get the activity run details
$result = Get-AzDataFactoryActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName `
-PipelineRunId $runId `
-RunStartedAfter (Get-Date).AddMinutes(-30) `
-RunStartedBefore (Get-Date).AddMinutes(30) `
-ErrorAction Stop
$result
if ($result.Status -eq "Succeeded") {`
$result.Output -join "`r`n"`
}`
else {`
$result.Error -join "`r`n"`
}
# To remove the data factory from the resource gorup
# Remove-AzDataFactory -Name $dataFactoryName -ResourceGroupName $resourceGroupName
#
# To remove the whole resource group
# Remove-AzResourceGroup -Name $resourceGroupName
Vyčištění nasazení
Po spuštění ukázkového skriptu můžete pomocí následujícího příkazu odebrat skupinu prostředků a všechny přidružené prostředky:
Remove-AzResourceGroup -ResourceGroupName $resourceGroupName
Pokud chcete odebrat datovou továrnu ze skupiny prostředků, spusťte následující příkaz:
Remove-AzDataFactoryV2 -Name $dataFactoryName -ResourceGroupName $resourceGroupName
Vysvětlení skriptu
Tento skript používá následující příkazy:
Příkaz | Notes |
---|---|
New-AzResourceGroup | Vytvoří skupinu prostředků, ve které se ukládají všechny prostředky. |
Set-AzDataFactoryV2 | Vytvoření datové továrny |
Set-AzDataFactoryV2LinkedService | Vytvoří propojenou službu v datové továrně. Propojená služba propojí úložiště dat nebo výpočetní prostředky s datovou továrnou. |
Set-AzDataFactoryV2Pipeline | Vytvoří kanál v datové továrně. Kanál obsahuje jednu nebo více aktivit, které provádějí určitou operaci. V tomto kanálu aktivita Spark transformuje data spuštěním programu v clusteru Azure HDInsight Spark. |
Invoke-AzDataFactoryV2Pipeline | Vytvoří spuštění kanálu. Jinými slovy, spustí kanál. |
Get-AzDataFactoryV2ActivityRun | Získá podrobnosti o spuštění aktivity (spuštění aktivity) v kanálu. |
Remove-AzResourceGroup | Odstraní skupinu prostředků včetně všech vnořených prostředků. |
Související obsah
Další informace o Azure PowerShellu najdete v dokumentaci k Azure PowerShellu.
Další ukázky skriptů PowerShellu pro Azure Data Factory najdete v ukázkách Azure Data Factory PowerShellu.
Váš názor
https://aka.ms/ContentUserFeedback.
Připravujeme: V průběhu roku 2024 budeme postupně vyřazovat problémy z GitHub coby mechanismus zpětné vazby pro obsah a nahrazovat ho novým systémem zpětné vazby. Další informace naleznete v tématu:Odeslat a zobrazit názory pro