Organizowanie zadań usługi Azure Databricks za pomocą platformy Apache Airflow

W tym artykule opisano obsługę platformy Apache Airflow do organizowania potoków danych za pomocą usługi Azure Databricks, przedstawiono instrukcje dotyczące instalowania i konfigurowania przepływu pracy airflow lokalnie oraz przedstawiono przykład wdrażania i uruchamiania przepływu pracy usługi Azure Databricks za pomocą rozwiązania Airflow.

Orkiestracja zadań w potoku danych

Tworzenie i wdrażanie potoku przetwarzania danych często wymaga zarządzania złożonymi zależnościami między zadaniami. Na przykład potok może odczytywać dane ze źródła, czyścić dane, przekształcać oczyszczone dane i zapisywać przekształcone dane do miejsca docelowego. Potrzebna jest również obsługa testowania, planowania i rozwiązywania problemów z błędami podczas operacjonalizacji potoku.

Systemy przepływów pracy odpowiadają tym wyzwaniom, umożliwiając definiowanie zależności między zadaniami, planowanie uruchamiania potoków i monitorowanie przepływów pracy. Apache Airflow to rozwiązanie typu open source do zarządzania potokami danych i planowania ich. Przepływ powietrza reprezentuje potoki danych jako ukierunkowane grafy acykliczne (DAG) operacji. Definiujesz przepływ pracy w pliku języka Python, a aplikacja Airflow zarządza planowaniem i wykonywaniem. Połączenie airflow Azure Databricks umożliwia korzystanie ze zoptymalizowanego aparatu Spark oferowanego przez usługę Azure Databricks z funkcjami planowania airflow.

Wymagania

  • Integracja platformy Airflow z usługą Azure Databricks wymaga rozwiązania Airflow w wersji 2.5.0 lub nowszej. Przykłady w tym artykule są testowane przy użyciu rozwiązania Airflow w wersji 2.6.1.
  • Przepływ powietrza wymaga języka Python 3.8, 3.9, 3.10 lub 3.11. Przykłady w tym artykule są testowane przy użyciu języka Python 3.8.
  • Instrukcje opisane w tym artykule dotyczące instalowania i uruchamiania platformy Airflow wymagają potoku w celu utworzenia środowiska wirtualnego języka Python.

Operatory przepływu powietrza dla usługi Databricks

Grupa DAG przepływu powietrza składa się z zadań, w których każde zadanie uruchamia operator przepływu powietrza. Operatory przepływu powietrza obsługujące integrację z usługą Databricks są implementowane w dostawcy usługi Databricks.

Dostawca usługi Databricks obejmuje operatory do uruchamiania wielu zadań w obszarze roboczym usługi Azure Databricks, w tym importowania danych do tabeli, uruchamiania zapytań SQL i pracy z folderami Git usługi Databricks.

Dostawca usługi Databricks implementuje dwa operatory do wyzwalania zadań:

Aby utworzyć nowe zadanie usługi Azure Databricks lub zresetować istniejące zadanie, dostawca usługi Databricks implementuje moduł DatabricksCreateJobsOperator. Używa DatabricksCreateJobsOperator żądań INTERFEJSu API POST /api/2.1/jobs/create i POST /api/2.1/jobs/reset API. Możesz użyć DatabricksCreateJobsOperator elementu z elementem , DatabricksRunNowOperator aby utworzyć i uruchomić zadanie.

Uwaga

Wyzwalanie zadania przy użyciu operatorów usługi Databricks wymaga podania poświadczeń w konfiguracji połączenia usługi Databricks. Zobacz Tworzenie osobistego tokenu dostępu usługi Azure Databricks dla rozwiązania Airflow.

Operatory przepływu powietrza usługi Databricks zapisują adres URL strony uruchamiania zadania do dzienników airflow co polling_period_seconds (wartość domyślna to 30 sekund). Aby uzyskać więcej informacji, zobacz stronę pakietu apache-airflow-providers-databricks w witrynie internetowej Airflow.

Lokalne instalowanie integracji rozwiązania Airflow Azure Databricks

Aby zainstalować platformę Airflow i dostawcę usługi Databricks lokalnie na potrzeby testowania i programowania, wykonaj następujące kroki. Inne opcje instalacji systemu Airflow, w tym tworzenie instalacji produkcyjnej, można znaleźć w dokumentacji rozwiązania Airflow.

Otwórz terminal i uruchom następujące polecenia:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Zastąp <firstname>ciąg , <lastname>i <email> nazwą użytkownika i adresem e-mail. Zostanie wyświetlony monit o wprowadzenie hasła dla użytkownika administracyjnego. Pamiętaj, aby zapisać to hasło, ponieważ jest wymagane zalogowanie się do interfejsu użytkownika aplikacji Airflow.

Ten skrypt wykonuje następujące kroki:

  1. Tworzy katalog o nazwie airflow i zmienia się w tym katalogu.
  2. Używa pipenv metody do tworzenia i tworzenia środowiska wirtualnego języka Python. Usługa Databricks zaleca używanie środowiska wirtualnego języka Python do izolowania wersji pakietów i zależności kodu do tego środowiska. Ta izolacja pomaga zmniejszyć nieoczekiwane niezgodności wersji pakietu i kolizje zależności kodu.
  3. Inicjuje zmienną środowiskową o nazwie AIRFLOW_HOME set na ścieżkę airflow katalogu.
  4. Instaluje pakiety dostawcy Airflow i Airflow Databricks.
  5. airflow/dags Tworzy katalog. Funkcja Airflow używa dags katalogu do przechowywania definicji DAG.
  6. Inicjuje bazę danych SQLite używaną przez funkcję Airflow do śledzenia metadanych. W przypadku wdrożenia produkcyjnego systemu Airflow należy skonfigurować przepływ powietrza przy użyciu standardowej bazy danych. Baza danych SQLite i domyślna konfiguracja wdrożenia airflow są inicjowane w airflow katalogu.
  7. Tworzy użytkownika administratora aplikacji Airflow.

Napiwek

Aby potwierdzić instalację dostawcy usługi Databricks, uruchom następujące polecenie w katalogu instalacyjnym Airflow:

airflow providers list

Uruchamianie serwera internetowego airflow i harmonogramu

Aby wyświetlić interfejs użytkownika przepływu powietrza, wymagany jest serwer internetowy Airflow. Aby uruchomić serwer internetowy, otwórz terminal w katalogu instalacyjnym Airflow i uruchom następujące polecenia:

Uwaga

Jeśli nie można uruchomić serwera internetowego Airflow z powodu konfliktu portów, możesz zmienić domyślny port w konfiguracji airflow.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

Harmonogram jest składnikiem Airflow, który planuje grupy DAG. Aby uruchomić harmonogram, otwórz nowy terminal w katalogu instalacyjnym Airflow i uruchom następujące polecenia:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Testowanie instalacji systemu Airflow

Aby sprawdzić instalację rozwiązania Airflow, możesz uruchomić jedną z przykładowych grup DAG dołączonych do rozwiązania Airflow:

  1. W oknie przeglądarki otwórz plik http://localhost:8080/home. Zaloguj się do interfejsu użytkownika aplikacji Airflow przy użyciu nazwy użytkownika i hasła utworzonego podczas instalowania aplikacji Airflow. Zostanie wyświetlona strona Grupy DAG przepływu powietrza.
  2. Kliknij przełącznik Wstrzymaj /Copause daG, aby usunąć jedną z przykładowych grup DAG , na przykład example_python_operator.
  3. Wyzwól przykładowy zestaw DAG, klikając przycisk Trigger DAG (Wyzwalaj grupę DAG ).
  4. Kliknij nazwę DAG, aby wyświetlić szczegóły, w tym stan uruchomienia grupy DAG.

Tworzenie osobistego tokenu dostępu usługi Azure Databricks dla rozwiązania Airflow

Aplikacja Airflow łączy się z usługą Databricks przy użyciu osobistego tokenu dostępu usługi Azure Databricks (PAT). Aby utworzyć osobisty token dostępu:

  1. W obszarze roboczym usługi Azure Databricks kliknij nazwę użytkownika usługi Azure Databricks na górnym pasku, a następnie wybierz pozycję Ustawienia z listy rozwijanej.
  2. Kliknij pozycję Deweloper.
  3. Obok pozycji Tokeny dostępu kliknij pozycję Zarządzaj.
  4. Kliknij pozycję Generuj nowy token.
  5. (Opcjonalnie) Wprowadź komentarz, który pomaga zidentyfikować ten token w przyszłości i zmienić domyślny okres istnienia tokenu na 90 dni. Aby utworzyć token bez okresu istnienia (niezalecane), pozostaw puste pole Okres istnienia (dni) (puste).
  6. Kliknij pozycję Generate (Generuj).
  7. Skopiuj wyświetlony token do bezpiecznej lokalizacji, a następnie kliknij przycisk Gotowe.

Uwaga

Pamiętaj, aby zapisać skopiowany token w bezpiecznej lokalizacji. Nie udostępniaj skopiowanego tokenu innym osobom. W przypadku utraty skopiowanego tokenu nie można wygenerować tego samego tokenu. Zamiast tego należy powtórzyć tę procedurę, aby utworzyć nowy token. Jeśli utracisz skopiowany token lub uważasz, że token został naruszony, usługa Databricks zdecydowanie zaleca natychmiastowe usunięcie tego tokenu z obszaru roboczego, klikając ikonę kosza (Odwołaj) obok tokenu na stronie Tokeny dostępu.

Jeśli nie możesz utworzyć lub użyć tokenów w obszarze roboczym, może to być spowodowane tym, że administrator obszaru roboczego wyłączył tokeny lub nie udzielił Ci uprawnień do tworzenia lub używania tokenów. Zobacz administratora obszaru roboczego lub następujące elementy:

Uwaga

Najlepszym rozwiązaniem w zakresie zabezpieczeń w przypadku uwierzytelniania za pomocą zautomatyzowanych narzędzi, systemów, skryptów i aplikacji usługa Databricks zaleca używanie osobistych tokenów dostępu należących do jednostek usługi zamiast użytkowników obszaru roboczego. Aby utworzyć tokeny dla jednostek usługi, zobacz Zarządzanie tokenami dla jednostki usługi.

Możesz również uwierzytelnić się w usłudze Azure Databricks przy użyciu tokenu Microsoft Entra ID (dawniej Azure Active Directory). Zobacz Połączenie ion usługi Databricks w dokumentacji platformy Airflow.

Konfigurowanie połączenia usługi Azure Databricks

Instalacja rozwiązania Airflow zawiera domyślne połączenie dla usługi Azure Databricks. Aby zaktualizować połączenie w celu nawiązania połączenia z obszarem roboczym przy użyciu osobistego tokenu dostępu utworzonego powyżej:

  1. W oknie przeglądarki otwórz plik http://localhost:8080/connection/list/. Jeśli zostanie wyświetlony monit o zalogowanie się, wprowadź nazwę użytkownika i hasło administratora.
  2. W obszarze Identyfikator conn znajdź databricks_default i kliknij przycisk Edytuj rekord .
  3. Zastąp wartość w polu Host nazwą wystąpienia obszaru roboczego wdrożenia usługi Azure Databricks, na przykład https://adb-123456789.cloud.databricks.com.
  4. W polu Hasło wprowadź osobisty token dostępu usługi Azure Databricks.
  5. Kliknij przycisk Zapisz.

Jeśli używasz tokenu microsoft Entra ID, zobacz Databricks Połączenie ion w dokumentacji aplikacji Airflow, aby uzyskać informacje na temat konfigurowania uwierzytelniania.

Przykład: tworzenie grupy DAG przepływu powietrza w celu uruchomienia zadania usługi Azure Databricks

W poniższym przykładzie pokazano, jak utworzyć proste wdrożenie systemu Airflow uruchamiane na komputerze lokalnym i wdrożyć przykładową grupę DAG w celu wyzwolenia przebiegów w usłudze Azure Databricks. W tym przykładzie wykonasz następujące elementy:

  1. Utwórz nowy notes i dodaj kod, aby wydrukować powitanie na podstawie skonfigurowanego parametru.
  2. Utwórz zadanie usługi Azure Databricks z pojedynczym zadaniem, które uruchamia notes.
  3. Skonfiguruj połączenie airflow z obszarem roboczym usługi Azure Databricks.
  4. Utwórz grupę DAG przepływu powietrza, aby wyzwolić zadanie notesu. Grupę DAG można zdefiniować w skryfcie języka Python przy użyciu polecenia DatabricksRunNowOperator.
  5. Użyj interfejsu użytkownika przepływu powietrza, aby wyzwolić grupę DAG i wyświetlić stan uruchomienia.

Tworzenie notesu

W tym przykładzie użyto notesu zawierającego dwie komórki:

  • Pierwsza komórka zawiera widżet tekstowy Narzędzia usługi Databricks, definiujący zmienną o nazwie greeting ustawioną na wartość worlddomyślną .
  • Druga komórka wyświetla wartość zmiennej poprzedzonej greeting prefiksem hello.

Aby utworzyć notes:

  1. Przejdź do obszaru roboczego usługi Azure Databricks, kliknij pozycję Nowa ikonaNowy na pasku bocznym i wybierz pozycję Notes.

  2. Nadaj notesowi nazwę, taką jak Hello Airflow, i upewnij się, że język domyślny jest ustawiony na python.

  3. Skopiuj następujący kod w języku Python i wklej go w pierwszej komórce notesu.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Dodaj nową komórkę poniżej pierwszej komórki i skopiuj i wklej następujący kod w języku Python do nowej komórki:

    print("hello {}".format(greeting))
    

Tworzenie zadania

  1. Kliknij pozycję Ikona przepływów pracyPrzepływy pracy na pasku bocznym.

  2. Kliknij pozycję Przycisk Utwórz zadanie.

    Na karcie Zadania zostanie wyświetlone okno dialogowe tworzenia zadania.

    Okno dialogowe Tworzenie pierwszego zadania

  3. Zastąp ciąg Dodaj nazwę zadania... nazwą zadania.

  4. W polu Nazwa zadania wprowadź nazwę zadania, na przykład greeting-task.

  5. W menu rozwijanym Typ wybierz pozycję Notes.

  6. W menu rozwijanym Źródło wybierz pozycję Obszar roboczy.

  7. Kliknij pole tekstowe Ścieżka i użyj przeglądarki plików, aby znaleźć utworzony notes, kliknij nazwę notesu, a następnie kliknij przycisk Potwierdź.

  8. Kliknij pozycję Dodaj w obszarze Parametry. W polu Klucz wprowadź wartość greeting. W polu Wartość wprowadź wartość Airflow user.

  9. Kliknij pozycję Utwórz zadanie.

Na panelu Szczegóły zadania skopiuj wartość Identyfikator zadania. Ta wartość jest wymagana do wyzwolenia zadania z przepływu powietrza.

Uruchamianie zadania

Aby przetestować nowe zadanie w interfejsie użytkownika przepływów pracy usługi Azure Databricks, kliknij Przycisk Uruchom teraz w prawym górnym rogu. Po zakończeniu przebiegu możesz zweryfikować dane wyjściowe, wyświetlając szczegóły uruchomienia zadania.

Tworzenie nowej grupy DAG przepływu powietrza

Zdefiniuj grupę DAG przepływu powietrza w pliku języka Python. Aby utworzyć grupę DAG w celu wyzwolenia przykładowego zadania notesu:

  1. W edytorze tekstów lub środowisku IDE utwórz nowy plik o nazwie o databricks_dag.py następującej zawartości:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Zastąp JOB_ID ciąg wartością zapisanego wcześniej identyfikatora zadania.

  2. Zapisz plik w airflow/dags katalogu. Funkcja Airflow automatycznie odczytuje i instaluje pliki DAG przechowywane w programie airflow/dags/.

Instalowanie i weryfikowanie grupy DAG w rozwiązaniu Airflow

Aby wyzwolić i zweryfikować grupę DAG w interfejsie użytkownika przepływu powietrza:

  1. W oknie przeglądarki otwórz plik http://localhost:8080/home. Zostanie wyświetlony ekran Grupy DAG przepływu powietrza.
  2. Znajdź databricks_dag i kliknij przełącznik Pause/Unpause DAG, aby usunąć grupę DAG .
  3. Wyzwól grupę DAG, klikając przycisk Wyzwalaj grupę DAG .
  4. Kliknij przebieg w kolumnie Uruchomienia , aby wyświetlić stan i szczegóły przebiegu.