Ciągła integracja i ciągłe dostarczanie w usłudze Azure Databricks przy użyciu usługi Azure DevOps

Uwaga

W tym artykule opisano usługę Azure DevOps, która nie jest dostarczana ani obsługiwana przez usługę Databricks. Aby skontaktować się z dostawcą, zobacz Azure DevOps Services pomocy technicznej.

Ciągła integracja i ciągłe dostarczanie (CI/CD) odnosi się do procesu tworzenia i dostarczania oprogramowania w krótkich, częstych cyklach przy użyciu potoków automatyzacji. Chociaż nie jest to w żaden sposób nowy proces, który był wszechobecny w tradycyjnej inżynierii oprogramowania od dziesięcioleci, staje się coraz bardziej koniecznym procesem dla zespołów inżynierii danych i nauki o danych. Aby produkty danych były cenne, muszą być dostarczane w odpowiednim czasie. Ponadto konsumenci muszą mieć pewność co do ważności wyników w tych produktach. Automatyzując kompilowanie, testowanie i wdrażanie kodu, zespoły programistyczne mogą dostarczać wydania częściej i niezawodnie niż w przypadku bardziej ręcznych procesów, które są nadal powszechne w wielu zespołach inżynierii danych i nauki o danych.

Ciągła integracja rozpoczyna się od praktyki zatwierdzania kodu z pewną częstotliwością w gałęzi w repozytorium kodu źródłowego. Każde zatwierdzenie jest następnie scalane z zatwierdzeniami innych deweloperów, aby upewnić się, że nie wprowadzono konfliktów. Zmiany są dodatkowo weryfikowane przez utworzenie kompilacji i uruchomienie testów automatycznych dla tej kompilacji. Ten proces ostatecznie powoduje utworzenie artefaktu lub pakietu wdrożenia, który ostatecznie zostanie wdrożony w środowisku docelowym, w tym przypadku w obszarze roboczym usługi Azure Databricks.

Omówienie typowego potoku ciągłej integracji/ciągłego wdrażania usługi Azure Databricks

Chociaż może się to różnić w zależności od potrzeb, typowa konfiguracja potoku usługi Azure Databricks obejmuje następujące kroki:

Ciągła integracja:

  1. Kod
    1. Opracowywanie kodu i testów jednostkowych w notesie usługi Azure Databricks lub przy użyciu zewnętrznego środowiska IDE.
    2. Ręczne uruchamianie testów.
    3. Zatwierdź kod i testy w gałęzi git.
  2. Kompilacja
    1. Zbierz nowy i zaktualizowany kod i testy.
    2. Uruchamianie testów automatycznych.
    3. Kompilowanie bibliotek i kodu platformy Apache Spark spoza notesu.
  3. Wydanie: Wygeneruj artefakt wydania.

Ciągłe dostarczanie:

  1. Wdróż
    1. Wdrażanie notesów.
    2. Wdrażanie bibliotek.
  2. Test: Uruchamianie testów automatycznych i raportów wyników.
  3. Obsługa: Programowe planowanie przepływów pracy inżynierii danych, analizy i uczenia maszynowego.

Opracowywanie i zatwierdzanie kodu

Jednym z pierwszych kroków projektowania potoku ciągłej integracji/ciągłego wdrażania jest podjęcie decyzji o zatwierdzeniu kodu i strategii rozgałęziania w celu zarządzania opracowywaniem i integracją nowego i zaktualizowanego kodu bez negatywnego wpływu na kod aktualnie w środowisku produkcyjnym. Część tej decyzji polega na wybraniu systemu kontroli wersji, który będzie zawierać kod i ułatwi podwyższenie poziomu tego kodu. Usługa Azure Databricks obsługuje integracje z usługami GitHub i Bitbucket, które umożliwiają zatwierdzanie notesów w repozytorium Git.

Jeśli system kontroli wersji nie jest jednym z systemów obsługiwanych za pośrednictwem integracji notesu bezpośredniego lub jeśli potrzebujesz większej elastyczności i kontroli niż samoobsługowa integracja z usługą Git, możesz użyć interfejsu wiersza polecenia usługi Databricks do wyeksportowania notesów i zatwierdzenia ich z komputera lokalnego. Ten skrypt należy uruchomić z poziomu lokalnego repozytorium Git skonfigurowanego do synchronizacji z odpowiednim repozytorium zdalnym. Po wykonaniu ten skrypt powinien:

  1. Wyewidencjonuj żądaną gałąź.
  2. Ściąganie nowych zmian z gałęzi zdalnej.
  3. Eksportowanie notesów z obszaru roboczego usługi Azure Databricks przy użyciu interfejsu wiersza polecenia obszaru roboczego usługi Azure Databricks.
  4. Monituj użytkownika o komunikat zatwierdzenia lub użyj wartości domyślnej, jeśli nie zostanie podana.
  5. Zatwierdź zaktualizowane notesy w gałęzi lokalnej.
  6. Wypchnij zmiany do gałęzi zdalnej.

Poniższy skrypt wykonuje następujące kroki:

git checkout <branch>
git pull
databricks workspace export_dir --profile <profile> -o <path> ./Workspace

dt=`date '+%Y-%m-%d %H:%M:%S'`
msg_default="DB export on $dt"
read -p "Enter the commit comment [$msg_default]: " msg
msg=${msg:-$msg_default}
echo $msg

git add .
git commit -m "<commit-message>"
git push

Jeśli wolisz programować w środowisku IDE, a nie w notesach usługi Azure Databricks, możesz użyć funkcji integracji VCS wbudowanych w nowoczesne środowiska IDE lub interfejs wiersza polecenia usługi Git, aby zatwierdzić kod.

Usługa Azure Databricks udostępnia usługę Databricks Connect — zestaw SDK, który łączy środowiska IDE z klastrami usługi Azure Databricks. Jest to szczególnie przydatne podczas tworzenia bibliotek, ponieważ umożliwia uruchamianie i testowanie jednostkowe kodu w klastrach usługi Azure Databricks bez konieczności wdrażania tego kodu. Zobacz Ograniczenia usługi Databricks Connect, aby określić, czy twój przypadek użycia jest obsługiwany.

Uwaga

Usługa Databricks zaleca teraz używanie bazy danych dbx firmy Databricks Labs do lokalnego programowania zamiast usługi Databricks Connect.

W zależności od strategii rozgałęziania i procesu podwyższania poziomu punkt, w którym potok ciągłej integracji/ciągłego wdrażania zainicjuje kompilację, będzie się różnić. Jednak zatwierdzony kod z różnych współautorów zostanie ostatecznie scalony z wyznaczoną gałęzią do skompilowania i wdrożenia. Kroki zarządzania gałęziami są uruchamiane poza usługą Azure Databricks przy użyciu interfejsów udostępnianych przez system kontroli wersji.

Istnieje wiele narzędzi ciągłej integracji/ciągłego wdrażania, których można użyć do zarządzania potokiem i wykonywania go. W tym artykule pokazano, jak używać serwera automatyzacji usługi Azure DevOps. Ciągła integracja/ciągłe wdrażanie jest wzorcem projektowym, więc kroki i etapy opisane w tym artykule powinny zostać przeniesione z kilkoma zmianami języka definicji potoku w każdym narzędziu. Ponadto większość kodu w tym przykładowym potoku uruchamia standardowy kod języka Python, który można wywołać w innych narzędziach.

Aby uzyskać informacje o korzystaniu z serwera Jenkins z usługą Azure Databricks, zobacz Ciągła integracja i ciągłe dostarczanie w usłudze Azure Databricks przy użyciu narzędzia Jenkins.

Definiowanie potoku kompilacji

Usługa Azure DevOps udostępnia interfejs hostowany w chmurze do definiowania etapów potoku ciągłej integracji/ciągłego wdrażania przy użyciu języka YAML. Potok kompilacji, który uruchamia testy jednostkowe i tworzy artefakt wdrożenia w interfejsie Pipelines. Następnie, aby wdrożyć kod w obszarze roboczym usługi Azure Databricks, należy określić ten artefakt wdrożenia w potoku wydania.

W projekcie usługi Azure DevOps otwórz menu Potoki i kliknij pozycję Potoki.

Menu potoku usługi Azure DevOps

Kliknij przycisk Nowy potok , aby otworzyć edytor potoku, w którym zdefiniujesz kompilację azure-pipelines.yml w pliku.

Selektor gałęzi Git można użyć selektora gałęzi Git , aby dostosować proces kompilacji dla każdej gałęzi w repozytorium Git.

Edytor potoku usługi Azure DevOps

Plik azure-pipelines.yml jest domyślnie przechowywany w katalogu głównym repozytorium git dla potoku. Zmienne środowiskowe, do których odwołuje się potok, są konfigurowane przy użyciu przycisku Zmienne.

Przycisk Zmienne usługi Azure DevOps

Aby uzyskać więcej informacji na temat usługi Azure DevOps i potoków kompilacji, zobacz dokumentację usługi Azure DevOps.

Konfigurowanie agenta kompilacji

Aby wykonać potok, usługa Azure DevOps udostępnia hostowanych w chmurze, na żądanie agentów wykonywania, którzy obsługują wdrożenia na platformie Kubernetes, maszynach wirtualnych, Azure Functions, usłudze Azure Web Apps i wielu innych obiektach docelowych. W tym przykładzie użyjesz agenta na żądanie, aby zautomatyzować wdrażanie kodu w docelowym obszarze roboczym usługi Azure Databricks. Narzędzia lub pakiety wymagane przez potok muszą być zdefiniowane w skrypsie potoku i zainstalowane na agencie w czasie wykonywania.

Ten przykład wymaga następujących zależności:

  • Conda — Conda jest systemem zarządzania środowiskiem open source
  • Python — służy do uruchamiania testów, tworzenia koła wdrażania i wykonywania skryptów wdrażania. Wersja języka Python jest ważna, ponieważ testy wymagają, aby wersja języka Python uruchomiona na agencie odpowiadała wersji klastra usługi Azure Databricks. W tym przykładzie użyto środowiska Databricks Runtime 7.3, który obejmuje język Python 3.7.
  • Biblioteki języka Python: requests, databricks-connect, databricks-cli, pytest

Oto przykładowy potok (azure-pipelines.yml). Poniższy pełny skrypt. W tym artykule opisano poszczególne sekcje skryptu.

# Azure Databricks Build Pipeline
# azure-pipelines.yml

trigger:
- release

pool:
  name: Hosted Ubuntu 1604

steps:
- task: UsePythonVersion@0
  displayName: 'Use Python 3.7'
  inputs:
    versionSpec: 3.7

- script: |
    pip install pytest requests setuptools wheel
    pip install -U databricks-connect==7.3.*
  displayName: 'Load Python Dependencies'

- script: |
    echo "y
    $(WORKSPACE-REGION-URL)
    $(CSE-DEVELOP-PAT)
    $(EXISTING-CLUSTER-ID)
    $(WORKSPACE-ORG-ID)
    15001" | databricks-connect configure
  displayName: 'Configure DBConnect'

- checkout: self
  persistCredentials: true
  clean: true

- script: git checkout release
  displayName: 'Get Latest Branch'

- script: |
    python -m pytest --junit-xml=$(Build.Repository.LocalPath)/logs/TEST-LOCAL.xml
$(Build.Repository.LocalPath)/libraries/python/dbxdemo/test*.py || true

  displayName: 'Run Python Unit Tests for library code'

- task: PublishTestResults@2
  inputs:
    testResultsFiles: '**/TEST-*.xml'
    failTaskOnFailedTests: true
    publishRunAttachments: true

- script: |
    cd $(Build.Repository.LocalPath)/libraries/python/dbxdemo
    python3 setup.py sdist bdist_wheel
    ls dist/
  displayName: 'Build Python Wheel for Libs'

- script: |
    git diff --name-only --diff-filter=AMR HEAD^1 HEAD | xargs -I '{}' cp --parents -r '{}' $(Build.BinariesDirectory)

    mkdir -p $(Build.BinariesDirectory)/libraries/python/libs
    cp $(Build.Repository.LocalPath)/libraries/python/dbxdemo/dist/*.* $(Build.BinariesDirectory)/libraries/python/libs

    mkdir -p $(Build.BinariesDirectory)/cicd-scripts
    cp $(Build.Repository.LocalPath)/cicd-scripts/*.* $(Build.BinariesDirectory)/cicd-scripts

  displayName: 'Get Changes'

- task: ArchiveFiles@2
  inputs:
    rootFolderOrFile: '$(Build.BinariesDirectory)'
    includeRootFolder: false
    archiveType: 'zip'
    archiveFile: '$(Build.ArtifactStagingDirectory)/$(Build.BuildId).zip'
    replaceExistingArchive: true

- task: PublishBuildArtifacts@1
  inputs:
    ArtifactName: 'DatabricksBuild'

Konfigurowanie potoku

Na etapie instalacji skonfigurujesz agenta kompilacji, interfejs wiersza polecenia usługi Databricks i usługę Databricks Connect z informacjami o połączeniu.

# Specify the trigger event to start the build pipeline.
# In this case, new code merged into the release branch initiates a new build.
trigger:
- release

# Specify the OS for the agent
pool:
  name: Hosted Ubuntu 1604

# Install Python. The version must match the version on the Databricks cluster.
steps:
- task: UsePythonVersion@0
  displayName: 'Use Python 3.7'
  inputs:
    versionSpec: 3.7

# Install required Python modules, including databricks-connect, required to execute a unit test
# on a cluster.
- script: |
    pip install pytest requests setuptools wheel
    pip install -U databricks-connect==7.3.*
  displayName: 'Load Python Dependencies'

# Use environment variables to pass Databricks login information to the Databricks Connect
# configuration function
- script: |
    echo "y
    $(WORKSPACE-REGION-URL)
    $(CSE-DEVELOP-PAT)
    $(EXISTING-CLUSTER-ID)
    $(WORKSPACE-ORG-ID)
    15001" | databricks-connect configure
  displayName: 'Configure DBConnect'

Pobieranie najnowszych zmian

Ten etap pobiera kod z wyznaczonej gałęzi do agenta wykonawczego agenta.

- checkout: self
  persistCredentials: true
  clean: true

- script: git checkout release
  displayName: 'Get Latest Branch'

Testy jednostkowe w notesach usługi Azure Databricks

W przypadku kodu biblioteki opracowanego poza notesem usługi Azure Databricks proces przypomina tradycyjne praktyki programistyczne. Test jednostkowy jest pisany przy użyciu platformy testowania, takiej jak moduł języka Python pytest , i przechowywania wyników testów przy użyciu plików XML w formacie JUnit.

Kod usługi Azure Databricks to kod platformy Apache Spark przeznaczony do wykonania w klastrach usługi Azure Databricks. Aby przetestować ten kod, możesz użyć zestawu SDK programu Databricks Connect skonfigurowanego w temacie Konfigurowanie potoku.

Testowanie kodu biblioteki przy użyciu usługi Databricks Connect

Ten etap potoku wywołuje testy jednostkowe, określając nazwę i lokalizację zarówno dla testów, jak i plików wyjściowych.

- script: |
    python -m pytest --junit-xml=$(Build.Repository.LocalPath)/logs/TEST-LOCAL.xml $(Build.Repository.LocalPath)/libraries/python/dbxdemo/test*.py || true
    ls logs
  displayName: 'Run Python Unit Tests for library code'

Poniższy fragment kodu (addcol.py) to funkcja biblioteki, która może być zainstalowana w klastrze usługi Azure Databricks. Ta prosta funkcja dodaje nową kolumnę wypełnianą przez literał do ramki danych platformy Apache Spark.

# addcol.py
import pyspark.sql.functions as F

def with_status(df):
    return df.withColumn("status", F.lit("checked"))

Poniższy test test-addcol.py, , przekazuje makietę obiektu DataFrame do with_status funkcji zdefiniowanej w addcol.py. Wynik jest następnie porównywany z obiektem DataFrame zawierającym oczekiwane wartości. Jeśli wartości są zgodne, test przejdzie pomyślnie.

# test-addcol.py
import pytest

from dbxdemo.spark import get_spark
from dbxdemo.appendcol import with_status

class TestAppendCol(object):

    def test_with_status(self):
        source_data = [
            ("pete", "pan", "peter.pan@databricks.com"),
            ("jason", "argonaut", "jason.argonaut@databricks.com")
        ]
        source_df = get_spark().createDataFrame(
            source_data,
            ["first_name", "last_name", "email"]
        )

        actual_df = with_status(source_df)

        expected_data = [
            ("pete", "pan", "peter.pan@databricks.com", "checked"),
            ("jason", "argonaut", "jason.argonaut@databricks.com", "checked")
        ]
        expected_df = get_spark().createDataFrame(
            expected_data,
            ["first_name", "last_name", "email", "status"]
        )

        assert(expected_df.collect() == actual_df.collect())

Kod biblioteki pakietów

Ten etap potoku pakuje kod biblioteki do koła języka Python.

- script: |
    cd $(Build.Repository.LocalPath)/libraries/python/dbxdemo
    python3 setup.py sdist bdist_wheel
    ls dist/
  displayName: 'Build Python Wheel for Libs'

Publikowanie wyników testu

Po wykonaniu wszystkich testów jednostkowych opublikuj wyniki w usłudze Azure DevOps. Umożliwia to wizualizowanie raportów i pulpitów nawigacyjnych związanych ze stanem procesu kompilacji.

- task: PublishTestResults@2
  inputs:
    testResultsFiles: '**/TEST-*.xml'
    failTaskOnFailedTests: true
    publishRunAttachments: true

Generowanie i przechowywanie artefaktu wdrożenia

Ostatnim krokiem potoku kompilacji jest wygenerowanie artefaktu wdrożenia. W tym celu zebrasz cały nowy lub zaktualizowany kod, który ma zostać wdrożony w środowisku usługi Azure Databricks, w tym kod notesu, który ma zostać wdrożony w obszarze roboczym, .whl biblioteki wygenerowane przez proces kompilacji oraz podsumowania wyników dla testów na potrzeby archiwizacji.

# Use git diff to flag files added in the most recent git merge
- script: |
    git diff --name-only --diff-filter=AMR HEAD^1 HEAD | xargs -I '{}' cp --parents -r '{}' $(Build.BinariesDirectory)

# Add the wheel file you just created along with utility scripts used by the Release pipeline
# The implementation in your Pipeline may be different.
# The objective is to add all files intended for the current release.
    mkdir -p $(Build.BinariesDirectory)/libraries/python/libs
    cp $(Build.Repository.LocalPath)/libraries/python/dbxdemo/dist/*.* $(Build.BinariesDirectory)/libraries/python/libs

    mkdir -p $(Build.BinariesDirectory)/cicd-scripts
    cp $(Build.Repository.LocalPath)/cicd-scripts/*.* $(Build.BinariesDirectory)/cicd-scripts
  displayName: 'Get Changes'

# Create the deployment artifact and publish it to the artifact repository
- task: ArchiveFiles@2
  inputs:
    rootFolderOrFile: '$(Build.BinariesDirectory)'
    includeRootFolder: false
    archiveType: 'zip'
    archiveFile: '$(Build.ArtifactStagingDirectory)/$(Build.BuildId).zip'
    replaceExistingArchive: true

- task: PublishBuildArtifacts@1
  inputs:
    ArtifactName: 'DatabricksBuild'

Definiowanie potoku wydania

Potok wydania wdraża artefakt w środowisku usługi Azure Databricks. Rozdzielenie potoku wydania z potoku kompilacji umożliwia utworzenie kompilacji bez jej wdrażania lub wdrożenie artefaktów z wielu kompilacji jednocześnie.

  1. W projekcie usługi Azure DevOps przejdź do menu Potoki i kliknij pozycję Wydania.

    Wydania usługi Azure DevOps

  2. Po prawej stronie ekranu znajduje się lista polecanych szablonów dla typowych wzorców wdrażania. W tym potoku kliknij pozycję Puste zadanie.

    Potok wydania usługi Azure DevOps 1

  3. W polu Artefakty po lewej stronie ekranu kliknij pozycję Dodaj i wybierz utworzony wcześniej potok kompilacji.

    Potok wydania usługi Azure DevOps 2

Możesz skonfigurować sposób wyzwalania potoku, klikając ikonę Błyskawica, która wyświetla opcje wyzwalania po prawej stronie ekranu. Jeśli chcesz, aby wydanie było inicjowane automatycznie na podstawie dostępności artefaktu kompilacji lub po przepływie pracy żądania ściągnięcia, włącz odpowiedni wyzwalacz.

Etap potoku wydania usługi Azure DevOps 1

Aby dodać kroki lub zadania dla wdrożenia, kliknij link w obiekcie etapu.

Etap dodawania potoku wydania usługi Azure DevOps

Dodawanie zadań

Aby dodać zadania, kliknij pozycję plus zaloguj się w sekcji Zadanie agenta oznaczone czerwoną strzałką na poniższej ilustracji. Zostanie wyświetlona lista dostępnych zadań z możliwością wyszukiwania. Istnieje również witryna Marketplace dla wtyczek innych firm, których można użyć do uzupełnienia standardowych zadań usługi Azure DevOps.

Dodawanie zadania usługi Azure DevOps

Ustawianie wersji języka Python

Pierwszym dodanym zadaniem jest użycie wersji języka Python. Podobnie jak w przypadku potoku kompilacji, chcesz upewnić się, że wersja języka Python jest zgodna ze skryptami wywoływanymi w kolejnych zadaniach.

Usługa Azure DevOps ustawia język Python w wersji 1

W tym przypadku ustaw wersję języka Python na 3.7.

Usługa Azure DevOps ustawia język Python w wersji 2

Rozpakowywanie artefaktu kompilacji

Wyodrębnij archiwum przy użyciu zadania Wyodrębnij pliki . Ustaw wartość Wzorce *.zipplików archiwum na , a następnie ustaw folder Docelowy na zmienną systemową "$(agent.builddirectory)". Opcjonalnie można ustawić nazwę wyświetlaną; jest to nazwa wyświetlana na ekranie w obszarze Zadanie agenta.

Rozpakowywanie usługi Azure DevOps

Wdrażanie notesów w obszarze roboczym

Aby wdrożyć notesy, w tym przykładzie użyto zadania innej firmy Databricks Deploy Notebooks opracowanego przez usługę Data Thirst.

  • Wprowadź zmienne środowiskowe, aby ustawić wartości dla tokenu elementu nośnego usługi Azure i usługi Databricks.
  • Ustaw ścieżkę pliki źródłowe na ścieżkę wyodrębnionego katalogu zawierającego notesy.
  • Ustaw ścieżkę plików docelowych na żądaną ścieżkę w strukturze katalogu obszaru roboczego usługi Azure Databricks.

Notes wdrażania usługi Azure DevOps

Wdrażanie biblioteki w systemie plików DBFS

Aby wdrożyć plik w języku Python *.war , użyj plików usługi Databricks innej firmy do systemu plików DBFS, opracowanych również przez usługę Data Thirst.

  • Wprowadź zmienne środowiskowe, aby ustawić wartości dla tokenu elementu nośnego usługi Azure i usługi Databricks.
  • Ustaw folder główny lokalny na ścieżkę wyodrębnionego katalogu zawierającego biblioteki języka Python.
  • Ustaw folder Docelowy w systemie plików DBFS na żądaną ścieżkę systemu plików DBFS.

Wdrażanie pliku usługi Azure DevOps

Instalowanie biblioteki w klastrze

Końcowe zadanie wdrażania kodu instaluje bibliotekę w określonym klastrze. W tym celu utworzysz zadanie skryptu języka Python . Skrypt installWhlLibrary.pyjęzyka Python , znajduje się w artefaktie utworzonym przez nasz potok kompilacji.

Uwaga

Najlepszym rozwiązaniem w zakresie zabezpieczeń w przypadku uwierzytelniania za pomocą zautomatyzowanych narzędzi, systemów, skryptów i aplikacji usługa Databricks zaleca używanie tokenów dostępu należących do jednostek usługi zamiast użytkowników obszaru roboczego. Aby uzyskać więcej informacji, zobacz Zarządzanie jednostkami usługi.

  • Ustaw ścieżkę skryptu na $(agent.builddirectory)/cicd-scripts/installWhlLibrary.py. Skrypt installWhlLibrary.py przyjmuje pięć argumentów:
    • shard — adres URL docelowego obszaru roboczego (na przykład https://<region>.azuredatabricks.net)
    • token — osobisty token dostępu dla obszaru roboczego
    • clusterid — identyfikator klastra, na którym ma zostać zainstalowana biblioteka
    • libs — wyodrębniony katalog zawierający biblioteki
    • dbfspath — ścieżka w systemie plików DBFS w celu pobrania bibliotek

Biblioteka instalacji usługi Azure DevOps

Przed zainstalowaniem nowej wersji biblioteki w klastrze usługi Azure Databricks należy odinstalować istniejącą bibliotekę. Aby to zrobić, wywołaj interfejs API REST usługi Databricks w skryscie języka Python, aby wykonać następujące kroki:

  1. Sprawdź, czy biblioteka jest zainstalowana.
  2. Odinstaluj bibliotekę.
  3. Uruchom ponownie klaster, jeśli zostały wykonane jakiekolwiek odinstalowanie.
  4. Przed kontynuowaniem poczekaj na ponowne uruchomienie klastra.
  5. Zainstaluj bibliotekę.
# installWhlLibrary.py
#!/usr/bin/python3
import json
import requests
import sys
import getopt
import time
import os

def main():
    shard = ''
    token = ''
    clusterid = ''
    libspath = ''
    dbfspath = ''

    try:
        opts, args = getopt.getopt(sys.argv[1:], 'hstcld',
                                   ['shard=', 'token=', 'clusterid=', 'libs=', 'dbfspath='])
    except getopt.GetoptError:
        print(
            'installWhlLibrary.py -s <shard> -t <token> -c <clusterid> -l <libs> -d <dbfspath>')
        sys.exit(2)

    for opt, arg in opts:
        if opt == '-h':
            print(
                'installWhlLibrary.py -s <shard> -t <token> -c <clusterid> -l <libs> -d <dbfspath>')
            sys.exit()
        elif opt in ('-s', '--shard'):
            shard = arg
        elif opt in ('-t', '--token'):
            token = arg
        elif opt in ('-c', '--clusterid'):
            clusterid = arg
        elif opt in ('-l', '--libs'):
            libspath=arg
        elif opt in ('-d', '--dbfspath'):
            dbfspath=arg

    print('-s is ' + shard)
    print('-t is ' + token)
    print('-c is ' + clusterid)
    print('-l is ' + libspath)
    print('-d is ' + dbfspath)

    # Uninstall library if exists on cluster
    i=0

    # Generate array from walking local path
    libslist = []
    for path, subdirs, files in os.walk(libspath):
        for name in files:

            name, file_extension = os.path.splitext(name)
            if file_extension.lower() in ['.whl']:
                libslist.append(name + file_extension.lower())

    for lib in libslist:
        dbfslib = dbfspath + '/' + lib

        if (getLibStatus(shard, token, clusterid, dbfslib) is not None:
            print(dbfslib + ' before:' + getLibStatus(shard, token, clusterid, dbfslib))
            print(dbfslib + " exists. Uninstalling.")
            i = i + 1
            values = {'cluster_id': clusterid, 'libraries': [{'whl': dbfslib}]}

            resp = requests.post(shard + '/api/2.0/libraries/uninstall', data=json.dumps(values), auth=("token", token))
            runjson = resp.text
            d = json.loads(runjson)
            print(dbfslib + ' after:' + getLibStatus(shard, token, clusterid, dbfslib))

            # Restart if libraries uninstalled
            if i > 0:
                values = {'cluster_id': clusterid}
                print("Restarting cluster:" + clusterid)
                resp = requests.post(shard + '/api/2.0/clusters/restart', data=json.dumps(values), auth=("token", token))
                restartjson = resp.text
                print(restartjson)

                p = 0
                waiting = True
                while waiting:
                    time.sleep(30)
                    clusterresp = requests.get(shard + '/api/2.0/clusters/get?cluster_id=' + clusterid,
                                           auth=("token", token))
                    clusterjson = clusterresp.text
                    jsonout = json.loads(clusterjson)
                    current_state = jsonout['state']
                    print(clusterid + " state:" + current_state)
                    if current_state in ['TERMINATED', 'RUNNING','INTERNAL_ERROR', 'SKIPPED'] or p >= 10:
                        break
                    p = p + 1

        print("Installing " + dbfslib)
        values = {'cluster_id': clusterid, 'libraries': [{'whl': 'dbfs:' + dbfslib}]}

        resp = requests.post(shard + '/api/2.0/libraries/install', data=json.dumps(values), auth=("token", token))
        runjson = resp.text
        d = json.loads(runjson)
        print(dbfslib + ' after:' + getLibStatus(shard, token, clusterid, dbfslib))

def getLibStatus(shard, token, clusterid, dbfslib):

    resp = requests.get(shard + '/api/2.0/libraries/cluster-status?cluster_id='+ clusterid, auth=("token", token))
    libjson = resp.text
    d = json.loads(libjson)
    if (d.get('library_statuses')):
        statuses = d['library_statuses']

        for status in statuses:
            if (status['library'].get('whl')):
                if (status['library']['whl'] == 'dbfs:' + dbfslib):
                    return status['status']
    else:
        # No libraries found
        return "not found"

if __name__ == '__main__':
    main()

Uruchamianie testów integracji z notesu usługi Azure Databricks

Testy można również uruchamiać bezpośrednio z notesów zawierających potwierdzenia. W takim przypadku użyjesz tego samego testu użytego w teście jednostkowym, ale teraz zaimportuje zainstalowaną appendcol bibliotekę z whl właśnie zainstalowanej biblioteki w klastrze.

Aby zautomatyzować ten test i uwzględnić go w potoku ciągłej integracji/ciągłego wdrażania, użyj interfejsu API REST usługi Databricks, aby wykonać notes z serwera ciągłej integracji/ciągłego wdrażania. Dzięki temu można sprawdzić, czy wykonanie notesu zakończyło się pomyślnie, czy nie powiodło się przy użyciu polecenia pytest. Wszystkie błędy asercji są wyświetlane w danych wyjściowych JSON zwróconych przez interfejs API REST i w wynikach testu JUnit.

Krok 1. Konfigurowanie środowiska testowego

Utwórz zadanie wiersza polecenia , jak pokazano. To zadanie obejmuje polecenia, które tworzą katalogi dla dzienników wykonywania notesu i podsumowania testów. Zawiera również polecenie pip, aby zainstalować wymagane pytest moduły i requests .

Konfigurowanie środowiska testowego usługi Azure DevOps

Krok 2. Uruchamianie notesu

Uwaga

Najlepszym rozwiązaniem w zakresie zabezpieczeń w przypadku uwierzytelniania za pomocą zautomatyzowanych narzędzi, systemów, skryptów i aplikacji usługa Databricks zaleca używanie tokenów dostępu należących do jednostek usługi zamiast użytkowników obszaru roboczego. Aby uzyskać więcej informacji, zobacz Zarządzanie jednostkami usługi.

Utwórz zadanie skryptu języka Python i skonfiguruj je w następujący sposób:

  • Ustaw ścieżkę skryptu na $(agent.builddirectory)/cicd-scripts/executeNotebook.py. Ten skrypt przyjmuje sześć argumentów:
    • shard — adres URL docelowego obszaru roboczego (na przykładhttps://eastus.azuredatabricks.net)
    • token — osobisty token dostępu dla obszaru roboczego
    • clusterid — identyfikator klastra, na którym ma zostać wykonany test
    • localpath — wyodrębniony katalog zawierający notesy testowe
    • workspacepath — ścieżka w obszarze roboczym, do którego wdrożono notesy do testowania
    • outfilepath — ścieżka utworzona do przechowywania danych wyjściowych JSON zwróconych przez interfejs API REST

Usługa Azure DevOps wykonuje notesy

Skrypt executenotebook.py uruchamia notes przy użyciu zadań przesyłania punktu końcowego, który przesyła zadanie anonimowe. Ponieważ ten punkt końcowy jest asynchroniczny, używa identyfikatora zadania zwróconego początkowo przez wywołanie REST do sondy stanu zadania. Po zakończeniu zadania dane wyjściowe JSON są zapisywane w ścieżce określonej przez argumenty funkcji przekazane przy wywołaniu.

# executenotebook.py
#!/usr/bin/python3
import json
import requests
import os
import sys
import getopt
import time

def main():
    shard = ''
    token = ''
    clusterid = ''
    localpath = ''
    workspacepath = ''
    outfilepath = ''

    try:
        opts, args = getopt.getopt(sys.argv[1:], 'hs:t:c:lwo',
                                   ['shard=', 'token=', 'clusterid=', 'localpath=', 'workspacepath=', 'outfilepath='])
    except getopt.GetoptError:
        print(
            'executenotebook.py -s <shard> -t <token>  -c <clusterid> -l <localpath> -w <workspacepath> -o <outfilepath>)')
        sys.exit(2)

    for opt, arg in opts:
        if opt == '-h':
            print(
                'executenotebook.py -s <shard> -t <token> -c <clusterid> -l <localpath> -w <workspacepath> -o <outfilepath>')
            sys.exit()
        elif opt in ('-s', '--shard'):
            shard = arg
        elif opt in ('-t', '--token'):
            token = arg
        elif opt in ('-c', '--clusterid'):
            clusterid = arg
        elif opt in ('-l', '--localpath'):
            localpath = arg
        elif opt in ('-w', '--workspacepath'):
            workspacepath = arg
        elif opt in ('-o', '--outfilepath'):
            outfilepath = arg

    print('-s is ' + shard)
    print('-t is ' + token)
    print('-c is ' + clusterid)
    print('-l is ' + localpath)
    print('-w is ' + workspacepath)
    print('-o is ' + outfilepath)
    # Generate array from walking local path

    notebooks = []
    for path, subdirs, files in os.walk(localpath):
        for name in files:
            fullpath = path + '/' + name
            # removes localpath to repo but keeps workspace path
            fullworkspacepath = workspacepath + path.replace(localpath, '')

            name, file_extension = os.path.splitext(fullpath)
            if file_extension.lower() in ['.scala', '.sql', '.r', '.py']:
                row = [fullpath, fullworkspacepath, 1]
                notebooks.append(row)

    # run each element in array
    for notebook in notebooks:
        nameonly = os.path.basename(notebook[0])
        workspacepath = notebook[1]

        name, file_extension = os.path.splitext(nameonly)

        # workpath removes extension
        fullworkspacepath = workspacepath + '/' + name

        print('Running job for:' + fullworkspacepath)
        values = {'run_name': name, 'existing_cluster_id': clusterid, 'timeout_seconds': 3600, 'notebook_task': {'notebook_path': fullworkspacepath}}

        resp = requests.post(shard + '/api/2.0/jobs/runs/submit',
                             data=json.dumps(values), auth=("token", token))
        runjson = resp.text
        print("runjson:" + runjson)
        d = json.loads(runjson)
        runid = d['run_id']

        i=0
        waiting = True
        while waiting:
            time.sleep(10)
            jobresp = requests.get(shard + '/api/2.0/jobs/runs/get?run_id='+str(runid),
                             data=json.dumps(values), auth=("token", token))
            jobjson = jobresp.text
            print("jobjson:" + jobjson)
            j = json.loads(jobjson)
            current_state = j['state']['life_cycle_state']
            runid = j['run_id']
            if current_state in ['TERMINATED', 'INTERNAL_ERROR', 'SKIPPED'] or i >= 12:
                break
            i=i+1

        if outfilepath != '':
            file = open(outfilepath + '/' +  str(runid) + '.json', 'w')
            file.write(json.dumps(j))
            file.close()

if __name__ == '__main__':
    main()

Krok 3. Generowanie i ocenianie wyników testu

To zadanie wykonuje skrypt języka Python przy użyciu pytest polecenia w celu określenia, czy potwierdzenia w notesach testowych zostały przekazane lub zakończyły się niepowodzeniem.

Utwórz zadanie wiersza polecenia . Skrypt powinien mieć następującą wartość:

python -m pytest --junit-xml=$(agent.builddirectory)\logs\xml\TEST-notebookout.xml --jsonpath=$(agent.builddirectory)\logs\json\ $(agent.builddirectory)\cicd-scripts\evaluatenotebookruns.py || true

Argumenty to:

  • junit-xml — ścieżka, w której mają być generowane dzienniki podsumowania testu JUnit
  • jsonpath — ścieżka utworzona do przechowywania danych wyjściowych JSON zwróconych przez interfejs API REST

Usługa Azure DevOps generuje wyniki testów

Skrypt evaluatenotebookruns.py definiuje test_job_run funkcję, która analizuje i ocenia kod JSON wygenerowany przez poprzednie zadanie. Inny test, test_performance, szuka testów, które działają dłużej niż oczekiwano.

# evaluatenotebookruns.py
import unittest
import json
import glob
import os

class TestJobOutput(unittest.TestCase):

    test_output_path = '#ENV#'

    def test_performance(self):
        path = self.test_output_path
        statuses = []

        for filename in glob.glob(os.path.join(path, '*.json')):
            print('Evaluating: ' + filename)
            data = json.load(open(filename))
            duration = data['execution_duration']
            if duration > 100000:
                status = 'FAILED'
            else:
                status = 'SUCCESS'

            statuses.append(status)

        self.assertFalse('FAILED' in statuses)

    def test_job_run(self):
        path = self.test_output_path
        statuses = []

        for filename in glob.glob(os.path.join(path, '*.json')):
            print('Evaluating: ' + filename)
            data = json.load(open(filename))
            status = data['state']['result_state']
            statuses.append(status)

        self.assertFalse('FAILED' in statuses)

if __name__ == '__main__':
    unittest.main()

Publikowanie wyników testu

Użyj zadania Publikuj wyniki testu , aby zarchiwizować wyniki JSON i opublikować wyniki testu w usłudze Azure DevOps Test Hub. Umożliwia to wizualizowanie raportów i pulpitów nawigacyjnych związanych ze stanem przebiegów testów.

Publikowanie wyników testów w usłudze Azure DevOps

W tym momencie ukończono cykl integracji i wdrażania przy użyciu potoku ciągłej integracji/ciągłego wdrażania. Automatyzując ten proces, upewnij się, że kod jest testowany i wdrażany przez wydajny, spójny i powtarzalny proces.