資料擷取管線的 DevOps

多數情況下,資料擷取解決方案是由指令碼、服務叫用以及協調所有活動的管線所組成。 在本文中,您將瞭解如何將 DevOps 實務運用至常見資料內嵌管線的開發生命週期,以準備機器學習模型訓練所需的資料。 管線採用下列 Azure 服務建立:

  • Azure Data Factory:讀取原始資料並協調資料準備。
  • Azure Databricks:執行可轉換資料的 Python 筆記本。
  • Azure Pipelines:自動化持續整合和開發流程。

資料擷取管線工作流程

資料擷取管線會執行下列工作流程:

  1. 原始資料會讀入 Azure Data Factory (ADF) 管線中。
  2. ADF 管線會將資料傳送到 Azure Databricks 叢集,而該叢集會執行 Python 筆記本來轉換資料。
  3. 資料會儲存至 blob 容器,Azure Machine Learning 會在此使用資料訓練模型。

data ingestion pipeline workflow

持續整合和持續傳遞概觀

就像許多軟體解決方案一樣,有一個小組 (例如,資料工程師) 負責持續改善此解決方案。 這個小組會協作並共用相同的 Azure 資源,例如 Azure Data Factory、Azure Databricks 和 Azure 儲存體帳戶。 這些資源的集合便是開發環境。 資料工程師會針對相同的原始程式碼基礎進行改善。

持續整合和傳遞系統會將建立、測試及傳遞 (部署) 解決方案的流程自動化。 持續整合 (CI) 流程會執行下列工作:

  • 組合程式碼
  • 使用程式碼品質測試進行檢查
  • 執行單元測試
  • 產生成品,例如經過測試的程式碼和 Azure Resource Manager 範本

持續傳遞 (CD) 流程會將成品部署至下游環境。

cicd data ingestion diagram

本文示範如何使用 Azure Pipelines 將 CI 和 CD 流程自動化。

原始檔控制管理

原始檔控制管理是追蹤變更以及支援小組成員共同作業的必要條件。 例如,程式碼會儲存在 Azure DevOps、GitHub 或 GitLab 存放庫中。 共同作業工作流程會以分支模型為基礎。

Python 筆記本原始程式碼

資料工程師可以在本機的整合式開發環境 (例如 Visual Studio Code) 或是直接在 Databricks 工作區中使用 Python 筆記本原始程式碼。 程式碼變更完成後,將會合併至遵循分支原則的存放庫。

提示

我們建議將程式碼儲存在 .py 檔案中,而不採用 .ipynb Jupyter Notebook 格式。 這樣做可以讓程式碼更易於閱讀,並能支援 CI 流程中的自動程式碼品質檢查。

Azure Data Factory 原始程式碼

Azure Data Factory 管線的原始程式碼是由 Azure Data Factory 工作區所產生的 JSON 檔案集合。 資料工程師通常會在 Azure Data Factory 工作區中使用視覺化設計工具,而不是直接使用原始程式碼檔案。

若要將工作區設定為使用原始檔控制存放庫,請參閱使用 Azure Repos Git 整合來進行撰寫

持續整合 (CI)。

持續整合流程的最終目標是要從原始程式碼中收集聯合小組的工作,並準備好部署到下游環境。 正如原始程式碼管理,此程式與 Python 筆記本和 Azure Data Factory 管線不同。

Python Notebook CI

Python 筆記本的 CI 流程會從協作分支中取得程式碼 (例如 masterdevelop) 並執行下列活動:

  • 程式碼 Lint 分析
  • 單元測試
  • 將程式碼儲存為成品

下列程式碼片段展示這些步驟在 Azure DevOps yaml 管線中的實作:

steps:
- script: |
   flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml  
  workingDirectory: '$(Build.SourcesDirectory)'
  displayName: 'Run flake8 (code style analysis)'  
  
- script: |
   python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
  displayName: 'Run unit tests'

- task: PublishTestResults@2
  condition: succeededOrFailed()
  inputs:
    testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
    testRunTitle: 'Linting & Unit tests'
    failTaskOnFailedTests: true
  displayName: 'Publish linting and unit test results'

- publish: $(Build.SourcesDirectory)
    artifact: di-notebooks

管線會使用 flake8 進行 Python 程式碼 Lint 分析。 系統會執行在原始程式碼中定義的單元測試,並發佈 Lint 分析和測試結果,使其可在 Azure Pipelines 執行畫面中使用。

如果 Lint 分析和單元測試成功,管線就會將原始程式碼複製到成品存放庫,以供後續的部署步驟使用。

Azure Data Factory CI

Azure Data Factory 管線的 CI 流程是資料擷取管線的瓶頸。 該管線沒有持續整合。 Azure Data Factory 可部署的成品是 Azure Resource Manager 範本的集合。 產生這些範本的唯一方式是在 Azure Data Factory 工作區中按一下 [發佈] 按鈕。

  1. 資料工程師會將功能分支的原始代碼合併至協作分支,例如 masterdevelop
  2. 具有授與許可權的人員可按一下 [發佈] 按鈕,從協作分支的原始程式碼產生 Azure Resource Manager 範本。
  3. 工作區會驗證管線 (作為 Lint 分析和單元測試)、產生 Azure Resource Manager 範本 (作為建立),然後將產生的範本儲存至相同程式碼存放庫中的技術分支 adf_publish (作為發佈成品)。 此分支是由 Azure Data Factory 工作區自動建立。

如需此程式的詳細資訊,請參閱 Azure Data Factory 中的持續整合和傳遞

請務必確定產生的 Azure Resource Manager 範本適用於各種環境。 這表示所有會因環境而改變的值都必須參數化。 Azure Data Factory 的智慧功能能夠將大部分的這類值公開為參數。 例如,在下列範本中,Azure Machine Learning 工作區的連接屬性會公開為參數:

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "devops-ds-adf"
        },
        "AzureMLService_servicePrincipalKey": {
            "value": ""
        },
        "AzureMLService_properties_typeProperties_subscriptionId": {
            "value": "0fe1c235-5cfa-4152-17d7-5dff45a8d4ba"
        },
        "AzureMLService_properties_typeProperties_resourceGroupName": {
            "value": "devops-ds-rg"
        },
        "AzureMLService_properties_typeProperties_servicePrincipalId": {
            "value": "6e35e589-3b22-4edb-89d0-2ab7fc08d488"
        },
        "AzureMLService_properties_typeProperties_tenant": {
            "value": "72f988bf-86f1-41af-912b-2d7cd611db47"
        }
    }
}

不過,您可能會想要公開預設不是由 Azure Data Factory 工作區處理的自訂屬性。 在本文的案例中,Azure Data Factory 管線會叫用處理資料的 Python 筆記本。 筆記本接受具有輸入資料檔案名稱的參數。

import pandas as pd
import numpy as np

data_file_name = getArgument("data_file_name")
data = pd.read_csv(data_file_name)

labels = np.array(data['target'])
...

此名稱在 DevQAUATPROD 環境中會有所不同。 在具有多個活動的複雜管線中可以存在數個自訂屬性。 建議作法是將這些值統一收集在一個位置,並將其定義為管線變數

Screenshot shows a Notebook called PrepareData and M L Execute Pipeline called M L Execute Pipeline at the top with the Variables tab selected below with the option to add new variables, each with a name, type, and default value.

管線活動可能會在實際使用時參考管線變數:

Screenshot shows a Notebook called PrepareData and M L Execute Pipeline called M L Execute Pipeline at the top with the Settings tab selected below.

依預設,Azure Data Factory 工作區不會將管線變數公開為 Azure Resource Manager 範本參數。 工作區會使用預設的參數化範本決定哪些管線屬性應公開為 Azure Resource Manager 範本參數。 若要將管線變數加入至清單,請使用下列程式碼片段更新預設參數化範本"Microsoft.DataFactory/factories/pipelines" 區段,並將結果 json 檔案放在來源資料夾的根目錄中:

"Microsoft.DataFactory/factories/pipelines": {
        "properties": {
            "variables": {
                "*": {
                    "defaultValue": "="
                }
            }
        }
    }

如此一來,在按一下 [發佈] 按鈕時便會強制 Azure Data Factory 工作區將變數新增至參數清單:

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "devops-ds-adf"
        },
        ...
        "data-ingestion-pipeline_properties_variables_data_file_name_defaultValue": {
            "value": "driver_prediction_train.csv"
        }        
    }
}

JSON 檔案中的值是在管線定義中設定的預設值。 部署 Azure Resource Manager 範本時,這些值應該會以目標環境值覆寫。

持續傳遞 (CD)。

持續傳遞流程會取得成品並部署至第一個目標環境。 此流程會藉由執行測試來確定解決方案是否正常運作。 如果成功,則會繼續進行下一個環境。

CD Azure Pipelines 是由代表環境的多個階段所組成。 每個階段都包含執行下列步驟的部署作業

  • 將 Python 筆記本部署到 Azure Databricks 工作區
  • 部署 Azure Data Factory 管線
  • 執行管線
  • 檢查資料擷取結果

管線階段可以透過核准閘道進行設定,藉此進一步控制部署流程在環境鏈中演變的方式。

部署 Python 筆記本

下列程式碼片段定義將 Python 筆記本複製到 Databricks 叢集的 Azure Pipeline 部署

- stage: 'Deploy_to_QA'
  displayName: 'Deploy to QA'
  variables:
  - group: devops-ds-qa-vg
  jobs:
  - deployment: "Deploy_to_Databricks"
    displayName: 'Deploy to Databricks'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: UsePythonVersion@0
              inputs:
                versionSpec: '3.x'
                addToPath: true
                architecture: 'x64'
              displayName: 'Use Python3'

            - task: configuredatabricks@0
              inputs:
                url: '$(DATABRICKS_URL)'
                token: '$(DATABRICKS_TOKEN)'
              displayName: 'Configure Databricks CLI'    

            - task: deploynotebooks@0
              inputs:
                notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
                workspaceFolder: '/Shared/devops-ds'
              displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'       

持續整合所產生的成品會自動複製到部署代理程式,並可在 $(Pipeline.Workspace) 資料夾中取得。 在此情況下,部署工作是指包含 Python 筆記本的 di-notebooks 成品。 此部署會使用 Databricks Azure DevOps 延伸模組,將筆記本檔案複製到 Databricks 工作區。

Deploy_to_QA 階段包含 Azure DevOps 專案中所定義 devops-ds-qa-vg 變數群組的參考。 此階段中的步驟會參考此變數群組中的變數 (例如 $(DATABRICKS_URL) 以及 $(DATABRICKS_TOKEN))。 此處的概念是下一個階段 (例如 Deploy_to_UAT) 將會使用本身 UAT 範圍變數群組中定義的相同變數名稱來運作。

部署 Azure Data Factory 管線

Azure Data Factory 可部署的成品是 Azure Resource Manager 範本。 將會使用 Azure 資源群組部署工作進行部署,如下列程式碼片段所示:

  - deployment: "Deploy_to_ADF"
    displayName: 'Deploy to ADF'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: AzureResourceGroupDeployment@2
              displayName: 'Deploy ADF resources'
              inputs:
                azureSubscription: $(AZURE_RM_CONNECTION)
                resourceGroupName: $(RESOURCE_GROUP)
                location: $(LOCATION)
                csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
                csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
                overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"

資料檔案名參數的值來自 QA 階段變數群組中定義的 $(DATA_FILE_NAME) 變數。 同樣地,所有在 ARMTemplateForFactory.json 中定義的參數都可以覆寫。 如果不是,則會使用預設值。

執行管線並檢查資料擷取結果

下一步是確定部署的解決方案可正常運作。 下列工作定義會使用 PowerShell 腳本來執行 Azure Data Factory 管線,並在 Azure Databricks 叢集上執行 Python 筆記本。 筆記本會檢查資料是否已正確內嵌,並以 $(bin_FILE_NAME) 名稱驗證結果資料檔案。

  - job: "Integration_test_job"
    displayName: "Integration test job"
    dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: AzurePowerShell@4
      displayName: 'Execute ADF Pipeline'
      inputs:
        azureSubscription: $(AZURE_RM_CONNECTION)
        ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
        ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
        azurePowerShellVersion: LatestVersion
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'

    - task: configuredatabricks@0
      inputs:
        url: '$(DATABRICKS_URL)'
        token: '$(DATABRICKS_TOKEN)'
      displayName: 'Configure Databricks CLI'    

    - task: executenotebook@0
      inputs:
        notebookPath: '/Shared/devops-ds/test-data-ingestion'
        existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
        executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
      displayName: 'Test data ingestion'

    - task: waitexecution@0
      displayName: 'Wait until the testing is done'

作業中的最後一個工作會檢查筆記本執行的結果。 如果傳回錯誤,則會將管線執行的狀態設定為失敗。

總結

完整的 CI/CD Azure 管線包含下列階段:

  • CI
  • 部署至 QA
    • 部署至 Databricks + 部署至 ADF
    • 整合測試

其中包含與您的目標環境數量相等的部署階段。 每個部署階段都包含兩個平行執行的部署,以及一個部署完成後在該環境上測試解決方案的作業

管線的樣本實作會組合成下列 yaml 片段:

variables:
- group: devops-ds-vg

stages:
- stage: 'CI'
  displayName: 'CI'
  jobs:
  - job: "CI_Job"
    displayName: "CI Job"
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'
    - script: pip install --upgrade flake8 flake8_formatter_junit_xml
      displayName: 'Install flake8'
    - checkout: self
    - script: |
       flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml  
    workingDirectory: '$(Build.SourcesDirectory)'
    displayName: 'Run flake8 (code style analysis)'  
    - script: |
       python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
    displayName: 'Run unit tests'
    - task: PublishTestResults@2
    condition: succeededOrFailed()
    inputs:
        testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
        testRunTitle: 'Linting & Unit tests'
        failTaskOnFailedTests: true
    displayName: 'Publish linting and unit test results'    

    # The CI stage produces two artifacts (notebooks and ADF pipelines).
    # The pipelines Azure Resource Manager templates are stored in a technical branch "adf_publish"
    - publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/code/dataingestion
      artifact: di-notebooks
    - checkout: git://${{variables['System.TeamProject']}}@adf_publish    
    - publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/devops-ds-adf
      artifact: adf-pipelines

- stage: 'Deploy_to_QA'
  displayName: 'Deploy to QA'
  variables:
  - group: devops-ds-qa-vg
  jobs:
  - deployment: "Deploy_to_Databricks"
    displayName: 'Deploy to Databricks'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: UsePythonVersion@0
              inputs:
                versionSpec: '3.x'
                addToPath: true
                architecture: 'x64'
              displayName: 'Use Python3'

            - task: configuredatabricks@0
              inputs:
                url: '$(DATABRICKS_URL)'
                token: '$(DATABRICKS_TOKEN)'
              displayName: 'Configure Databricks CLI'    

            - task: deploynotebooks@0
              inputs:
                notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
                workspaceFolder: '/Shared/devops-ds'
              displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'             
  - deployment: "Deploy_to_ADF"
    displayName: 'Deploy to ADF'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: AzureResourceGroupDeployment@2
              displayName: 'Deploy ADF resources'
              inputs:
                azureSubscription: $(AZURE_RM_CONNECTION)
                resourceGroupName: $(RESOURCE_GROUP)
                location: $(LOCATION)
                csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
                csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
                overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"
  - job: "Integration_test_job"
    displayName: "Integration test job"
    dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: AzurePowerShell@4
      displayName: 'Execute ADF Pipeline'
      inputs:
        azureSubscription: $(AZURE_RM_CONNECTION)
        ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
        ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
        azurePowerShellVersion: LatestVersion
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'

    - task: configuredatabricks@0
      inputs:
        url: '$(DATABRICKS_URL)'
        token: '$(DATABRICKS_TOKEN)'
      displayName: 'Configure Databricks CLI'    

    - task: executenotebook@0
      inputs:
        notebookPath: '/Shared/devops-ds/test-data-ingestion'
        existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
        executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
      displayName: 'Test data ingestion'

    - task: waitexecution@0
      displayName: 'Wait until the testing is done'                

下一步