DevOps para um pipeline de ingestão de dados

Na maioria dos cenários, uma solução de ingestão de dados é uma composição de scripts, invocações de serviços e um pipeline orquestrando todas as atividades. Neste artigo, aprende-se a aplicar práticas de DevOps ao ciclo de vida de desenvolvimento de um oleoduto comum de ingestão de dados que prepara dados para a formação de modelos de aprendizagem automática. O gasoduto é construído utilizando os seguintes serviços Azure:

  • Azure Data Factory: Lê os dados brutos e orquestra a preparação de dados.
  • Azure Databricks: Executa um caderno Python que transforma os dados.
  • Gasodutos Azure: Automatiza um processo de integração e desenvolvimento contínuo.

Fluxo de trabalho do gasoduto de ingestão de dados

O gasoduto de ingestão de dados implementa o seguinte fluxo de trabalho:

  1. Os dados brutos são lidos num gasoduto Azure Data Factory (ADF).
  2. O oleoduto ADF envia os dados para um cluster Azure Databricks, que executa um caderno Python para transformar os dados.
  3. Os dados são armazenados num recipiente blob, onde podem ser utilizados pela Azure Machine Learning para treinar um modelo.

fluxo de trabalho do gasoduto de ingestão de dados

Visão geral de integração e entrega contínua

Como em muitas soluções de software, há uma equipa (por exemplo, Engenheiros de Dados) a trabalhar nele. Colaboram e partilham os mesmos recursos Azure, tais como Azure Data Factory, Azure Databricks e Azure Storage. A recolha destes recursos é um ambiente de Desenvolvimento. Os engenheiros de dados contribuem para a mesma base de código fonte.

Um sistema de integração e entrega contínuo automatiza o processo de construção, teste e entrega (implantação) da solução. O processo de Integração Contínua (CI) executa as seguintes tarefas:

  • Monta o código
  • Verifica-o com os testes de qualidade do código
  • Executa testes de unidade
  • Produz artefactos como código testado e modelos de Resource Manager Azure

O processo de Entrega Contínua (CD) implanta os artefactos para os ambientes a jusante.

diagrama de ingestão de dados cíd

Este artigo demonstra como automatizar os processos de CI e CD com a Azure Pipelines.

Gestão do controlo de fontes

A gestão do controlo de fontes é necessária para acompanhar as mudanças e permitir a colaboração entre os membros da equipa. Por exemplo, o código seria armazenado num repositório Azure DevOps, GitHub ou GitLab. O fluxo de trabalho de colaboração baseia-se num modelo de ramificação. Por exemplo, GitFlow.

Código fonte do caderno python

Os engenheiros de dados trabalham com o código-fonte do portátil Python localmente num IDE (por exemplo, Código do Estúdio Visual) ou diretamente no espaço de trabalho databricks. Uma vez que o código é concluído, eles são fundidos ao repositório seguindo uma política de ramificação.

Dica

Recomendamos guardar o código em .py ficheiros e não em .ipynb formato Jupyter Notebook. Melhora a legibilidade do código e permite controlos automáticos de qualidade de código no processo de CI.

Código fonte Azure Data Factory

O código fonte dos oleodutos Azure Data Factory é uma coleção de ficheiros JSON gerados por um espaço de trabalho Azure Data Factory. Normalmente, os engenheiros de dados trabalham com um designer visual no espaço de trabalho Azure Data Factory e não com os ficheiros de código de origem diretamente.

Para configurar o espaço de trabalho para utilizar um repositório de controlo de origem, consulte Autor com integração do Azure Repos Git.

Integração contínua (CI)

O objetivo final do processo de Integração Contínua é reunir o trabalho de equipa conjunta a partir do código fonte e prepará-lo para a implantação para os ambientes a jusante. Tal como acontece com a gestão do código fonte, este processo é diferente para os cadernos Python e Azure Data Factory oleodutos.

Píton Notebook CI

O processo de CI para os Cadernos Python obtém o código do ramo de colaboração (por exemplo, master ou desenvolvimento) e realiza as seguintes atividades:

  • Linça de código
  • Teste de unidades
  • Guardar o código como um artefacto

O seguinte corte de código demonstra a implementação destes passos num gasoduto Azure DevOps yaml :

steps:
- script: |
   flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml  
  workingDirectory: '$(Build.SourcesDirectory)'
  displayName: 'Run flake8 (code style analysis)'  
  
- script: |
   python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
  displayName: 'Run unit tests'

- task: PublishTestResults@2
  condition: succeededOrFailed()
  inputs:
    testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
    testRunTitle: 'Linting & Unit tests'
    failTaskOnFailedTests: true
  displayName: 'Publish linting and unit test results'

- publish: $(Build.SourcesDirectory)
    artifact: di-notebooks

O oleoduto usa flocos8 para fazer o código Python. Executa os testes de unidade definidos no código-fonte e publica os resultados de linagem e teste para que estejam disponíveis no ecrã de execução dos Gasodutos Azure.

Se o ensaio de linça e unidade for bem sucedido, o gasoduto copiará o código de origem para o repositório de artefactos a utilizar pelas etapas de implantação subsequentes.

Azure Data Factory CI

O processo de CI para um gasoduto Azure Data Factory é um estrangulamento para um oleoduto de ingestão de dados. Não há integração contínua. Um artefacto implantável para Azure Data Factory é uma coleção de modelos de Resource Manager Azure. A única maneira de produzir esses modelos é clicar no botão de publicação no espaço de trabalho Azure Data Factory.

  1. Os engenheiros de dados fundem o código fonte dos seus ramos de recurso no ramo de colaboração, por exemplo, dominar ou desenvolver.
  2. Alguém com as permissões concedidas clica no botão de publicação para gerar modelos de Resource Manager Azure a partir do código fonte no ramo de colaboração.
  3. O espaço de trabalho valida os oleodutos (pense-se nele como um lintamento e teste de unidade), gera Azure Resource Manager modelos (pense nisso como um edifício) e guarda os modelos gerados para um ramo técnico adf_publish no mesmo repositório de código (pense nele como artefactos de publicação). Este ramo é criado automaticamente pelo espaço de trabalho Azure Data Factory.

Para obter mais informações sobre este processo, consulte integração contínua e entrega em Azure Data Factory.

É importante ter certeza de que os modelos gerados de Resource Manager Azure são agnósticos do ambiente. Isto significa que todos os valores que podem diferir entre ambientes são parametrizados. Azure Data Factory é inteligente o suficiente para expor a maioria de valores como parâmetros. Por exemplo, no modelo seguinte, as propriedades de ligação a um espaço de trabalho Azure Machine Learning são expostas como parâmetros:

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "devops-ds-adf"
        },
        "AzureMLService_servicePrincipalKey": {
            "value": ""
        },
        "AzureMLService_properties_typeProperties_subscriptionId": {
            "value": "0fe1c235-5cfa-4152-17d7-5dff45a8d4ba"
        },
        "AzureMLService_properties_typeProperties_resourceGroupName": {
            "value": "devops-ds-rg"
        },
        "AzureMLService_properties_typeProperties_servicePrincipalId": {
            "value": "6e35e589-3b22-4edb-89d0-2ab7fc08d488"
        },
        "AzureMLService_properties_typeProperties_tenant": {
            "value": "72f988bf-86f1-41af-912b-2d7cd611db47"
        }
    }
}

No entanto, pode querer expor as suas propriedades personalizadas que não são manuseadas pelo espaço de trabalho Azure Data Factory por padrão. No cenário deste artigo, um Azure Data Factory oleoduto invoca um portátil Python que processa os dados. O caderno aceita um parâmetro com o nome de um ficheiro de dados de entrada.

import pandas as pd
import numpy as np

data_file_name = getArgument("data_file_name")
data = pd.read_csv(data_file_name)

labels = np.array(data['target'])
...

Este nome é diferente para ambientes Dev, QA, UAT e PROD . Em um oleoduto complexo com múltiplas atividades, pode haver várias propriedades personalizadas. É uma boa prática recolher todos esses valores num só local e defini-los como variáveis de gasoduto:

O Screenshot mostra um Bloco de Notas chamado PrepareData e M L Execute Pipeline chamado M L Execute Pipeline no topo com o separador Variáveis selecionado abaixo com a opção de adicionar novas variáveis, cada uma com um nome, tipo e valor predefinido.

As atividades do gasoduto podem referir-se às variáveis do gasoduto utilizando-as:

O Screenshot mostra um Portátil chamado PrepareData e M L Execute Pipeline chamado M L Execute Pipeline na parte superior com o separador Definições selecionado abaixo.

O espaço de trabalho Azure Data Factory não expõe variáveis de gasodutos como Azure Resource Manager modelos por padrão. O espaço de trabalho utiliza o Modelo de Parametização Padrão ditando quais as propriedades do gasoduto que devem ser expostas como parâmetros Resource Manager Azure. Para adicionar variáveis de pipeline à lista, atualize a "Microsoft.DataFactory/factories/pipelines" secção do Modelo de Parâmetroção Padrão com o seguinte corte e coloque o ficheiro de resultado json na raiz da pasta de origem:

"Microsoft.DataFactory/factories/pipelines": {
        "properties": {
            "variables": {
                "*": {
                    "defaultValue": "="
                }
            }
        }
    }

Ao fazê-lo, o espaço de trabalho Azure Data Factory irá adicionar as variáveis à lista de parâmetros quando o botão de publicação é clicado:

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "devops-ds-adf"
        },
        ...
        "data-ingestion-pipeline_properties_variables_data_file_name_defaultValue": {
            "value": "driver_prediction_train.csv"
        }        
    }
}

Os valores no ficheiro JSON são valores predefinidos configurados na definição do pipeline. Espera-se que sejam ultrapassados com os valores do ambiente-alvo quando o modelo Azure Resource Manager for implantado.

Entrega contínua (CD)

O processo de Entrega Contínua leva os artefactos e implanta-os para o primeiro ambiente alvo. Garante que a solução funciona através de testes realizados. Se for bem sucedido, continua para o próximo ambiente.

Os Pipelines CD Azure consistem em múltiplas etapas que representam os ambientes. Cada fase contém implantações e trabalhos que executam os seguintes passos:

  • Implementar um caderno python para o espaço de trabalho Azure Databricks
  • Implementar um gasoduto Azure Data Factory
  • Executar o pipeline
  • Verifique o resultado da ingestão de dados

As fases do gasoduto podem ser configuradas com aprovações e portões que proporcionam um controlo adicional sobre como o processo de implantação evolui através da cadeia de ambientes.

Implementar um caderno python

O seguinte corte de código define uma implantação do Gasoduto Azure que copia um caderno Python para um cluster Databricks:

- stage: 'Deploy_to_QA'
  displayName: 'Deploy to QA'
  variables:
  - group: devops-ds-qa-vg
  jobs:
  - deployment: "Deploy_to_Databricks"
    displayName: 'Deploy to Databricks'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: UsePythonVersion@0
              inputs:
                versionSpec: '3.x'
                addToPath: true
                architecture: 'x64'
              displayName: 'Use Python3'

            - task: configuredatabricks@0
              inputs:
                url: '$(DATABRICKS_URL)'
                token: '$(DATABRICKS_TOKEN)'
              displayName: 'Configure Databricks CLI'    

            - task: deploynotebooks@0
              inputs:
                notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
                workspaceFolder: '/Shared/devops-ds'
              displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'       

Os artefactos produzidos pelo CI são automaticamente copiados para o agente de implantação e estão disponíveis na $(Pipeline.Workspace) pasta. Neste caso, a tarefa de implantação refere-se ao di-notebooks artefacto que contém o caderno Python. Esta implementação utiliza a extensão Databricks Azure DevOps para copiar os ficheiros do portátil para o espaço de trabalho databricks.

O Deploy_to_QA palco contém uma referência ao devops-ds-qa-vg grupo variável definido no projeto Azure DevOps. Os passos nesta fase referem-se às variáveis deste grupo variável (por exemplo, $(DATABRICKS_URL) e $(DATABRICKS_TOKEN)). A ideia é que a próxima fase (por exemplo, Deploy_to_UAT) funcione com os mesmos nomes variáveis definidos no seu próprio grupo variável de aplicação UAT.

Implementar um gasoduto Azure Data Factory

Um artefacto implantável para Azure Data Factory é um modelo de Resource Manager Azure. Será implantado com a tarefa de implantação do Grupo de Recursos Azure , tal como é demonstrado no seguinte corte:

  - deployment: "Deploy_to_ADF"
    displayName: 'Deploy to ADF'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: AzureResourceGroupDeployment@2
              displayName: 'Deploy ADF resources'
              inputs:
                azureSubscription: $(AZURE_RM_CONNECTION)
                resourceGroupName: $(RESOURCE_GROUP)
                location: $(LOCATION)
                csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
                csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
                overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"

O valor do parâmetro data filename provém da $(DATA_FILE_NAME) variável definida num grupo variável de fase QA. Da mesma forma, todos os parâmetros definidos em ARMTemplateForFactory.json podem ser ultrapassados. Se não forem, os valores predefinidos são utilizados.

Executar o oleoduto e verificar o resultado da ingestão de dados

O próximo passo é garantir que a solução implementada está a funcionar. A seguinte definição de trabalho executa um oleoduto Azure Data Factory com um script PowerShell e executa um caderno Python num cluster Azure Databricks. O caderno verifica se os dados foram ingeridos corretamente e valida o ficheiro de dados de resultados com $(bin_FILE_NAME) o nome.

  - job: "Integration_test_job"
    displayName: "Integration test job"
    dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: AzurePowerShell@4
      displayName: 'Execute ADF Pipeline'
      inputs:
        azureSubscription: $(AZURE_RM_CONNECTION)
        ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
        ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
        azurePowerShellVersion: LatestVersion
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'

    - task: configuredatabricks@0
      inputs:
        url: '$(DATABRICKS_URL)'
        token: '$(DATABRICKS_TOKEN)'
      displayName: 'Configure Databricks CLI'    

    - task: executenotebook@0
      inputs:
        notebookPath: '/Shared/devops-ds/test-data-ingestion'
        existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
        executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
      displayName: 'Test data ingestion'

    - task: waitexecution@0
      displayName: 'Wait until the testing is done'

A tarefa final no trabalho verifica o resultado da execução do caderno. Se der uma posição errada, define o estado da execução do gasoduto para falhar.

Juntando peças

O gasoduto CI/CD Azure completo consiste nas seguintes fases:

  • CI
  • Implementar para QA
    • Implementar para Databricks + Implementar para ADF
    • Teste de Integração

Contém uma série de fases de implantação iguais ao número de ambientes-alvo que tem. Cada fase de implantação contém duas implementações que funcionam em paralelo e um trabalho que funciona após implementações para testar a solução no ambiente.

No seguinte corte yaml é montada uma amostra do gasoduto:

variables:
- group: devops-ds-vg

stages:
- stage: 'CI'
  displayName: 'CI'
  jobs:
  - job: "CI_Job"
    displayName: "CI Job"
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'
    - script: pip install --upgrade flake8 flake8_formatter_junit_xml
      displayName: 'Install flake8'
    - checkout: self
    - script: |
       flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml  
    workingDirectory: '$(Build.SourcesDirectory)'
    displayName: 'Run flake8 (code style analysis)'  
    - script: |
       python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
    displayName: 'Run unit tests'
    - task: PublishTestResults@2
    condition: succeededOrFailed()
    inputs:
        testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
        testRunTitle: 'Linting & Unit tests'
        failTaskOnFailedTests: true
    displayName: 'Publish linting and unit test results'    

    # The CI stage produces two artifacts (notebooks and ADF pipelines).
    # The pipelines Azure Resource Manager templates are stored in a technical branch "adf_publish"
    - publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/code/dataingestion
      artifact: di-notebooks
    - checkout: git://${{variables['System.TeamProject']}}@adf_publish    
    - publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/devops-ds-adf
      artifact: adf-pipelines

- stage: 'Deploy_to_QA'
  displayName: 'Deploy to QA'
  variables:
  - group: devops-ds-qa-vg
  jobs:
  - deployment: "Deploy_to_Databricks"
    displayName: 'Deploy to Databricks'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: UsePythonVersion@0
              inputs:
                versionSpec: '3.x'
                addToPath: true
                architecture: 'x64'
              displayName: 'Use Python3'

            - task: configuredatabricks@0
              inputs:
                url: '$(DATABRICKS_URL)'
                token: '$(DATABRICKS_TOKEN)'
              displayName: 'Configure Databricks CLI'    

            - task: deploynotebooks@0
              inputs:
                notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
                workspaceFolder: '/Shared/devops-ds'
              displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'             
  - deployment: "Deploy_to_ADF"
    displayName: 'Deploy to ADF'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: AzureResourceGroupDeployment@2
              displayName: 'Deploy ADF resources'
              inputs:
                azureSubscription: $(AZURE_RM_CONNECTION)
                resourceGroupName: $(RESOURCE_GROUP)
                location: $(LOCATION)
                csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
                csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
                overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"
  - job: "Integration_test_job"
    displayName: "Integration test job"
    dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: AzurePowerShell@4
      displayName: 'Execute ADF Pipeline'
      inputs:
        azureSubscription: $(AZURE_RM_CONNECTION)
        ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
        ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
        azurePowerShellVersion: LatestVersion
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'

    - task: configuredatabricks@0
      inputs:
        url: '$(DATABRICKS_URL)'
        token: '$(DATABRICKS_TOKEN)'
      displayName: 'Configure Databricks CLI'    

    - task: executenotebook@0
      inputs:
        notebookPath: '/Shared/devops-ds/test-data-ingestion'
        existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
        executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
      displayName: 'Test data ingestion'

    - task: waitexecution@0
      displayName: 'Wait until the testing is done'                

Passos seguintes