DevOps voor een gegevensopnamepijplijn

In de meeste scenario's is een oplossing voor gegevensopname een samenstelling van scripts, service-aanroepen en een pijplijn die alle activiteiten organiseert. In dit artikel leert u hoe u DevOps-procedures toepast op de ontwikkelingslevenscyclus van een algemene pijplijn voor gegevensopname waarmee gegevens worden voorbereid voor machine learning-modeltraining. De pijplijn wordt gebouwd met behulp van de volgende Azure-services:

  • Azure Data Factory: leest de onbewerkte gegevens en organiseert gegevensvoorbereiding.
  • Azure Databricks: voert een Python-notebook uit waarmee de gegevens worden getransformeerd.
  • Azure Pipelines: automatiseert een continue integratie en ontwikkelingsproces.

Pijplijnwerkstroom voor gegevensopname

De pijplijn voor gegevensopname implementeert de volgende werkstroom:

  1. Onbewerkte gegevens worden ingelezen in een ADF-pijplijn (Azure Data Factory).
  2. De ADF-pijplijn verzendt de gegevens naar een Azure Databricks-cluster, waarmee een Python-notebook wordt uitgevoerd om de gegevens te transformeren.
  3. De gegevens worden opgeslagen in een blobcontainer, waar deze kunnen worden gebruikt door Azure Machine Learning om een model te trainen.

data ingestion pipeline workflow

Overzicht van continue integratie en levering

Net als bij veel softwareoplossingen werkt er een team (bijvoorbeeld Data-engineer s) aan. Ze werken samen en delen dezelfde Azure-resources, zoals Azure Data Factory, Azure Databricks en Azure Storage-accounts. De verzameling van deze resources is een ontwikkelomgeving. De data engineers dragen bij aan dezelfde broncodebasis.

Een systeem voor continue integratie en levering automatiseert het proces van het bouwen, testen en leveren (implementeren) van de oplossing. Het CI-proces (Continuous Integration) voert de volgende taken uit:

  • Hiermee wordt de code samengesteld
  • Controleert het met de codekwaliteitstests
  • Eenheidstests uitvoeren
  • Produceert artefacten zoals geteste code en Azure Resource Manager-sjablonen

Het CD-proces (Continuous Delivery) implementeert de artefacten in de downstreamomgevingen.

cicd data ingestion diagram

In dit artikel wordt beschreven hoe u de CI- en CD-processen automatiseert met Azure Pipelines.

Beheer van broncodebeheer

Broncodebeheer is nodig om wijzigingen bij te houden en samenwerking tussen teamleden mogelijk te maken. De code wordt bijvoorbeeld opgeslagen in een Azure DevOps-, GitHub- of GitLab-opslagplaats. De werkstroom voor samenwerking is gebaseerd op een vertakkingsmodel.

Broncode python-notebook

De data engineers werken met de broncode van het Python-notebook lokaal in een IDE (bijvoorbeeld Visual Studio Code) of rechtstreeks in de Databricks-werkruimte. Zodra de codewijzigingen zijn voltooid, worden ze samengevoegd met de opslagplaats volgens een vertakkingsbeleid.

Tip

We raden u aan de code op te slaan in .py bestanden in plaats van in .ipynb Jupyter Notebook-indeling. Het verbetert de leesbaarheid van code en maakt automatische controles van codekwaliteit in het CI-proces mogelijk.

Broncode van Azure Data Factory

De broncode van Azure Data Factory-pijplijnen is een verzameling JSON-bestanden die worden gegenereerd door een Azure Data Factory-werkruimte. Normaal gesproken werken de data engineers met een visuele ontwerper in de Azure Data Factory-werkruimte in plaats van rechtstreeks met de broncodebestanden.

Zie Author met Azure Repos Git-integratie om de werkruimte te configureren voor het gebruik van een opslagplaats voor broncodebeheer.

Continue integratie (CI)

Het uiteindelijke doel van het proces voor continue integratie is het verzamelen van het gezamenlijke teamwerk van de broncode en het voorbereiden voor de implementatie in de downstreamomgevingen. Net als bij broncodebeheer verschilt dit proces voor de Python-notebooks en Azure Data Factory-pijplijnen.

Python Notebook CI

Het CI-proces voor de Python Notebooks haalt de code op uit de samenwerkingsvertakking (bijvoorbeeld master of ontwikkelen) en voert de volgende activiteiten uit:

  • Codelining
  • Het testen van modules
  • De code opslaan als artefact

In het volgende codefragment ziet u de implementatie van deze stappen in een Azure DevOps yaml-pijplijn :

steps:
- script: |
   flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml  
  workingDirectory: '$(Build.SourcesDirectory)'
  displayName: 'Run flake8 (code style analysis)'  
  
- script: |
   python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
  displayName: 'Run unit tests'

- task: PublishTestResults@2
  condition: succeededOrFailed()
  inputs:
    testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
    testRunTitle: 'Linting & Unit tests'
    failTaskOnFailedTests: true
  displayName: 'Publish linting and unit test results'

- publish: $(Build.SourcesDirectory)
    artifact: di-notebooks

De pijplijn maakt gebruik van flake8 om python-codeplining uit te voeren. Hiermee worden de eenheidstests uitgevoerd die zijn gedefinieerd in de broncode en worden de linting- en testresultaten gepubliceerd, zodat ze beschikbaar zijn in het uitvoerscherm van Azure Pipelines.

Als het testen van linting en eenheid is geslaagd, kopieert de pijplijn de broncode naar de artefactopslagplaats die moet worden gebruikt door de volgende implementatiestappen.

Azure Data Factory CI

CI-proces voor een Azure Data Factory-pijplijn is een knelpunt voor een pijplijn voor gegevensopname. Er is geen continue integratie. Een implementeerbaar artefact voor Azure Data Factory is een verzameling Azure Resource Manager-sjablonen. De enige manier om deze sjablonen te produceren, is door te klikken op de knop Publiceren in de Azure Data Factory-werkruimte.

  1. De data engineers voegen de broncode uit hun functievertakkingen samen in de samenwerkingsvertakking, bijvoorbeeld master of ontwikkelen.
  2. Iemand met de verleende machtigingen klikt op de knop Publiceren om Azure Resource Manager-sjablonen te genereren op basis van de broncode in de samenwerkingsbranch.
  3. De werkruimte valideert de pijplijnen (zoals linting en eenheidstests), genereert Azure Resource Manager-sjablonen (denk eraan vanaf het bouwen) en slaat de gegenereerde sjablonen op in een technische vertakking adf_publish in dezelfde codeopslagplaats (denk eraan als het publiceren van artefacten). Deze vertakking wordt automatisch gemaakt door de Azure Data Factory-werkruimte.

Zie Continue integratie en levering in Azure Data Factory voor meer informatie over dit proces.

Het is belangrijk om ervoor te zorgen dat de gegenereerde Azure Resource Manager-sjablonen omgevingsneutraal zijn. Dit betekent dat alle waarden die tussen omgevingen kunnen verschillen, worden geparametriseerd. Azure Data Factory is slim genoeg om het merendeel van de waarden zoals parameters beschikbaar te maken. In de volgende sjabloon worden de verbindingseigenschappen voor een Azure Machine Learning-werkruimte bijvoorbeeld weergegeven als parameters:

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "devops-ds-adf"
        },
        "AzureMLService_servicePrincipalKey": {
            "value": ""
        },
        "AzureMLService_properties_typeProperties_subscriptionId": {
            "value": "0fe1c235-5cfa-4152-17d7-5dff45a8d4ba"
        },
        "AzureMLService_properties_typeProperties_resourceGroupName": {
            "value": "devops-ds-rg"
        },
        "AzureMLService_properties_typeProperties_servicePrincipalId": {
            "value": "6e35e589-3b22-4edb-89d0-2ab7fc08d488"
        },
        "AzureMLService_properties_typeProperties_tenant": {
            "value": "72f988bf-86f1-41af-912b-2d7cd611db47"
        }
    }
}

Het is echter mogelijk dat u uw aangepaste eigenschappen beschikbaar wilt maken die niet standaard worden verwerkt door de Azure Data Factory-werkruimte. In het scenario van dit artikel roept een Azure Data Factory-pijplijn een Python-notebook aan die de gegevens verwerkt. Het notebook accepteert een parameter met de naam van een invoergegevensbestand.

import pandas as pd
import numpy as np

data_file_name = getArgument("data_file_name")
data = pd.read_csv(data_file_name)

labels = np.array(data['target'])
...

Deze naam verschilt voor Dev-, QA-, UAT- en PROD-omgevingen. In een complexe pijplijn met meerdere activiteiten kunnen er verschillende aangepaste eigenschappen zijn. Het is raadzaam om al deze waarden op één plaats te verzamelen en deze als pijplijnvariabelen te definiëren:

Screenshot shows a Notebook called PrepareData and M L Execute Pipeline called M L Execute Pipeline at the top with the Variables tab selected below with the option to add new variables, each with a name, type, and default value.

De pijplijnactiviteiten kunnen verwijzen naar de pijplijnvariabelen terwijl ze daadwerkelijk worden gebruikt:

Screenshot shows a Notebook called PrepareData and M L Execute Pipeline called M L Execute Pipeline at the top with the Settings tab selected below.

In de Azure Data Factory-werkruimte worden pijplijnvariabelen niet standaard weergegeven als parameters voor Azure Resource Manager-sjablonen. In de werkruimte wordt de standaardparameterisatiesjabloon gebruikt om te dicteren welke pijplijneigenschappen moeten worden weergegeven als azure Resource Manager-sjabloonparameters. Als u pijplijnvariabelen wilt toevoegen aan de lijst, werkt u de "Microsoft.DataFactory/factories/pipelines" sectie van de standaardparametersjabloon bij met het volgende fragment en plaatst u het JSON-resultaatbestand in de hoofdmap van de bronmap:

"Microsoft.DataFactory/factories/pipelines": {
        "properties": {
            "variables": {
                "*": {
                    "defaultValue": "="
                }
            }
        }
    }

Als u dit doet, wordt de Azure Data Factory-werkruimte gedwongen om de variabelen toe te voegen aan de lijst met parameters wanneer op de knop Publiceren wordt geklikt:

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "devops-ds-adf"
        },
        ...
        "data-ingestion-pipeline_properties_variables_data_file_name_defaultValue": {
            "value": "driver_prediction_train.csv"
        }        
    }
}

De waarden in het JSON-bestand zijn standaardwaarden die zijn geconfigureerd in de pijplijndefinitie. Ze worden naar verwachting overschreven met de doelomgevingswaarden wanneer de Azure Resource Manager-sjabloon wordt geïmplementeerd.

Continue levering (CD)

Het proces voor continue levering neemt de artefacten en implementeert deze in de eerste doelomgeving. Het zorgt ervoor dat de oplossing werkt door tests uit te voeren. Als dit lukt, gaat deze verder met de volgende omgeving.

De CD Azure Pipelines bestaat uit meerdere fasen die de omgevingen vertegenwoordigen. Elke fase bevat implementaties en taken die de volgende stappen uitvoeren:

  • Een Python Notebook implementeren in een Azure Databricks-werkruimte
  • Een Azure Data Factory-pijplijn implementeren
  • De pijplijn uitvoeren
  • Het resultaat van de gegevensopname controleren

De pijplijnfasen kunnen worden geconfigureerd met goedkeuringen en poorten die extra controle bieden over de ontwikkeling van het implementatieproces via de keten van omgevingen.

Een Python Notebook implementeren

Het volgende codefragment definieert een Azure Pipeline-implementatie waarmee een Python-notebook wordt gekopieerd naar een Databricks-cluster:

- stage: 'Deploy_to_QA'
  displayName: 'Deploy to QA'
  variables:
  - group: devops-ds-qa-vg
  jobs:
  - deployment: "Deploy_to_Databricks"
    displayName: 'Deploy to Databricks'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: UsePythonVersion@0
              inputs:
                versionSpec: '3.x'
                addToPath: true
                architecture: 'x64'
              displayName: 'Use Python3'

            - task: configuredatabricks@0
              inputs:
                url: '$(DATABRICKS_URL)'
                token: '$(DATABRICKS_TOKEN)'
              displayName: 'Configure Databricks CLI'    

            - task: deploynotebooks@0
              inputs:
                notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
                workspaceFolder: '/Shared/devops-ds'
              displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'       

De artefacten die door de CI worden geproduceerd, worden automatisch gekopieerd naar de implementatieagent en zijn beschikbaar in de $(Pipeline.Workspace) map. In dit geval verwijst de implementatietaak naar het di-notebooks artefact met het Python-notebook. Deze implementatie maakt gebruik van de Databricks Azure DevOps-extensie om de notebookbestanden naar de Databricks-werkruimte te kopiëren.

De Deploy_to_QA fase bevat een verwijzing naar de devops-ds-qa-vg variabelegroep die is gedefinieerd in het Azure DevOps-project. De stappen in deze fase verwijzen naar de variabelen uit deze variabelegroep (bijvoorbeeld $(DATABRICKS_URL) en $(DATABRICKS_TOKEN)). Het idee is dat de volgende fase (bijvoorbeeld Deploy_to_UAT) werkt met dezelfde variabelenamen die zijn gedefinieerd in een eigen UAT-variabelegroep.

Een Azure Data Factory-pijplijn implementeren

Een implementeerbaar artefact voor Azure Data Factory is een Azure Resource Manager-sjabloon. Deze wordt geïmplementeerd met de azure-resourcegroepimplementatietaak , zoals wordt gedemonstreerd in het volgende fragment:

  - deployment: "Deploy_to_ADF"
    displayName: 'Deploy to ADF'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: AzureResourceGroupDeployment@2
              displayName: 'Deploy ADF resources'
              inputs:
                azureSubscription: $(AZURE_RM_CONNECTION)
                resourceGroupName: $(RESOURCE_GROUP)
                location: $(LOCATION)
                csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
                csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
                overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"

De waarde van de parameter voor de gegevensbestandsnaam is afkomstig van de $(DATA_FILE_NAME) variabele die is gedefinieerd in een groep met qa-fasevariabelen. Op dezelfde manier kunnen alle parameters die zijn gedefinieerd in ARMTemplateForFactory.json worden overschreven. Als dat niet het is, worden de standaardwaarden gebruikt.

Voer de pijplijn uit en controleer het gegevensopnameresultaat

De volgende stap is om ervoor te zorgen dat de geïmplementeerde oplossing werkt. De volgende taakdefinitie voert een Azure Data Factory-pijplijn uit met een PowerShell-script en voert een Python-notebook uit op een Azure Databricks-cluster. Het notebook controleert of de gegevens correct zijn opgenomen en valideert het resultaatgegevensbestand met $(bin_FILE_NAME) de naam.

  - job: "Integration_test_job"
    displayName: "Integration test job"
    dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: AzurePowerShell@4
      displayName: 'Execute ADF Pipeline'
      inputs:
        azureSubscription: $(AZURE_RM_CONNECTION)
        ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
        ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
        azurePowerShellVersion: LatestVersion
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'

    - task: configuredatabricks@0
      inputs:
        url: '$(DATABRICKS_URL)'
        token: '$(DATABRICKS_TOKEN)'
      displayName: 'Configure Databricks CLI'    

    - task: executenotebook@0
      inputs:
        notebookPath: '/Shared/devops-ds/test-data-ingestion'
        existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
        executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
      displayName: 'Test data ingestion'

    - task: waitexecution@0
      displayName: 'Wait until the testing is done'

De laatste taak in de taak controleert het resultaat van de uitvoering van het notebook. Als er een fout wordt geretourneerd, wordt de status van de uitvoering van de pijplijn ingesteld op mislukt.

Stukken samenvoegen

De volledige CI/CD Azure Pipeline bestaat uit de volgende fasen:

  • CI
  • Implementeren naar QA
    • Implementeren in Databricks + Implementeren in ADF
    • Integratietest

Het bevat een aantal implementatiefasen die gelijk zijn aan het aantal doelomgevingen dat u hebt. Elke implementatiefase bevat twee implementaties die parallel worden uitgevoerd en een taak die wordt uitgevoerd na implementaties om de oplossing in de omgeving te testen.

Een voorbeeld-implementatie van de pijplijn wordt samengesteld in het volgende yaml-fragment:

variables:
- group: devops-ds-vg

stages:
- stage: 'CI'
  displayName: 'CI'
  jobs:
  - job: "CI_Job"
    displayName: "CI Job"
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'
    - script: pip install --upgrade flake8 flake8_formatter_junit_xml
      displayName: 'Install flake8'
    - checkout: self
    - script: |
       flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml  
    workingDirectory: '$(Build.SourcesDirectory)'
    displayName: 'Run flake8 (code style analysis)'  
    - script: |
       python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
    displayName: 'Run unit tests'
    - task: PublishTestResults@2
    condition: succeededOrFailed()
    inputs:
        testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
        testRunTitle: 'Linting & Unit tests'
        failTaskOnFailedTests: true
    displayName: 'Publish linting and unit test results'    

    # The CI stage produces two artifacts (notebooks and ADF pipelines).
    # The pipelines Azure Resource Manager templates are stored in a technical branch "adf_publish"
    - publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/code/dataingestion
      artifact: di-notebooks
    - checkout: git://${{variables['System.TeamProject']}}@adf_publish    
    - publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/devops-ds-adf
      artifact: adf-pipelines

- stage: 'Deploy_to_QA'
  displayName: 'Deploy to QA'
  variables:
  - group: devops-ds-qa-vg
  jobs:
  - deployment: "Deploy_to_Databricks"
    displayName: 'Deploy to Databricks'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: UsePythonVersion@0
              inputs:
                versionSpec: '3.x'
                addToPath: true
                architecture: 'x64'
              displayName: 'Use Python3'

            - task: configuredatabricks@0
              inputs:
                url: '$(DATABRICKS_URL)'
                token: '$(DATABRICKS_TOKEN)'
              displayName: 'Configure Databricks CLI'    

            - task: deploynotebooks@0
              inputs:
                notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
                workspaceFolder: '/Shared/devops-ds'
              displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'             
  - deployment: "Deploy_to_ADF"
    displayName: 'Deploy to ADF'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: AzureResourceGroupDeployment@2
              displayName: 'Deploy ADF resources'
              inputs:
                azureSubscription: $(AZURE_RM_CONNECTION)
                resourceGroupName: $(RESOURCE_GROUP)
                location: $(LOCATION)
                csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
                csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
                overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"
  - job: "Integration_test_job"
    displayName: "Integration test job"
    dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: AzurePowerShell@4
      displayName: 'Execute ADF Pipeline'
      inputs:
        azureSubscription: $(AZURE_RM_CONNECTION)
        ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
        ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
        azurePowerShellVersion: LatestVersion
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'

    - task: configuredatabricks@0
      inputs:
        url: '$(DATABRICKS_URL)'
        token: '$(DATABRICKS_TOKEN)'
      displayName: 'Configure Databricks CLI'    

    - task: executenotebook@0
      inputs:
        notebookPath: '/Shared/devops-ds/test-data-ingestion'
        existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
        executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
      displayName: 'Test data ingestion'

    - task: waitexecution@0
      displayName: 'Wait until the testing is done'                

Volgende stappen