Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Możesz uruchomić pipeline jako część schematu przepływu pracy przetwarzania danych, korzystając z Lakeflow Jobs, Apache Airflow lub Azure Data Factory.
Jobs
W zadaniu usługi Databricks można organizować wiele zadań w celu zaimplementowania przepływu pracy przetwarzania danych. Aby dołączyć potok do zadania, użyj zadania Potok podczas tworzenia zadania. Zobacz Zadanie potoku dla zadań.
Przepływ powietrza Apache
Apache Airflow to rozwiązanie typu open source do zarządzania przepływami pracy danych i planowania ich. Airflow przedstawia przepływy pracy jako ukierunkowane acykliczne grafy (DAG) operacji. Przepływ pracy definiuje się w pliku w języku Python, a funkcja Airflow zarządza planowaniem i wykonywaniem. Aby uzyskać informacje na temat instalacji i używania Airflow z Azure Databricks, zobacz Orchestrate Lakeflow Jobs with Apache Airflow.
Aby uruchomić potok w ramach przepływu pracy Airflow, użyj operatora DatabricksSubmitRunOperator.
Requirements
Do korzystania z obsługi Airflow dla deklaratywnych potoków Spark platformy Lakeflow wymagane są następujące elementy:
- Airflow w wersji 2.1.0 lub nowszej.
- Pakiet dostawcy usługi Databricks w wersji 2.1.0 lub nowszej.
Example
Poniższy przykład tworzy Airflow DAG, który wyzwala aktualizację potoku przy użyciu identyfikatora 8279d543-063c-4d63-9926-dae38e35ce8b:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('ldp',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Zamień CONNECTION_ID na identyfikator połączenia Airflow w obszarze roboczym.
Zapisz ten przykład w airflow/dags katalogu i użyj interfejsu użytkownika Airflow, aby wyświetlić i uruchomić DAG. Użyj interfejsu użytkownika potoku, aby wyświetlić szczegóły aktualizacji potoku.
Azure Data Factory
Uwaga / Notatka
Lakeflow Spark Declarative Pipelines i Azure Data Factory zawierają opcje do konfigurowania liczby ponownych prób uruchomienia w przypadku awarii. Jeśli parametry ponawiania są skonfigurowane zarówno w potoku jak i w aktywności usługi Azure Data Factory, która wywołuje ten potok, liczba ponownych prób to wartość ponawiania usługi Azure Data Factory pomnożona przez wartość ponawiania potoku.
Jeśli na przykład aktualizacja potoku zakończy się niepowodzeniem, deklaratywne potoki Lakeflow Spark próbują ponownie przeprowadzić aktualizację maksymalnie pięć razy. Jeśli ponawianie próby w usłudze Azure Data Factory jest ustawione na trzy, a potok używa wartości domyślnej pięciu ponownych prób, potok kończący się niepowodzeniem może zostać ponowiony do piętnastu razy. Aby uniknąć nadmiernej liczby ponownych prób w przypadku niepowodzenia aktualizacji potoku, usługa Databricks zaleca ograniczenie tej liczby podczas konfigurowania samego potoku lub działania usługi Azure Data Factory, które wywołuje potok.
Aby zmienić konfigurację ponawiania prób dla potoku, użyj pipelines.numUpdateRetryAttempts ustawienia podczas konfigurowania potoku.
Azure Data Factory to oparta na chmurze usługa ETL, która umożliwia organizowanie przepływów pracy integracji i przekształcania danych. Usługa Azure Data Factory bezpośrednio obsługuje uruchamianie zadań usługi Azure Databricks w przepływie pracy, w tym notesów, zadań JAR i skryptów języka Python. Potok można również uwzględnić w przepływie pracy, wywołując interfejs API REST potoku z działania internetowego usługi Azure Data Factory. Aby na przykład wyzwolić aktualizację potoku z usługi Azure Data Factory:
Utwórz fabrykę danych lub otwórz istniejącą fabrykę danych.
Po zakończeniu tworzenia otwórz stronę Twojej fabryki danych i kliknij kafelek Otwórz Azure Data Factory Studio. Zostanie wyświetlony interfejs użytkownika usługi Azure Data Factory.
Utwórz nowy potok danych Azure Data Factory, wybierając pozycję Potok z listy rozwijanej Nowy w interfejsie użytkownika Azure Data Factory Studio.
W przyborniku Działania rozwiń pozycję Ogólne i przeciągnij działanie Web na kanwę potoku. Kliknij kartę Ustawienia i wprowadź następujące wartości:
Uwaga / Notatka
Zalecana praktyka bezpieczeństwa w przypadku uwierzytelniania za pomocą zautomatyzowanych narzędzi, systemów, skryptów i aplikacji to użycie osobistych tokenów dostępu należących do pryncypałów usługi zamiast użytkowników obszaru roboczego, co zaleca Databricks. Aby utworzyć tokeny dla jednostek usługi, zobacz Zarządzanie tokenami dla jednostki usługi.
Adres URL:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.Zastąp element
<get-workspace-instance>.Zastąp
<pipeline-id>element identyfikatorem potoku.Metoda: wybierz pozycję POST z menu rozwijanego.
Nagłówki: kliknij + Nowy. W polu tekstowym Nazwa wprowadź .
AuthorizationW polu tekstowym Wartość wprowadźBearer <personal-access-token>.Zastąp
<personal-access-token>osobistym tokenem dostępu Azure Databricks.Treść: Aby przekazać dodatkowe parametry żądania, wprowadź dokument JSON zawierający parametry. Aby na przykład uruchomić aktualizację i ponownie przetworzyć wszystkie dane dla ścieżki przetwarzania:
{"full_refresh": "true"}. Jeśli nie ma żadnych dodatkowych parametrów żądania, wprowadź puste nawiasy klamrowe ({}).
Aby przetestować aktywność sieciową, kliknij Debug na pasku narzędzi potoku w interfejsie użytkownika usługi Data Factory. Dane wyjściowe i stan przebiegu, w tym błędy, są wyświetlane na karcie Dane wyjściowe potoku usługi Azure Data Factory. Użyj interfejsu użytkownika Pipeline do wyświetlenia szczegółów aktualizacji Pipeline.
Wskazówka
Typowym wymaganiem dotyczącym przepływu pracy jest uruchomienie zadania po zakończeniu poprzedniego zadania. Ponieważ żądanie potoku updates jest asynchroniczne — żądanie zostaje zwrócone po rozpoczęciu aktualizacji, ale przed ukończeniem aktualizacji — zadania w potoku usługi Azure Data Factory z zależności od aktualizacji potoku muszą poczekać na ukończenie aktualizacji. Opcja oczekiwania na zakończenie aktualizacji polega na dodaniu działania Until po działaniu sieciowym, które wyzwala aktualizację potoków deklaratywnych Lakeflow Spark. W działaniu Until:
- Dodaj działanie Wait (Oczekiwanie), aby poczekać skonfigurowaną liczbę sekund na ukończenie aktualizacji.
- Dodaj aktywność sieci Web następującą po działaniu Wait, która używa żądania uzyskania szczegółów aktualizacji potoku w celu uzyskania stanu aktualizacji. Pole
statew odpowiedzi zwraca bieżący stan aktualizacji, w tym jeśli został ukończony. - Użyj wartości
statepola, aby ustawić warunek zakończenia dla działania Until. Możesz również użyć aktywności Ustaw zmienną do dodania zmiennej potoku na podstawie wartościstatei użyć tej zmiennej dla warunku zakończenia.