Aracılığıyla paylaş


İş akışında işlem hatlarını çalıştırma

Lakeflow görevleri, Apache Airflow veya Azure Data Factory ile veri işleme iş akışının bir parçası olarak bir işlem hattı çalıştırabilirsiniz.

İşler

Veri işleme iş akışı uygulamak için Databricks işinde birden çok görevi düzenleyebilirsiniz. bir işe işlem hattı eklemek için, bir iş oluştururken İşlem Hattı görevini kullanın. bkz . İşler için işlem hattı görevi.

Apache Airflow

Apache Airflow , veri iş akışlarını yönetmeye ve zamanlamaya yönelik açık kaynak bir çözümdür. Hava akışı, iş akışlarını işlemlerin yönlendirilmiş döngüsel grafikleri (DAG' ler) olarak temsil eder. Bir Python dosyasında iş akışı tanımlarsınız ve zamanlama ve yürütmeyi Airflow yönetir. Azure Databricks ile Airflow yükleme ve kullanma hakkında bilgi için bkz. Apache Airflow ile Lakeflow İşlerini Orkestrasyonu.

Bir işlem hattını Airflow iş akışının parçası olarak çalıştırmak için DatabricksSubmitRunOperator'ı kullanın.

Gereksinimler

Lakeflow Spark Bildirimli İşlem Hatları için Airflow desteğini kullanmak için aşağıdakiler gereklidir:

Example

Aşağıdaki örnek, tanımlayıcısıyla 8279d543-063c-4d63-9926-dae38e35ce8b işlem hattı için bir güncelleme tetiklenmesini sağlayan bir Airflow DAG oluşturur:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('ldp',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

CONNECTION_ID öğesini, çalışma alanınızdaki Airflow bağlantısı için tanımlayıcı ile değiştirin.

Bu örneği dizine airflow/dags kaydedin ve DAG'yi görüntülemek ve tetiklemek için Airflow kullanıcı arabirimini kullanın. İşlem hattı güncelleştirmesinin ayrıntılarını görüntülemek için işlem hattı kullanıcı arabirimini kullanın.

Azure Data Factory

Uyarı

Lakeflow Spark Bildirimli İşlem Hatları ve Azure Data Factory,bir hata oluştuğunda yeniden deneme sayısını yapılandırma seçeneklerini içerir. Yeniden deneme değerleri işlem hattınızda ve işlem hattını çağıran Azure Data Factory etkinliğinde yapılandırılırsa, yeniden deneme sayısı Azure Data Factory yeniden deneme değerinin işlem hattı yeniden deneme değeriyle çarpılmasıdır.

Örneğin, bir işlem hattı güncelleştirmesi başarısız olursa, Lakeflow Spark Bildirimli İşlem Hatları güncelleştirmeyi varsayılan olarak en fazla beş kez yeniden denenir. Azure Data Factory yeniden denemesi üç olarak ayarlanırsa ve işlem hattınız varsayılan olarak beş yeniden deneme kullanırsa, başarısız işlem hattınız en fazla on beş kez yeniden denenebilir. İşlem hattı güncelleştirmeleri başarısız olduğunda aşırı yeniden deneme girişimlerini önlemek için Databricks, işlem hattını veya işlem hattını çağıran Azure Data Factory etkinliğini yapılandırırken yeniden deneme sayısını sınırlamanızı önerir.

İşlem hattınızın yeniden deneme yapılandırmasını değiştirmek için işlem hattını yapılandırırken ayarını kullanın pipelines.numUpdateRetryAttempts .

Azure Data Factory, veri tümleştirme ve dönüştürme iş akışlarını düzenlemenize olanak tanıyan bulut tabanlı bir ETL hizmetidir. Azure Data Factory, bir iş akışında not defterleri, JAR görevleri ve Python betikleri gibi Azure Databricks görevlerinin çalıştırılmasını doğrudan destekler. Azure Data Factory Web etkinliğinden işlem hattı REST API'sini çağırarak bir iş akışına işlem hattı da ekleyebilirsiniz. Örneğin, Azure Data Factory'den işlem hattı güncelleştirmesini tetikleme:

  1. Bir veri fabrikası oluşturun veya mevcut bir veri fabrikasını açın.

  2. Oluşturma tamamlandığında veri fabrikanızın sayfasını açın ve Azure Data Factory Studio'yu Aç kutucuğuna tıklayın. Azure Data Factory kullanıcı arabirimi görüntülenir.

  3. Azure Data Factory Studio kullanıcı arabirimindeki Yeni açılan menüsünden İşlem Hattı seçerek yeni bir Azure Data Factory işlem hattı oluşturun.

  4. Etkinlikler araç kutusunda Genel'i genişletin ve Web etkinliğini işlem hattı tuvaline sürükleyin. Ayarlar sekmesine tıklayın ve aşağıdaki değerleri girin:

    Uyarı

    En iyi güvenlik uygulaması olarak otomatik araçlar, sistemler, betikler ve uygulamalarla kimlik doğrulaması yaptığınızda Databricks, çalışma alanı kullanıcıları yerine hizmet sorumlularına ait kişisel erişim belirteçlerini kullanmanızı önerir. Hizmet sorumluları için belirteçler oluşturmak amacıyla Hizmet sorumlusu için belirteçleri yönetme konusuna bakın.

    • URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      Değiştir <get-workspace-instance>.

      <pipeline-id> öğesini işlem hattı tanımlayıcısıyla değiştirin.

    • Yöntem: Açılan menüden POST'ı seçin.

    • Başlıklar: + Yeni üzerine tıklayın. Ad metin kutusuna Authorization girin. Değer metin kutusuna yazınBearer <personal-access-token>.

      yerine Azure Databricks <personal-access-token> ekleyin.

    • Gövde: Ek istek parametreleri geçirmek için, parametreleri içeren bir JSON belgesi girin. Örneğin, bir güncelleştirmeyi başlatmak ve işlem hattı için tüm verileri yeniden işlemek için: {"full_refresh": "true"}. Ek istek parametresi yoksa boş ayraç (){} girin.

Web etkinliğini test etmek için Data Factory kullanıcı arabirimindeki işlem hattı araç çubuğunda Hata Ayıklama butonuna tıklayın. Çalıştırmanın çıkış ve durumu (hatalar dahil) Azure Data Factory işlem hattının Çıkış sekmesinde görüntülenir. İşlem hattı güncelleştirmesinin ayrıntılarını görüntülemek için işlem hatları kullanıcı arabirimini kullanın.

İpucu

Yaygın iş akışı gereksinimlerinden biri, önceki görev tamamlandıktan sonra bir görevi başlatmaktır. İşlem hattı updates isteği zaman uyumsuz olduğundan(istek güncelleştirmeyi başlattıktan sonra ancak güncelleştirme tamamlanmadan önce döndürülüyor), işlem hattı güncelleştirmesine bağımlılığı olan Azure Data Factory işlem hattınızdaki görevlerin güncelleştirmenin tamamlanmasını beklemesi gerekir. Güncelleştirmenin tamamlanmasını bekleme seçeneği, Lakeflow Spark Bildirimli İşlem Hatları güncelleştirmesini tetikleyen Web etkinliğinin ardından bir Until etkinliği eklemektir. Until etkinliğinde:

  1. Güncelleştirmenin tamamlanması için yapılandırılmış bir saniye sayısını beklemek için Bir Bekleme etkinliği ekleyin.
  2. Güncelleştirmenin durumunu almak için işlem hattı güncelleştirme ayrıntıları isteğini kullanan Bekleme etkinliğinin ardından bir Web etkinliği ekleyin. Yanıttaki state alan, tamamlanmış olup olmadığını içeren güncelleştirmenin geçerli durumunu döndürür.
  3. Until etkinliğinin sonlandırıcı koşulunu ayarlamak için state alanının değerini kullanın. Değeri temel alan bir işlem hattı değişkeni eklemek ve sonlandırıcı koşul için bu değişkeni kullanmak için state de kullanabilirsiniz.