Udostępnij przez


Ładowanie i przetwarzanie danych przyrostowo przy użyciu przepływów deklaratywnych potoków Lakeflow Spark

Dane przetwarzane są w potokach za pośrednictwem przepływów. Każdy przepływ składa się z zapytania i, zazwyczaj, elementu docelowego. Proces przetwarza zapytanie, jako partię lub przyrostowo, jako strumień danych kierowany do obiektu docelowego. Przepływ znajduje się w potoku w deklaratywnych potokach Lakeflow Spark.

Zazwyczaj przepływy są definiowane automatycznie podczas tworzenia zapytania w potoku, który aktualizuje obiekt docelowy, ale można również jawnie zdefiniować dodatkowe przepływy na potrzeby bardziej złożonego przetwarzania, na przykład dołączania do pojedynczego obiektu docelowego z wielu źródeł.

Updates

Przepływ jest uruchamiany za każdym razem, gdy jego definiujący potok jest aktualizowany. Przepływ utworzy lub zaktualizuje tabele z najnowszymi dostępnymi danymi. W zależności od typu przepływu i stanu zmian danych aktualizacja może wykonywać odświeżanie przyrostowe, które przetwarza tylko nowe rekordy lub wykonuje pełne odświeżanie, które przetwarza wszystkie rekordy ze źródła danych.

Tworzenie przepływu domyślnego

Podczas tworzenia potoku zazwyczaj definiuje się tabelę lub widok wraz z zapytaniem, które go obsługuje. Na przykład w tym zapytaniu SQL utworzysz tabelę przesyłania strumieniowego o nazwie customers_silver , odczytując z tabeli o nazwie customers_bronze.

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Możesz również utworzyć tę samą tabelę przesyłania strumieniowego w języku Python. W języku Python używasz potoków, tworząc funkcję zapytania, która zwraca ramkę danych, z dekoratorami dodającymi funkcjonalność Deklaratywnych Potoków Lakeflow Spark:

from pyspark import pipelines as dp

@dp.table()
def customers_silver():
  return spark.readStream.table("customers_bronze")

W tym przykładzie utworzono tabelę przesyłania strumieniowego. Można również tworzyć zmaterializowane widoki z podobną składnią zarówno w języku SQL, jak i Python. Aby uzyskać więcej informacji, zobacz Streaming tables and Materialized views ( Tabele przesyłania strumieniowego i zmaterializowane widoki).

W tym przykładzie zostanie utworzony domyślny przepływ wraz z tabelą przesyłania strumieniowego. Domyślny przepływ dla tabeli strumieniowej to przepływ dodawania, który dodaje nowe wiersze przy każdym wyzwoleniu. Jest to najbardziej typowy sposób korzystania z potoków: utworzenie przepływu i celu w jednym kroku. Za pomocą tego stylu można pozyskiwać dane lub przekształcać dane.

Przepływy dołączające również obsługują przetwarzanie, które wymaga odczytywania danych z wielu źródeł strumieniowych w celu zaktualizowania jednego obiektu docelowego. Na przykład możesz użyć funkcjonalności dołączania przepływu, jeśli masz istniejącą tabelę przesyłania strumieniowego oraz przepływ i chcesz dodać nowe źródło przesyłania strumieniowego, które zapisuje w tej istniejącej tabeli.

Używanie wielu przepływów do zapisu w jednym obiekcie docelowym

W poprzednim przykładzie utworzono przepływ i tabelę strumieniową w jednym kroku. Możesz również tworzyć przepływy dla wcześniej utworzonej tabeli. W tym przykładzie można zobaczyć tworzenie tabeli i przepływu skojarzonego z nią w oddzielnych krokach. Ten kod ma takie same wyniki jak tworzenie przepływu domyślnego, w tym użycie tej samej nazwy dla tabeli przesyłania strumieniowego i przepływu.

Python

from pyspark import pipelines as dp

# create streaming table
dp.create_streaming_table("customers_silver")

# add a flow
@dp.append_flow(
  target = "customers_silver")
def customer_silver():
  return spark.readStream.table("customers_bronze")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;

-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);

Utworzenie przepływu niezależnie od obiektu docelowego oznacza, że można również utworzyć wiele przepływów, które dołączają dane do tego samego obiektu docelowego.

Użyj dekoratora @dp.append_flow w interfejsie języka Python lub klauzuli CREATE FLOW...INSERT INTO w interfejsie SQL, aby utworzyć nowy przepływ danych, na przykład w celu skierowania tabeli strumieniowej z wielu źródeł strumieniowych. Użyj przepływu „dodawania” do przetwarzania zadań, takich jak:

  • Dodaj źródła przesyłania strumieniowego, które dołączają dane do istniejącej tabeli przesyłania strumieniowego bez konieczności pełnego odświeżania. Na przykład możesz mieć tabelę łączącą dane regionalne z każdego regionu, w którym działasz. W miarę wdrażania nowych regionów można dodać nowe dane regionu do tabeli bez przeprowadzania pełnego odświeżania. Aby zapoznać się z przykładem dodawania źródeł przesyłania strumieniowego do istniejącej tabeli przesyłania strumieniowego, zobacz Przykład: Zapisywanie w tabeli przesyłania strumieniowego z wielu tematów platformy Kafka.
  • Zaktualizuj tabelę przesyłania strumieniowego, dołączając brakujące dane historyczne (wypełnianie). Składnia INSERT INTO ONCE umożliwia jednorazowe utworzenie dodatkowego, historycznego uzupełnienia danych. Na przykład masz istniejącą tabelę przesyłania strumieniowego, do której dane są zapisywane przez temat Apache Kafka. Istnieją również dane historyczne przechowywane w tabeli, które musisz wstawić dokładnie raz do tabeli strumieniowej. Nie można przesyłać strumieniowo tych danych, ponieważ przetwarzanie obejmuje złożoną agregację przed ich wstawieniem. Aby zapoznać się z przykładem uzupełniania danych, zobacz Backfilling historical data with pipelines (Uzupełnianie danych historycznych za pomocą potoków).
  • Połącz dane z wielu źródeł i zapisz w jednej tabeli strumieniowej zamiast używania klauzuli UNION w zapytaniu. Użycie przetwarzania przepływu dołączania zamiast UNION umożliwia przyrostowe aktualizowanie tabeli docelowej bez konieczności uruchamiania pełnej aktualizacji odświeżającej. Aby zobaczyć przykład złączenia w taki sposób, zobacz Przykład: Użycie przetwarzania przepływu dołączania danych zamiast UNION.

Docelowy element dla rekordów wyjściowych przetwarzanych przez przepływ dodawania może być istniejącą tabelą lub nową tabelą. W przypadku zapytań języka Python użyj funkcji create_streaming_table(), aby utworzyć tabelę docelową.

W poniższym przykładzie dodano dwa przepływy dla tego samego celu, tworząc połączenie dwóch tabel źródłowych:

Python

from pyspark import pipelines as dp

# create a streaming table
dp.create_streaming_table("customers_us")

# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
  return spark.readStream.table("customers_us_west")

# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
  return spark.readStream.table("customers_us_east")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;

-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);

-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);

Ważne

  • Jeśli musisz zdefiniować ograniczenia dotyczące jakości danych z oczekiwaniami, zdefiniuj oczekiwania w tabeli docelowej w ramach create_streaming_table() funkcji lub istniejącej definicji tabeli. Nie można zdefiniować oczekiwań w @append_flow definicji.
  • Przepływy są identyfikowane przez nazwę przepływu, a ta nazwa służy do identyfikowania punktów kontrolnych przesyłania strumieniowego. Użycie nazwy przepływu do identyfikowania punktu kontrolnego oznacza następujące kwestie:
    • Jeśli nazwa istniejącego przepływu w rurze zostanie zmieniona, punkt kontrolny nie jest przenoszony, a zmieniony przepływ działa jak zupełnie nowy przepływ.
    • Nie można ponownie użyć nazwy przepływu w potoku, ponieważ istniejący punkt kontrolny nie będzie zgodny z nową definicją przepływu.

Typy przepływów

Domyślne przepływy dla tabel przesyłanych strumieniowo i zmaterializowanych widoków to przepływy z dołączaniem. Możesz również tworzyć przepływy do odczytu ze źródeł danych przechwytywania zmian . W poniższej tabeli opisano różne typy przepływów.

Typ przepływu Description
Append Przepływy dodające są najczęściej spotykanym typem przepływu, gdzie nowe rekordy ze źródła są zapisywane w miejscu docelowym przy każdej aktualizacji. Odpowiadają one trybowi dołączania w ustrukturyzowanym strumieniowaniu. Można dodać flagę ONCE wskazującą zapytanie wsadowe, którego dane powinny zostać wstawione do obiektu docelowego tylko raz, chyba że obiekt docelowy zostanie w pełni odświeżony. Dowolna liczba dołączanych przepływów może zapisywać dane w określonym obiekcie docelowym.
Domyślne przepływy (utworzone z użyciem docelowej tabeli strumieniowej lub zmaterializowanego widoku) będą miały taką samą nazwę jak cel. Inne obiekty docelowe nie mają przepływów domyślnych.
Auto CDC (wcześniej zastosuj zmiany) Przepływ automatycznego CDC pozyskuje zapytanie zawierające dane change data capture (CDC). Automatyczne przepływy CDC mogą być przeznaczone tylko dla tabel przesyłania strumieniowego i źródło musi być źródłem przesyłania strumieniowego, nawet w przypadku przepływów ONCE. Wiele przepływów automatycznej usługi CDC może być przeznaczonych dla pojedynczej tabeli przesyłania strumieniowego. Tabela przesyłania strumieniowego, która działa jako element docelowy dla przepływu automatycznego CDC, może być używana wyłącznie przez inne przepływy automatycznego CDC.
Aby uzyskać więcej informacji na temat danych CDC, zobacz API AUTO CDC: Uproszczenie przechwytywania zmian danych za pomocą potoków.

Dodatkowe informacje

Aby uzyskać więcej informacji na temat przepływów i ich użycia, zobacz następujące tematy: