Преобразование данных в облаке с помощью действия Spark в Azure Data Factory

ПРИМЕНИМО К: Azure Data Factory Azure Synapse Analytics

Совет

Data Factory в Microsoft Fabric — это следующее поколение Azure Data Factory с более простой архитектурой, встроенным ИИ и новыми функциями. Если вы не знакомы с интеграцией данных, начните с Fabric Data Factory. Существующие рабочие нагрузки ADF могут обновляться до Fabric для доступа к новым возможностям в области обработки и анализа данных, аналитики в режиме реального времени и отчетов.

В этом руководстве вы используете Azure PowerShell для создания конвейера Data Factory, который преобразует данные с помощью Spark Activity и службы HDInsight по запросу. В этом руководстве вы выполните следующие шаги:

  • Создали фабрику данных.
  • Проконфигурируйте и разверните связанные службы.
  • Создание и развертывание конвейера.
  • Запуск конвейера.
  • Контролируйте выполнение конвейера.

Если у вас нет подписки Azure, создайте учетную запись free перед началом работы.

Предварительные требования

Примечание.

Мы рекомендуем использовать модуль Az PowerShell Azure для взаимодействия с Azure. Сведения о начале работы см. в разделе Install Azure PowerShell. Сведения о миграции в модуль Az PowerShell см. в статье Migrate Azure PowerShell из AzureRM в Az.

  • учетная запись Azure Storage. Вы создаете скрипт Python и входной файл и отправляете их в хранилище Azure. Выходные данные программы Spark хранятся в этой учетной записи хранения. Кластер Spark по запросу использует ту же учетную запись хранения, что и его основное хранилище.
  • Azure PowerShell. Следуйте инструкциям в Как установить и настроить Azure PowerShell.

Отправка скрипта Python в учетную запись Blob Storage

  1. Создайте файл Python с именем WordCount_Spark.py со следующим содержимым:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Замените <storageAccountName> именем учетной записи Azure Storage. Затем сохраните файл.

  3. В Azure Blob Storage создайте контейнер с именем adftutorial если он не существует.

  4. Создайте папку с именем spark.

  5. Создайте вложенную папку с именем script в папке spark.

  6. Отправьте файл WordCount_Spark.py во вложенную папку script.

Отправка входного файла

  1. Создайте файл с определенным текстом и назовите его minecraftstory.txt. Программа Spark подсчитывает количество слов в этом тексте.
  2. Создайте вложенную папку с именем inputfiles в папке spark.
  3. Отправьте файл minecraftstory.txt во вложенную папку inputfiles.

Автор связанных сервисов

Вы создаете две связанные службы в этом разделе:

  • Связанная служба Azure Storage, которая связывает учетную запись Azure Storage с фабрикой данных. Это хранилище используется кластером HDInsight по запросу. В нем также содержится скрипт Spark для выполнения.
  • Связанная служба HDInsight по запросу. Azure Data Factory автоматически создает кластер HDInsight, запускает программу Spark, а затем удаляет кластер HDInsight после его простоя в течение предварительно настроенного времени.

связанная служба Azure Storage

Создайте JSON-файл с помощью предпочтительного редактора, скопируйте следующее определение JSON связанной службы Azure Storage, а затем сохраните файл как MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

Обновите <storageAccountName> и <storageAccountKey> с именем и ключом учетной записи Azure Storage.

Связанная служба HDInsight по запросу

Создайте JSON-файл с помощью предпочтительного редактора, скопируйте следующее определение JSON связанной службы Azure HDInsight и сохраните файл как MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Обновите значения следующих свойств в определении связанных служб:

  • hostSubscriptionId. Замените <subscriptionID> идентификатором подписки Azure. В этой подписке создается кластер HDInsight по требованию.
  • арендатор. Замените <tenantID> идентификатором клиента Azure.
  • servicePrincipalId, servicePrincipalKey. Замените <servicePrincipalID> и <servicePrincipalKey> идентификатором и ключом вашего служебного принципала в Microsoft Entra ID. Этому служебному принципалу должна быть назначена роль Участник подписки или группы ресурсов, в которой создается кластер. См. раздел создание приложения и сервисного участника Microsoft Entra для получения дополнительных сведений. Идентификатор субъекта-службы —это эквивалент идентификатора приложения, а ключ субъекта-службы — значения секрета клиента.
  • clusterResourceGroup. Замените <resourceGroupOfHDICluster> на имя группы ресурсов, в которой необходимо создать кластер HDInsight.

Примечание.

Azure HDInsight имеет ограничение на общее количество ядер, которые можно использовать в каждом регионе Azure, который он поддерживает. Для связанной службы HDInsight при запросе кластер HDInsight будет создан в том же расположении, что и Azure Storage, используемый в качестве основного хранилища. Убедитесь, что имеется достаточное количество квот ядра для успешного создания кластера. Дополнительные сведения см. в статье Установка кластеров в HDInsight с использованием Hadoop, Spark, Kafka и других технологий.

Разработать поток данных

На этом этапе создайте конвейер с активностью Spark. В действии используется пример подсчета слов. Загрузите содержимое из этого расположения, если вы еще не сделали этого.

Создайте файл JSON в предпочитаемом редакторе, скопируйте следующее определение JSON определения конвейера и сохраните его как MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Обратите внимание на следующие аспекты:

  • rootPath указывает на папку spark контейнера adftutorial.
  • entryFilePath указывает на файл WordCount_Spark.py во вложенной папке скрипта папки spark.

Создание фабрики данных

Вы создали связанную службу и определения конвейера в файлах JSON. Теперь нужно создать фабрику данных и развернуть связанную службу и файлы JSON конвейера с помощью командлетов PowerShell. Последовательно выполните следующие команды PowerShell:

  1. По очереди задайте переменные.

    Имя группы ресурсов

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Имя Фабрики данных. (оно должно быть глобально уникальным)

    $dataFactoryName = "MyDataFactory09102017"
    

    Имя конвейера

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Запустите PowerShell. Не закрывайте Azure PowerShell до завершения данного краткого руководства. Если закрыть и снова открыть это окно, то придется вновь выполнять эти команды. В списке регионов Azure, в которых в настоящее время доступен Data Factory, выберите интересующие вас регионы на следующей странице, а затем разверните раздел Analytics, чтобы найти Data Factory: Продукты, доступные по регионам. Хранилища данных (Azure Storage, Azure SQL Database и т. д.) и вычислительные ресурсы (HDInsight и т. д.), используемые фабрикой данных, могут находиться в других регионах.

    Выполните следующую команду и введите имя пользователя и пароль, которые вы используете для входа на портал Azure:

    Connect-AzAccount
    

    Чтобы просмотреть все подписки для этой учетной записи, выполните следующую команду:

    Get-AzSubscription
    

    Выполните следующую команду, чтобы выбрать подписку, с которой вы собираетесь работать. Замените SubscriptionId идентификатором подписки Azure:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Создайте группу ресурсов ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Создайте фабрику данных.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Чтобы просмотреть выходные данные, выполните следующую команду:

    $df
    
  5. Перейдите в папку, в которой вы создали JSON-файлы, и выполните следующую команду, чтобы развернуть связанную службу Azure Storage:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Выполните следующую команду, чтобы развернуть связанную службу Spark по запросу.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Выполните следующую команду, чтобы развернуть конвейер.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Запуск и мониторинг выполнения потока обработки

  1. Запуск конвейера. Эта команда также запишет идентификатор выполнения конвейера для будущего мониторинга.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Запустите следующий скрипт, чтобы проверять состояние выполнения, пока оно не завершится.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Вот результат примера выполнения:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Убедитесь, что папка с именем outputfiles создана в папке spark контейнера adftutorial с выходными данными программы Spark.

Pipeline в этом примере копирует данные из одного места в другое место в хранилище блобов Azure. Вы научились выполнять следующие задачи:

  • Создали фабрику данных.
  • Проконфигурируйте и разверните связанные службы.
  • Создание и развертывание конвейера.
  • Запуск конвейера.
  • Контролируйте выполнение конвейера.

Перейдите к следующему руководству, чтобы узнать, как преобразовать данные, выполнив скрипт Hive в кластере Azure HDInsight, который находится в виртуальной сети.