Udostępnij za pośrednictwem


Instalowanie pakietu prywatnego jako wymagania w przepływach pracy danych

Uwaga

Przepływy pracy danych są obsługiwane przez platformę Apache Airflow.
Apache Airflow to platforma typu open source używana do programowego tworzenia, planowania i monitorowania złożonych przepływów pracy danych. Umożliwia zdefiniowanie zestawu zadań nazywanych operatorami, które można połączyć w skierowane grafy acykliczne (DAG) do reprezentowania potoków danych.

Pakiet języka Python to sposób organizowania powiązanych modułów języka Python w jedną hierarchię katalogów. Pakiet jest zwykle reprezentowany jako katalog zawierający specjalny plik o nazwie init.py. Wewnątrz katalogu pakietów można mieć wiele plików modułów języka Python (.py plików), które definiują funkcje, klasy i zmienne. W kontekście przepływów pracy danych można tworzyć pakiety w celu dodania niestandardowego kodu.

Ten przewodnik zawiera instrukcje krok po kroku dotyczące instalowania .whl pliku (Wheel), który służy jako format dystrybucji binarnej pakietu języka Python w przepływach pracy danych.

Na potrzeby ilustracji utworzyę prosty operator niestandardowy jako pakiet języka Python, który można zaimportować jako moduł wewnątrz pliku dag.

Opracowywanie operatora niestandardowego i testowanie za pomocą narzędzia Apache Airflow Dag

  1. Utwórz plik sample_operator.py i przekonwertuj go na pakiet prywatny. Zapoznaj się z przewodnikiem: Tworzenie pakietu w języku Python

    from airflow.models.baseoperator import BaseOperator
    
    
    class SampleOperator(BaseOperator):
        def __init__(self, name: str, **kwargs) -> None:
            super().__init__(**kwargs)
            self.name = name
    
        def execute(self, context):
            message = f"Hello {self.name}"
            return message
    
    
  2. Utwórz plik sample_dag.py DAG platformy Apache Airflow, aby przetestować operator zdefiniowany w kroku 1.

    from datetime import datetime
    from airflow import DAG
    
    from airflow_operator.sample_operator import SampleOperator
    
    
    with DAG(
    "test-custom-package",
    tags=["example"]
    description="A simple tutorial DAG",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    ) as dag:
        task = SampleOperator(task_id="sample-task", name="foo_bar")
    
        task
    
  3. Utwórz repozytorium GitHub zawierające sample_dag.py folder w Dags folderze i plik pakietu prywatnego. Typowe formaty plików obejmują zip, .whllub tar.gz. Umieść plik w folderze "Dags" lub "Plugins", odpowiednio. Zsynchronizuj repozytorium Git z przepływami pracy danych lub możesz użyć wstępnie skonfigurowanego repozytorium (Install-Private-Package)[https://github.com/ambika-garg/Install-Private-Package-Fabric]

Dodaj pakiet jako wymaganie

Dodaj pakiet jako wymaganie w obszarze Airflow requirements. Użyj formatu /opt/airflow/git/<repoName>.git/<pathToPrivatePackage>

Jeśli na przykład pakiet prywatny znajduje się /dags/test/private.whl w repozytorium GitHub, dodaj wymaganie /opt/airflow/git/<repoName>.git/dags/test/private.whl do środowiska Airflow.

Zrzut ekranu przedstawiający dodany pakiet prywatny jako wymaganie.

Szybki start: tworzenie przepływu pracy danych