Provozování pipelinů ve workflow

Kanál můžete spustit jako součást pracovního postupu zpracování dat pomocí úloh Lakeflow, Apache Airflow nebo Azure Data Factory.

Jobs

V úloze Databricks můžete orchestrovat více úloh za účelem implementace pracovního postupu zpracování dat. Pokud chcete do úlohy zahrnout kanál, při vytváření úlohy použijte úlohu Kanálu . Viz úloha kanálu pro úlohy.

Apache Airflow

Apache Airflow je opensourcové řešení pro správu a plánování pracovních postupů dat. Airflow představuje pracovní postupy jako řízené acyklické grafy (DAG) operací. Pracovní postup definujete v souboru Pythonu a Airflow spravuje plánování a provádění. Informace o instalaci a používání Airflow s Azure Databricks najdete v tématu Orchestrate Lakeflow Jobs with Apache Airflow.

Pokud chcete spustit pipeline jako součást pracovního postupu Airflow, použijte DatabricksSubmitRunOperator.

Požadavky

K použití podpory Airflow pro deklarativní kanály Lakeflow Sparku jsou potřeba následující:

Example

Následující příklad vytvoří Airflow DAG, který aktivuje aktualizaci pipeline s identifikátorem 8279d543-063c-4d63-9926-dae38e35ce8b:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('ldp',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

Nahraďte CONNECTION_ID identifikátorem připojení Airflow k vašemu pracovnímu prostoru.

Uložte tento příklad do airflow/dags adresáře a pomocí uživatelského rozhraní Airflow zobrazte a aktivujte DAG. Pomocí uživatelského rozhraní pipeline můžete zobrazit podrobnosti o aktualizaci tohoto procesu.

Azure Data Factory

Poznámka:

Mezi deklarativní kanály Sparku Lakeflow a Azure Data Factory patří možnosti konfigurace počtu opakování v případě selhání. Pokud jsou hodnoty opakování nakonfigurované ve vašem kanálu a v aktivitě služby Azure Data Factory, která kanál volá, je počet opakování vynásobenou hodnotou opakování kanálu.

Pokud například aktualizace pipeline selže, Lakeflow Spark Declarative Pipelines ve výchozím nastavení aktualizaci automaticky opakuje až pětkrát. Pokud je opakování služby Azure Data Factory nastavené na tři a váš kanál používá výchozí nastavení pěti opakování, může se váš neúspěšný kanál opakovat až patnáctkrát. Aby nedocházelo k nadměrným pokusům o opakování při selhání aktualizací kanálu, doporučuje Databricks omezit počet opakování při konfiguraci kanálu nebo aktivity služby Azure Data Factory, která kanál volá.

Pokud chcete změnit nastavení opakování pro vaši pipeline, použijte pipelines.numUpdateRetryAttempts při konfiguraci pipeline.

Azure Data Factory je cloudová služba ETL, která umožňuje orchestraci pracovních postupů integrace a transformace dat. Azure Data Factory přímo podporuje spouštění úloh Azure Databricks v pracovním postupu, včetně poznámkových bloků, úloh JAR a skriptů Pythonu. Do pracovního postupu můžete datový tok zahrnout také použitím rozhraní REST API z webové aktivity služby Azure Data Factory. Pokud například chcete spustit aktualizaci pipeline ze služby Azure Data Factory:

  1. Vytvořte datovou továrnu nebo otevřete existující datovou továrnu.

  2. Po dokončení vytváření otevřete stránku datové továrny a klikněte na dlaždici Otevřít Azure Data Factory Studio . Zobrazí se uživatelské rozhraní služby Azure Data Factory.

  3. Nový datový kanál v Azure Data Factory vytvoříte tak, že v uživatelském rozhraní Azure Data Factory Studio vyberete datový kanál z rozevírací nabídky Nový.

  4. Na panelu nástrojů Aktivity rozbalte Obecné a přetáhněte Webovou aktivitu na plátno potrubí. Klikněte na kartu Nastavení a zadejte následující hodnoty:

    Poznámka:

    Jako bezpečnostní opatření při ověřování pomocí automatizovaných nástrojů, systémů, skriptů a aplikací doporučuje Databricks používat osobní přístupové tokeny, které patří služebním subjektům, místo uživatelů pracovního prostoru. Pokud chcete vytvořit tokeny pro instanční objekty, přečtěte si téma Správa tokenů instančního objektu.

    • Adresa URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      Nahradit <get-workspace-instance>.

      Nahraďte <pipeline-id> identifikátorem kanálu.

    • Metoda: V rozevírací nabídce vyberte POST .

    • Záhlaví: Klikněte na + Nový. Do textového pole Název zadejte Authorization. Do textového pole Hodnota zadejte Bearer <personal-access-token>.

      Nahraďte <personal-access-token>osobním přístupovým tokenem Azure Databricks.

    • Text: Pokud chcete předat další parametry požadavku, zadejte dokument JSON obsahující parametry. Chcete-li například spustit aktualizaci a znovu zpracovat všechna data pro datový tok: {"full_refresh": "true"}. Pokud neexistují žádné další parametry požadavku, zadejte prázdné složené závorky ({}).

Chcete-li otestovat webovou aktivitu, klepněte na tlačítko Ladit na panelu nástrojů v uživatelském rozhraní služby Data Factory. Výstup a stav spuštění, včetně chyb, se zobrazí na kartě Výstup v Azure Data Factory pipeline. K zobrazení podrobností o aktualizaci kanálu použijte uživatelské rozhraní kanálů.

Návod

Běžným požadavkem pracovního postupu je zahájení úkolu po dokončení předchozího úkolu. Vzhledem k tomu, že požadavek kanálu updates je asynchronní – požadavek se vrátí po spuštění aktualizace, ale před dokončením aktualizace – úlohy v kanálu Azure Data Factory se závislostí na aktualizaci kanálu musí počkat, než se aktualizace dokončí. Možnost počkat na dokončení aktualizace je přidání aktivity Until za webovou aktivitou, která aktivuje aktualizaci deklarativních kanálů Sparku Lakeflow. V aktivitě Until:

  1. Přidejte aktivitu čekání , která po dokončení aktualizace čeká na nakonfigurovaný počet sekund.
  2. Přidejte webovou aktivitu za aktivitou Čekání, která používá požadavek na podrobnosti aktualizace kanálu k získání stavu aktualizace. Pole state v odpovědi vrátí aktuální stav aktualizace, včetně toho, jestli byla dokončena.
  3. Pomocí hodnoty state pole nastavte ukončovací podmínku aktivity Until. Aktivitu Nastavit proměnnou můžete použít také k přidání proměnné kanálu na základě state hodnoty a k použití této proměnné pro podmínku ukončení.