DevOps untuk alur penyerapan data

Dalam kebanyakan skenario, solusi penyerapan data merupakan gabungan dari skrip, pemanggilan layanan, dan alur yang mengatur semua aktivitas. Dalam artikel ini, Anda akan mempelajari cara menerapkan praktik DevOps ke siklus hidup pengembangan alur penyerapan data umum yang menyiapkan data untuk pelatihan model pembelajaran mesin. Alur ini dibangun menggunakan layanan Azure berikut:

  • Azure Data Factory: Membaca data mentah dan mengatur penyiapan data.
  • Azure Databricks: Menjalankan notebook Python yang mentransformasikan data.
  • Azure Pipelines: Mengotomatiskan proses integrasi dan pengembangan berkelanjutan.

Alur kerja penyerapan data

Alur penyerapan data mengimplementasikan alur kerja berikut:

  1. Data mentah dibaca di alur Azure Data Factory (ADF).
  2. Alur ADF mengirimkan data ke kluster Azure Databricks, yang menjalankan notebook Python untuk mentransformasikan data.
  3. Data disimpan ke kontainer blob. Di sini, data tersebut dapat digunakan oleh Pembelajaran Mesin Microsoft Azure untuk melatih model.

data ingestion pipeline workflow

Ringkasan integrasi dan pengiriman berkelanjutan

Seperti kebanyakan solusi perangkat lunak lainnya, terdapat tim (misalnya, Teknisi Data) yang mengerjakannya. Tim ini berkolaborasi dan berbagi sumber daya Azure yang sama seperti akun Azure Data Factory, Azure Databricks, dan Azure Storage. Pengumpulan sumber daya ini adalah lingkungan Pengembangan. Teknisi data berkontribusi pada basis kode sumber yang sama.

Sistem integrasi dan pengiriman berkelanjutan mengotomatiskan proses pembangunan, pengujian, dan pengiriman (penyebaran) solusi. Proses Integrasi Berkelanjutan (CI) melakukan tugas-tugas berikut:

  • Menggabungkan kode
  • Memeriksanya dengan pengujian kualitas kode
  • Menjalankan pengujian unit
  • Menghasilkan artefak seperti kode yang diuji dan templat Azure Resource Manager

Proses Pengiriman Berkelanjutan (CD) menyebarkan artefak ke lingkungan hilir.

cicd data ingestion diagram

Artikel ini menunjukkan cara mengotomatisasi proses CI dan CD dengan Azure Pipelines.

Manajemen kontrol sumber

Manajemen kontrol sumber diperlukan untuk melacak perubahan dan memungkinkan kolaborasi antar-anggota tim. Misalnya, kode akan disimpan di repositori Azure DevOps, GitHub, atau GitLab. Alur kerja kolaborasi didasarkan pada model percabangan.

Kode Sumber Python Notebook

Teknisi data bekerja dengan kode sumber notebook Python baik secara lokal dalam IDE (misalnya, Visual Studio Code) atau langsung di ruang kerja Databricks. Setelah perubahan kode selesai, kode tersebut digabungkan ke repositori dengan mengikuti kebijakan percabangan.

Tip

Sebaiknya simpan kode dalam file .py, jangan simpan dalam format .ipynb Jupyter Notebook. Hal ini akan meningkatkan keterbacaan kode dan mengaktifkan pemeriksaan kualitas kode otomatis dalam proses CI.

Kode Sumber Azure Data Factory

Kode sumber alur Azure Data Factory adalah kumpulan file JSON yang dihasilkan oleh ruang kerja Azure Data Factory. Biasanya teknisi data bekerja dengan desainer visual di ruang kerja Azure Data Factory, bukan dengan file kode sumber secara langsung.

Guna mengonfigurasi ruang kerja untuk menggunakan repositori kontrol sumber, lihat Menulis dengan integrasi Azure Repos Git.

Integrasi berkelanjutan (CI)

Tujuan akhir dari proses Integrasi Berkelanjutan adalah mengumpulkan kerja tim gabungan dari kode sumber dan mempersiapkannya untuk penyebaran ke lingkungan hilir. Seperti halnya manajemen kode sumber, proses ini berbeda untuk notebook Python dan alur Azure Data Factory.

Python Notebook CI

Proses CI untuk Python Notebooks mendapatkan kode dari cabang kolaborasi (misalnya, master atau develop) dan melakukan aktivitas berikut:

  • Code linting
  • Pengujian Unit
  • Menyimpan kode sebagai artefak

Cuplikan kode berikut menunjukkan implementasi langkah-langkah ini dalam alur yaml Azure DevOps:

steps:
- script: |
   flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml  
  workingDirectory: '$(Build.SourcesDirectory)'
  displayName: 'Run flake8 (code style analysis)'  
  
- script: |
   python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
  displayName: 'Run unit tests'

- task: PublishTestResults@2
  condition: succeededOrFailed()
  inputs:
    testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
    testRunTitle: 'Linting & Unit tests'
    failTaskOnFailedTests: true
  displayName: 'Publish linting and unit test results'

- publish: $(Build.SourcesDirectory)
    artifact: di-notebooks

Alur ini menggunakan flake8 untuk melakukan code linting Python. Alur ini menjalankan pengujian unit yang ditentukan dalam kode sumber serta menerbitkan hasil linting dan pengujian, sehingga tersedia di layar eksekusi Azure Pipeline.

Jika linting dan pengujian unit berhasil, alur ini akan menyalin kode sumber ke repositori artefak yang akan digunakan oleh langkah-langkah penyebaran berikutnya.

Azure Data Factory CI

Proses CI untuk alur Azure Data Factory adalah hambatan untuk alur penyerapan data. Tidak ada integrasi berkelanjutan. Artefak yang dapat disebarkan untuk Azure Data Factory adalah kumpulan templat Azure Resource Manager. Satu-satunya cara untuk menghasilkan templat tersebut adalah dengan mengklik tombol terbitkan di ruang kerja Azure Data Factory.

  1. Teknisi data menggabungkan kode sumber dari cabang fitur mereka ke dalam cabang kolaborasi, misalnya, master atau develop.
  2. Seseorang yang memiliki izin mengklik tombol terbitkan untuk menghasilkan templat Azure Resource Manager dari kode sumber di cabang kolaborasi.
  3. Ruang kerja memvalidasi alur (anggap saja sebagai pengujian unit dan linting) menghasilkan templat Azure Resource Manager (anggap saja seperti membangun), dan menyimpan templat yang dihasilkan ke cabang teknis adf_publish dalam repositori kode yang sama (anggap saja seperti menerbitkan artefak). Cabang ini dibuat secara otomatis oleh ruang kerja Azure Data Factory.

Untuk informasi selengkapnya tentang proses ini, lihat Integrasi dan pengiriman berkelanjutan di Azure Data Factory.

Sangat penting untuk memastikan bahwa templat Azure Resource Manager yang dihasilkan akan mengabaikan lingkungan. Ini berarti bahwa semua nilai yang mungkin berbeda antara lingkungan akan dijadikan sebagai parameter. Azure Data Factory cukup cerdas untuk mengekspos sebagian besar nilai seperti parameter. Misalnya dalam templat berikut, properti koneksi ke ruang kerja Pembelajaran Mesin Microsoft Azure diekspos sebagai parameter:

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "devops-ds-adf"
        },
        "AzureMLService_servicePrincipalKey": {
            "value": ""
        },
        "AzureMLService_properties_typeProperties_subscriptionId": {
            "value": "0fe1c235-5cfa-4152-17d7-5dff45a8d4ba"
        },
        "AzureMLService_properties_typeProperties_resourceGroupName": {
            "value": "devops-ds-rg"
        },
        "AzureMLService_properties_typeProperties_servicePrincipalId": {
            "value": "6e35e589-3b22-4edb-89d0-2ab7fc08d488"
        },
        "AzureMLService_properties_typeProperties_tenant": {
            "value": "72f988bf-86f1-41af-912b-2d7cd611db47"
        }
    }
}

Namun, sebaiknya Anda mengekspos properti kustom yang tidak ditangani oleh ruang kerja Azure Data Factory secara default. Dalam skenario artikel ini, alur Azure Data Factory memanggil notebook Python yang memproses data. Notebook menerima parameter dengan nama file data input.

import pandas as pd
import numpy as np

data_file_name = getArgument("data_file_name")
data = pd.read_csv(data_file_name)

labels = np.array(data['target'])
...

Nama ini berbeda untuk lingkungan Dev, QA, UAT, dan PROD. Dalam alur yang kompleks dengan beberapa aktivitas, mungkin saja terdapat beberapa properti kustom. Mengumpulkan semua nilai tersebut di satu tempat dan menetapkannya sebagai variabel alur adalah praktik terbaik:

Screenshot shows a Notebook called PrepareData and M L Execute Pipeline called M L Execute Pipeline at the top with the Variables tab selected below with the option to add new variables, each with a name, type, and default value.

Aktivitas alur dapat mengacu pada variabel alur sekaligus menggunakannya dalam praktik nyata:

Screenshot shows a Notebook called PrepareData and M L Execute Pipeline called M L Execute Pipeline at the top with the Settings tab selected below.

Ruang kerja Azure Data Factory tidak mengekspos variabel alur sebagai parameter templat Azure Resource Manager secara default. Ruang kerja ini menggunakan Templat Parameterisasi Default yang memberi instruksi terkait apa yang harus diekspos properti alur sebagai parameter templat Azure Resource Manager. Untuk menambahkan variabel alur ke daftar, perbarui bagian "Microsoft.DataFactory/factories/pipelines"Template Parameterisasi Default dengan cuplikan berikut, dan letakkan file json hasil di akar folder sumber:

"Microsoft.DataFactory/factories/pipelines": {
        "properties": {
            "variables": {
                "*": {
                    "defaultValue": "="
                }
            }
        }
    }

Tindakan ini akan memaksa ruang kerja Azure Data Factory untuk menambahkan variabel ke daftar parameter saat tombol terbitkan diklik:

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "devops-ds-adf"
        },
        ...
        "data-ingestion-pipeline_properties_variables_data_file_name_defaultValue": {
            "value": "driver_prediction_train.csv"
        }        
    }
}

Nilai dalam file JSON adalah nilai default yang dikonfigurasi dalam definisi alur. Nilai ini diharapkan akan diambil alih dengan nilai lingkungan target saat templat Azure Resource Manager disebarkan.

Pengiriman berkelanjutan (CD)

Proses Pengiriman Berkelanjutan membawa artefak dan menyebarkannya ke lingkungan target pertama. Hal ini memastikan bahwa solusi berfungsi dengan menjalankan tes. Jika berhasil, solusi akan berlanjut ke lingkungan berikutnya.

CD Azure Pipeline terdiri dari beberapa tahap yang mewakili lingkungan. Setiap tahap berisi penyebaran dan pekerjaan yang menjalankan langkah-langkah berikut:

  • Menyebarkan Python Notebook ke ruang kerja Azure Databricks
  • Menyebarkan alur Azure Data Factory
  • Menjalankan alur
  • Memeriksa hasil penyerapan data

Tahapan alur dapat dikonfigurasi dengan persetujuan dan gerbang yang memberikan kontrol tambahan terkait bagaimana proses penyebaran berevolusi melalui rantai lingkungan.

Menyebarkan Python Notebook

Cuplikan kode berikut menetapkan penyebaran Azure Pipeline yang menyalin notebook Python ke kluster Databricks:

- stage: 'Deploy_to_QA'
  displayName: 'Deploy to QA'
  variables:
  - group: devops-ds-qa-vg
  jobs:
  - deployment: "Deploy_to_Databricks"
    displayName: 'Deploy to Databricks'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: UsePythonVersion@0
              inputs:
                versionSpec: '3.x'
                addToPath: true
                architecture: 'x64'
              displayName: 'Use Python3'

            - task: configuredatabricks@0
              inputs:
                url: '$(DATABRICKS_URL)'
                token: '$(DATABRICKS_TOKEN)'
              displayName: 'Configure Databricks CLI'    

            - task: deploynotebooks@0
              inputs:
                notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
                workspaceFolder: '/Shared/devops-ds'
              displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'       

Artefak yang dihasilkan oleh CI akan otomatis disalin ke agen penyebaran dan tersedia di folder $(Pipeline.Workspace). Dalam kasus ini, tugas penyebaran mengacu pada artefak di-notebooks yang berisi notebook Python. Penyebaran ini menggunakan ekstensi Databricks Azure DevOps untuk menyalin file notebook ke ruang kerja Databricks.

Tahap Deploy_to_QA berisi referensi ke grup variabel devops-ds-qa-vg yang ditentukan dalam proyek Azure DevOps. Langkah-langkah dalam tahap ini mengacu pada variabel dari grup variabel ini (misalnya, $(DATABRICKS_URL) dan $(DATABRICKS_TOKEN) ). Konsepnya adalah bahwa tahap berikutnya (misalnya, Deploy_to_UAT) akan beroperasi dengan nama variabel yang sama, yang ditentukan dalam grup variabel cakupan UAT tersendiri.

Menyebarkan alur Azure Data Factory

Artefak yang dapat disebarkan untuk Azure Data Factory adalah templat Azure Resource Manager. Templat akan disebarkan dengan tugas Penyebaran Grup Sumber Daya Azure seperti yang ditunjukkan dalam cuplikan berikut:

  - deployment: "Deploy_to_ADF"
    displayName: 'Deploy to ADF'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: AzureResourceGroupDeployment@2
              displayName: 'Deploy ADF resources'
              inputs:
                azureSubscription: $(AZURE_RM_CONNECTION)
                resourceGroupName: $(RESOURCE_GROUP)
                location: $(LOCATION)
                csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
                csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
                overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"

Nilai parameter nama file data berasal dari variabel $(DATA_FILE_NAME) yang ditentukan dalam grup variabel tahap QA. Demikian pula, semua parameter yang ditentukan dalam ARMTemplateForFactory.json juga dapat diambil alih. Jika tidak, nilai default akan digunakan.

Menjalankan alur dan memeriksa hasil penyerapan data

Langkah selanjutnya adalah memastikan bahwa solusi yang disebarkan sudah berfungsi. Definisi pekerjaan berikut menjalankan alur Azure Data Factory dengan skrip PowerShell dan menjalankan notebook Python pada kluster Azure Databricks. Notebook ini memeriksa apakah data tersebut telah diserap dengan benar dan memvalidasi file data hasil dengan nama $(bin_FILE_NAME).

  - job: "Integration_test_job"
    displayName: "Integration test job"
    dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: AzurePowerShell@4
      displayName: 'Execute ADF Pipeline'
      inputs:
        azureSubscription: $(AZURE_RM_CONNECTION)
        ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
        ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
        azurePowerShellVersion: LatestVersion
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'

    - task: configuredatabricks@0
      inputs:
        url: '$(DATABRICKS_URL)'
        token: '$(DATABRICKS_TOKEN)'
      displayName: 'Configure Databricks CLI'    

    - task: executenotebook@0
      inputs:
        notebookPath: '/Shared/devops-ds/test-data-ingestion'
        existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
        executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
      displayName: 'Test data ingestion'

    - task: waitexecution@0
      displayName: 'Wait until the testing is done'

Tugas akhir dalam pekerjaan memeriksa hasil eksekusi notebook. Jika menampilkan kesalahan, tugas ini mengatur status eksekusi alur ke gagal.

Ringkasan

CI/CD Azure Pipeline yang lengkap terdiri dari tahapan berikut:

  • CI
  • Menyebarkan ke QA
    • Menyebarkan ke Databricks + Menyebarkan ke ADF
    • Pengujian Integrasi

Pengujian berisi sejumlah tahapan Sebarkan yang sama dengan jumlah lingkungan target yang Anda miliki. Setiap tahapan Deploy berisi dua penyebaran yang berjalan secara paralel dan pekerjaan yang berjalan setelah penyebaran untuk menguji solusi pada lingkungan.

Sampel implementasi alur dikumpulkan dalam cuplikan yaml berikut:

variables:
- group: devops-ds-vg

stages:
- stage: 'CI'
  displayName: 'CI'
  jobs:
  - job: "CI_Job"
    displayName: "CI Job"
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'
    - script: pip install --upgrade flake8 flake8_formatter_junit_xml
      displayName: 'Install flake8'
    - checkout: self
    - script: |
       flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml  
    workingDirectory: '$(Build.SourcesDirectory)'
    displayName: 'Run flake8 (code style analysis)'  
    - script: |
       python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
    displayName: 'Run unit tests'
    - task: PublishTestResults@2
    condition: succeededOrFailed()
    inputs:
        testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
        testRunTitle: 'Linting & Unit tests'
        failTaskOnFailedTests: true
    displayName: 'Publish linting and unit test results'    

    # The CI stage produces two artifacts (notebooks and ADF pipelines).
    # The pipelines Azure Resource Manager templates are stored in a technical branch "adf_publish"
    - publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/code/dataingestion
      artifact: di-notebooks
    - checkout: git://${{variables['System.TeamProject']}}@adf_publish    
    - publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/devops-ds-adf
      artifact: adf-pipelines

- stage: 'Deploy_to_QA'
  displayName: 'Deploy to QA'
  variables:
  - group: devops-ds-qa-vg
  jobs:
  - deployment: "Deploy_to_Databricks"
    displayName: 'Deploy to Databricks'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: UsePythonVersion@0
              inputs:
                versionSpec: '3.x'
                addToPath: true
                architecture: 'x64'
              displayName: 'Use Python3'

            - task: configuredatabricks@0
              inputs:
                url: '$(DATABRICKS_URL)'
                token: '$(DATABRICKS_TOKEN)'
              displayName: 'Configure Databricks CLI'    

            - task: deploynotebooks@0
              inputs:
                notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
                workspaceFolder: '/Shared/devops-ds'
              displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'             
  - deployment: "Deploy_to_ADF"
    displayName: 'Deploy to ADF'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: AzureResourceGroupDeployment@2
              displayName: 'Deploy ADF resources'
              inputs:
                azureSubscription: $(AZURE_RM_CONNECTION)
                resourceGroupName: $(RESOURCE_GROUP)
                location: $(LOCATION)
                csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
                csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
                overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"
  - job: "Integration_test_job"
    displayName: "Integration test job"
    dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: AzurePowerShell@4
      displayName: 'Execute ADF Pipeline'
      inputs:
        azureSubscription: $(AZURE_RM_CONNECTION)
        ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
        ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
        azurePowerShellVersion: LatestVersion
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'

    - task: configuredatabricks@0
      inputs:
        url: '$(DATABRICKS_URL)'
        token: '$(DATABRICKS_TOKEN)'
      displayName: 'Configure Databricks CLI'    

    - task: executenotebook@0
      inputs:
        notebookPath: '/Shared/devops-ds/test-data-ingestion'
        existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
        executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
      displayName: 'Test data ingestion'

    - task: waitexecution@0
      displayName: 'Wait until the testing is done'                

Langkah berikutnya