Aracılığıyla paylaş


Veri iş akışlarında gereksinim olarak Özel Paket yükleme

Not

Veri iş akışları Apache Airflow tarafından desteklenir.
Apache Airflow, karmaşık veri iş akışlarını program aracılığıyla oluşturmak, zamanlamak ve izlemek için kullanılan açık kaynak bir platformdur. Veri işlem hatlarını temsil etmek için yönlendirilmiş döngüsel grafikler (DAG)'ler halinde birleştirilebilen işleçler olarak adlandırılan bir görev kümesi tanımlamanıza olanak tanır.

Python paketi, ilgili Python modüllerini tek bir dizin hiyerarşisinde düzenlemenin bir yoludur. Paket genellikle init.py adlı özel bir dosya içeren bir dizin olarak temsil edilir. Paket dizininde işlevleri, sınıfları ve değişkenleri tanımlayan birden çok Python modülü dosyanız (.py dosyaları) olabilir. Veri iş akışları bağlamında, özel kodunuzu eklemek için paketler oluşturabilirsiniz.

Bu kılavuz, Veri iş akışlarında Python paketi için ikili dağıtım biçimi olarak görev yapan (Wheel) dosyasını yükleme .whl hakkında adım adım yönergeler sağlar.

Çizim amacıyla, dag dosyasının içinde modül olarak içeri aktarılabilir python paketi olarak basit bir özel işleç oluşturuyorum.

Apache Airflow Dag ile özel operatör geliştirme ve test

  1. Bir dosya sample_operator.py oluşturun ve özel pakete dönüştürün. Kılavuza bakın: Python'da paket oluşturma

    from airflow.models.baseoperator import BaseOperator
    
    
    class SampleOperator(BaseOperator):
        def __init__(self, name: str, **kwargs) -> None:
            super().__init__(**kwargs)
            self.name = name
    
        def execute(self, context):
            message = f"Hello {self.name}"
            return message
    
    
  2. 1. Adımda tanımlanan işleci test etmek için Apache Airflow DAG dosyasını sample_dag.py oluşturun.

    from datetime import datetime
    from airflow import DAG
    
    from airflow_operator.sample_operator import SampleOperator
    
    
    with DAG(
    "test-custom-package",
    tags=["example"]
    description="A simple tutorial DAG",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    ) as dag:
        task = SampleOperator(task_id="sample-task", name="foo_bar")
    
        task
    
  3. in Dags klasörünü ve özel paket dosyanızı içeren sample_dag.py bir GitHub Deposu oluşturun. Yaygın dosya biçimleri , .whl, veya tar.gz'dırzip. Dosyayı uygun şekilde 'Dags' veya 'Plugins' klasörüne yerleştirin. Git Deponuzu Veri iş akışlarıyla eşitleyin veya önceden yapılandırılmış depo (Install-Private-Package)[https://github.com/ambika-garg/Install-Private-Package-Fabric]

Paketinizi gereksinim olarak ekleme

paketini altına Airflow requirementsgereksinim olarak ekleyin. Biçimi kullanma /opt/airflow/git/<repoName>.git/<pathToPrivatePackage>

Örneğin, özel paketiniz bir GitHub deposunda bulunuyorsa /dags/test/private.whl , gereksinimi /opt/airflow/git/<repoName>.git/dags/test/private.whl Airflow ortamına ekleyin.

Gereksinim olarak eklenen özel paketi gösteren ekran görüntüsü.

Hızlı Başlangıç: Veri iş akışı oluşturma