Udostępnij za pośrednictwem


Ładowanie i przetwarzanie danych przyrostowo za pomocą Lakeflow deklaratywnych przepływów potoków

Dane są przetwarzane w deklaratywnych potokach Lakeflow za pomocą przepływów. Każdy przepływ składa się z zapytania i, zazwyczaj, elementu docelowego. Przepływ danych przetwarza zapytanie, albo jako partię, lub przyrostowo jako strumień danych do celu. Przepływ danych działa w potoku ETL w usłudze Azure Databricks.

Zazwyczaj przepływy są definiowane automatycznie podczas tworzenia zapytania w potokach deklaratywnych Lakeflow, które aktualizują obiekt docelowy, ale można również jawnie zdefiniować dodatkowe przepływy w celu bardziej złożonego przetwarzania, na przykład dołączenia do jednego obiektu docelowego danych z wielu źródeł.

Aktualizacje

Przepływ jest uruchamiany za każdym razem, gdy jego zdefiniowany potok jest aktualizowany. Przepływ utworzy lub zaktualizuje tabele z najnowszymi dostępnymi danymi. W zależności od typu przepływu i stanu zmian danych aktualizacja może wykonywać odświeżanie przyrostowe, które przetwarza tylko nowe rekordy lub wykonuje pełne odświeżanie, które ponownie przetwarza wszystkie rekordy ze źródła danych.

Tworzenie przepływu domyślnego

Podczas tworzenia obiektu Lakeflow Declarative Pipelines w potoku, zazwyczaj definiuje się tabelę lub widok wraz z zapytaniem, które je obsługuje. Na przykład w tym zapytaniu SQL utworzysz tabelę przesyłania strumieniowego o nazwie customers_silver , odczytując z tabeli o nazwie customers_bronze.

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Możesz również utworzyć tę samą tabelę przesyłania strumieniowego w języku Python. W języku Python zwykle używasz Deklaratywnych Potoków Lakeflow, tworząc funkcję zapytania, która zwraca ramkę danych, z dekoratorami umożliwiającymi dostęp do funkcjonalności Deklaratywnych Potoków Lakeflow.

import dlt

@dlt.table()
def customers_silver():
  return spark.readStream.table("customers_bronze")

W tym przykładzie utworzono tabelę przesyłania strumieniowego. Można również tworzyć zmaterializowane widoki z podobną składnią zarówno w języku SQL, jak i Python. Aby uzyskać więcej informacji, zobacz Streaming tables and Materialized views ( Tabele przesyłania strumieniowego i zmaterializowane widoki).

W tym przykładzie zostanie utworzony domyślny przepływ wraz z tabelą przesyłania strumieniowego. Domyślny przepływ dla tabeli przesyłania strumieniowego to przepływ dołączania , który dodaje nowe wiersze z każdym wyzwalaczem. Jest to najczęstszy sposób korzystania z potoków deklaratywnych Lakeflow, który umożliwia utworzenie przepływu i celu jednocześnie. Za pomocą tego stylu można pozyskiwać dane lub przekształcać dane.

Dołączanie przepływów obsługuje również przetwarzanie, które wymaga odczytywania danych z wielu źródeł przesyłania strumieniowego w celu zaktualizowania pojedynczego obiektu docelowego. Możesz na przykład użyć funkcjonalności dołączania przepływu, jeśli masz istniejącą tabelę i proces przesyłania strumieniowego oraz chcesz dodać nowe źródło przesyłania strumieniowego, które zapisuje w tej istniejącej tabeli przesyłania strumieniowego.

Używanie wielu przepływów do zapisu w jednym obiekcie docelowym

W poprzednim przykładzie utworzono przepływ i tabelę strumieniową w jednym kroku. Możesz również tworzyć przepływy dla wcześniej utworzonej tabeli. W tym przykładzie można zobaczyć tworzenie tabeli i przepływu skojarzonego z nią w oddzielnych krokach. Ten kod ma identyczne wyniki jak tworzenie domyślnego przepływu, w tym użycie tej samej nazwy dla tabeli transmisji strumieniowej i przepływu.

Python

import dlt

# create streaming table
dlt.create_streaming_table("customers_silver")

# add a flow
@dlt.append_flow(
  target = "customers_silver")
def customer_silver():
  return spark.readStream.table("customers_bronze")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;

-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);

Utworzenie przepływu niezależnie od obiektu docelowego oznacza, że można również utworzyć wiele przepływów, które dołączają dane do tego samego obiektu docelowego.

Użyj dekoratora @append_flow w interfejsie języka Python lub klauzuli CREATE FLOW...INSERT INTO [ONCE] w interfejsie SQL, aby utworzyć nowy przepływ, na przykład aby skierować przepływ do tabeli przesyłania strumieniowego z wielu źródeł przesyłania strumieniowego. Użyj procesu dołączania do przetwarzania takich zadań, jak:

  • Dodaj źródła przesyłania strumieniowego, które dołączają dane do istniejącej tabeli przesyłania strumieniowego bez konieczności pełnego odświeżania. Na przykład możesz mieć tabelę łączącą dane regionalne z każdego regionu, w którym działasz. W miarę wdrażania nowych regionów można dodać nowe dane regionu do tabeli bez przeprowadzania pełnego odświeżania. Aby zapoznać się z przykładem dodawania źródeł przesyłania strumieniowego do istniejącej tabeli przesyłania strumieniowego, zobacz Przykład: Zapisywanie w tabeli przesyłania strumieniowego z wielu tematów platformy Kafka.
  • Zaktualizuj tabelę przesyłania strumieniowego, dołączając brakujące dane historyczne (uzupełnianie). Składnia INSERT INTO ONCE umożliwia utworzenie uzupełnienia historycznych danych, które jest wykonywane raz. Na przykład masz istniejącą tabelę przesyłania strumieniowego napisaną na podstawie tematu platformy Apache Kafka. Masz również dane historyczne przechowywane w tabeli, które muszą zostać wstawione dokładnie raz do tabeli przesyłania strumieniowego, a nie można ich przesyłać strumieniowo, ponieważ przetwarzanie obejmuje złożoną agregację przed wstawieniem danych. Aby zapoznać się z przykładem wypełniania danych historycznych, zobacz Backfilling historical data with Lakeflow Declarative Pipelines (Wypełnianie danych historycznych za pomocą deklaratywnych potoków Lakeflow).
  • Łączyć dane z wielu źródeł i zapisywać w jednej tabeli przesyłania strumieniowego zamiast stosowania klauzuli UNION w zapytaniu. Użycie przetwarzania dodawania danych zamiast UNION umożliwia przyrostowe aktualizowanie tabeli docelowej bez konieczności uruchamiania aktualizacji pełnego odświeżania. Aby zapoznać się z przykładem łączenia wykonanego w ten sposób, zobacz Przykład: Używanie przetwarzania przepływu dołączania zamiast UNION.

Celem dla rekordów wyjściowych po przetworzeniu przepływu dołączania może być istniejąca tabela lub nowa tabela. W przypadku zapytań języka Python użyj funkcji create_streaming_table(), aby utworzyć tabelę docelową.

W poniższym przykładzie dodano dwa przepływy dla tego samego celu, tworząc połączenie dwóch tabel źródłowych:

Python

import dlt

# create a streaming table
dlt.create_streaming_table("customers_us")

# add the first append flow
@dlt.append_flow(target = "customers_us")
def append1():
  return spark.readStream.table("customers_us_west")

# add the second append flow
@dlt.append_flow(target = "customers_us")
def append2():
  return spark.readStream.table("customers_us_east")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;

-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);

-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);

Ważny

  • Jeśli musisz zdefiniować ograniczenia dotyczące jakości danych z oczekiwaniami , zdefiniuj oczekiwania w tabeli docelowej w ramach funkcji create_streaming_table() lub istniejącej definicji tabeli. Nie można zdefiniować oczekiwań w definicji @append_flow.
  • Przepływy są identyfikowane przez nazwę przepływu , a ta nazwa służy do identyfikowania punktów kontrolnych przesyłania strumieniowego. Użycie nazwy przepływu do identyfikowania punktu kontrolnego oznacza następujące kwestie:
    • Jeśli przepływ istniejący w potoku zostanie zmieniony przez zmianę jego nazwy, punkt kontrolny nie jest przenoszony, a zmieniony przepływ staje się w rzeczywistości całkowicie nowym przepływem.
    • Nie można ponownie użyć nazwy przepływu w potoku, ponieważ istniejący punkt kontrolny nie będzie zgodny z nową definicją przepływu.

Typy przepływów

Domyślne przepływy dla tabel przesyłania strumieniowego i widoki zmaterializowane to przepływy dodawania. Możesz również tworzyć przepływy do odczytu ze źródeł danych przechwytywania zmian . W poniższej tabeli opisano różne typy przepływów.

Typ przepływu Opis
Dołącz Przepływy dołączania są najczęściej używanym typem przepływu, w którym nowe rekordy w źródle są zapisywane w miejscu docelowym przy użyciu każdej aktualizacji. Odpowiadają one trybowi dołączania w ustrukturyzowanym strumieniowaniu. Można dodać flagę ONCE wskazującą zapytanie wsadowe, którego dane powinny zostać wstawione do obiektu docelowego tylko raz, chyba że obiekt docelowy zostanie w pełni odświeżony. Dowolna liczba operacji dołączania może zapisywać dane do konkretnego obiektu docelowego.
Domyślne przepływy (utworzone przy użyciu docelowej tabeli strumieniowej lub zmaterializowanego widoku) będą miały taką samą nazwę jak cel. Inne obiekty docelowe nie mają przepływów domyślnych.
Auto CDC (wcześniej Zastosuj zmiany) Przepływ Automatycznego CDC pozyskuje zapytanie zawierające dane przechwytywania zmian (CDC). Automatyczne przepływy CDC mogą być przeznaczone tylko dla tabel przesyłania strumieniowego, a źródło musi być źródłem przesyłania strumieniowego (nawet w przypadku przepływów ONCE). Wiele przepływów automatycznej usługi CDC może być przeznaczonych dla pojedynczej tabeli przesyłania strumieniowego. Tabela przesyłania strumieniowego, która działa jako element docelowy dla przepływu automatycznego CDC, może być używana tylko przez inne przepływy automatycznego CDC.
Aby uzyskać więcej informacji na temat danych CDC, zobacz Interfejsy API AUTO CDC: Upraszczanie przechwytywania danych zmian za pomocą potoków deklaratywnych Lakeflow.

Dodatkowe informacje

Aby uzyskać więcej informacji na temat przepływów i ich użycia, zobacz następujące tematy: