Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Dane są przetwarzane w deklaratywnych potokach Lakeflow za pomocą przepływów. Każdy przepływ składa się z zapytania i, zazwyczaj, elementu docelowego. Przepływ danych przetwarza zapytanie, albo jako partię, lub przyrostowo jako strumień danych do celu. Przepływ danych działa w potoku ETL w usłudze Azure Databricks.
Zazwyczaj przepływy są definiowane automatycznie podczas tworzenia zapytania w potokach deklaratywnych Lakeflow, które aktualizują obiekt docelowy, ale można również jawnie zdefiniować dodatkowe przepływy w celu bardziej złożonego przetwarzania, na przykład dołączenia do jednego obiektu docelowego danych z wielu źródeł.
Aktualizacje
Przepływ jest uruchamiany za każdym razem, gdy jego zdefiniowany potok jest aktualizowany. Przepływ utworzy lub zaktualizuje tabele z najnowszymi dostępnymi danymi. W zależności od typu przepływu i stanu zmian danych aktualizacja może wykonywać odświeżanie przyrostowe, które przetwarza tylko nowe rekordy lub wykonuje pełne odświeżanie, które ponownie przetwarza wszystkie rekordy ze źródła danych.
- Aby uzyskać więcej informacji na temat aktualizacji potoku, zobacz Uruchom aktualizację w Deklaratywnych Potokach Lakeflow.
- Aby uzyskać więcej informacji na temat planowania i wyzwalania aktualizacji, zobacz Wyzwalany i ciągły tryb potoku.
Tworzenie przepływu domyślnego
Podczas tworzenia obiektu Lakeflow Declarative Pipelines w potoku, zazwyczaj definiuje się tabelę lub widok wraz z zapytaniem, które je obsługuje. Na przykład w tym zapytaniu SQL utworzysz tabelę przesyłania strumieniowego o nazwie customers_silver
, odczytując z tabeli o nazwie customers_bronze
.
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
Możesz również utworzyć tę samą tabelę przesyłania strumieniowego w języku Python. W języku Python zwykle używasz Deklaratywnych Potoków Lakeflow, tworząc funkcję zapytania, która zwraca ramkę danych, z dekoratorami umożliwiającymi dostęp do funkcjonalności Deklaratywnych Potoków Lakeflow.
import dlt
@dlt.table()
def customers_silver():
return spark.readStream.table("customers_bronze")
W tym przykładzie utworzono tabelę przesyłania strumieniowego. Można również tworzyć zmaterializowane widoki z podobną składnią zarówno w języku SQL, jak i Python. Aby uzyskać więcej informacji, zobacz Streaming tables and Materialized views ( Tabele przesyłania strumieniowego i zmaterializowane widoki).
W tym przykładzie zostanie utworzony domyślny przepływ wraz z tabelą przesyłania strumieniowego. Domyślny przepływ dla tabeli przesyłania strumieniowego to przepływ dołączania , który dodaje nowe wiersze z każdym wyzwalaczem. Jest to najczęstszy sposób korzystania z potoków deklaratywnych Lakeflow, który umożliwia utworzenie przepływu i celu jednocześnie. Za pomocą tego stylu można pozyskiwać dane lub przekształcać dane.
Dołączanie przepływów obsługuje również przetwarzanie, które wymaga odczytywania danych z wielu źródeł przesyłania strumieniowego w celu zaktualizowania pojedynczego obiektu docelowego. Możesz na przykład użyć funkcjonalności dołączania przepływu, jeśli masz istniejącą tabelę i proces przesyłania strumieniowego oraz chcesz dodać nowe źródło przesyłania strumieniowego, które zapisuje w tej istniejącej tabeli przesyłania strumieniowego.
Używanie wielu przepływów do zapisu w jednym obiekcie docelowym
W poprzednim przykładzie utworzono przepływ i tabelę strumieniową w jednym kroku. Możesz również tworzyć przepływy dla wcześniej utworzonej tabeli. W tym przykładzie można zobaczyć tworzenie tabeli i przepływu skojarzonego z nią w oddzielnych krokach. Ten kod ma identyczne wyniki jak tworzenie domyślnego przepływu, w tym użycie tej samej nazwy dla tabeli transmisji strumieniowej i przepływu.
Python
import dlt
# create streaming table
dlt.create_streaming_table("customers_silver")
# add a flow
@dlt.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;
-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);
Utworzenie przepływu niezależnie od obiektu docelowego oznacza, że można również utworzyć wiele przepływów, które dołączają dane do tego samego obiektu docelowego.
Użyj dekoratora @append_flow
w interfejsie języka Python lub klauzuli CREATE FLOW...INSERT INTO [ONCE]
w interfejsie SQL, aby utworzyć nowy przepływ, na przykład aby skierować przepływ do tabeli przesyłania strumieniowego z wielu źródeł przesyłania strumieniowego. Użyj procesu dołączania do przetwarzania takich zadań, jak:
- Dodaj źródła przesyłania strumieniowego, które dołączają dane do istniejącej tabeli przesyłania strumieniowego bez konieczności pełnego odświeżania. Na przykład możesz mieć tabelę łączącą dane regionalne z każdego regionu, w którym działasz. W miarę wdrażania nowych regionów można dodać nowe dane regionu do tabeli bez przeprowadzania pełnego odświeżania. Aby zapoznać się z przykładem dodawania źródeł przesyłania strumieniowego do istniejącej tabeli przesyłania strumieniowego, zobacz Przykład: Zapisywanie w tabeli przesyłania strumieniowego z wielu tematów platformy Kafka.
- Zaktualizuj tabelę przesyłania strumieniowego, dołączając brakujące dane historyczne (uzupełnianie). Składnia
INSERT INTO ONCE
umożliwia utworzenie uzupełnienia historycznych danych, które jest wykonywane raz. Na przykład masz istniejącą tabelę przesyłania strumieniowego napisaną na podstawie tematu platformy Apache Kafka. Masz również dane historyczne przechowywane w tabeli, które muszą zostać wstawione dokładnie raz do tabeli przesyłania strumieniowego, a nie można ich przesyłać strumieniowo, ponieważ przetwarzanie obejmuje złożoną agregację przed wstawieniem danych. Aby zapoznać się z przykładem wypełniania danych historycznych, zobacz Backfilling historical data with Lakeflow Declarative Pipelines (Wypełnianie danych historycznych za pomocą deklaratywnych potoków Lakeflow). - Łączyć dane z wielu źródeł i zapisywać w jednej tabeli przesyłania strumieniowego zamiast stosowania klauzuli
UNION
w zapytaniu. Użycie przetwarzania dodawania danych zamiastUNION
umożliwia przyrostowe aktualizowanie tabeli docelowej bez konieczności uruchamiania aktualizacji pełnego odświeżania. Aby zapoznać się z przykładem łączenia wykonanego w ten sposób, zobacz Przykład: Używanie przetwarzania przepływu dołączania zamiastUNION
.
Celem dla rekordów wyjściowych po przetworzeniu przepływu dołączania może być istniejąca tabela lub nowa tabela. W przypadku zapytań języka Python użyj funkcji create_streaming_table(), aby utworzyć tabelę docelową.
W poniższym przykładzie dodano dwa przepływy dla tego samego celu, tworząc połączenie dwóch tabel źródłowych:
Python
import dlt
# create a streaming table
dlt.create_streaming_table("customers_us")
# add the first append flow
@dlt.append_flow(target = "customers_us")
def append1():
return spark.readStream.table("customers_us_west")
# add the second append flow
@dlt.append_flow(target = "customers_us")
def append2():
return spark.readStream.table("customers_us_east")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;
-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);
-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);
Ważny
- Jeśli musisz zdefiniować ograniczenia dotyczące jakości danych z oczekiwaniami , zdefiniuj oczekiwania w tabeli docelowej w ramach funkcji
create_streaming_table()
lub istniejącej definicji tabeli. Nie można zdefiniować oczekiwań w definicji@append_flow
. - Przepływy są identyfikowane przez nazwę przepływu , a ta nazwa służy do identyfikowania punktów kontrolnych przesyłania strumieniowego. Użycie nazwy przepływu do identyfikowania punktu kontrolnego oznacza następujące kwestie:
- Jeśli przepływ istniejący w potoku zostanie zmieniony przez zmianę jego nazwy, punkt kontrolny nie jest przenoszony, a zmieniony przepływ staje się w rzeczywistości całkowicie nowym przepływem.
- Nie można ponownie użyć nazwy przepływu w potoku, ponieważ istniejący punkt kontrolny nie będzie zgodny z nową definicją przepływu.
Typy przepływów
Domyślne przepływy dla tabel przesyłania strumieniowego i widoki zmaterializowane to przepływy dodawania. Możesz również tworzyć przepływy do odczytu ze źródeł danych przechwytywania zmian . W poniższej tabeli opisano różne typy przepływów.
Typ przepływu | Opis |
---|---|
Dołącz |
Przepływy dołączania są najczęściej używanym typem przepływu, w którym nowe rekordy w źródle są zapisywane w miejscu docelowym przy użyciu każdej aktualizacji. Odpowiadają one trybowi dołączania w ustrukturyzowanym strumieniowaniu. Można dodać flagę ONCE wskazującą zapytanie wsadowe, którego dane powinny zostać wstawione do obiektu docelowego tylko raz, chyba że obiekt docelowy zostanie w pełni odświeżony. Dowolna liczba operacji dołączania może zapisywać dane do konkretnego obiektu docelowego.Domyślne przepływy (utworzone przy użyciu docelowej tabeli strumieniowej lub zmaterializowanego widoku) będą miały taką samą nazwę jak cel. Inne obiekty docelowe nie mają przepływów domyślnych. |
Auto CDC (wcześniej Zastosuj zmiany) | Przepływ Automatycznego CDC pozyskuje zapytanie zawierające dane przechwytywania zmian (CDC). Automatyczne przepływy CDC mogą być przeznaczone tylko dla tabel przesyłania strumieniowego, a źródło musi być źródłem przesyłania strumieniowego (nawet w przypadku przepływów ONCE ). Wiele przepływów automatycznej usługi CDC może być przeznaczonych dla pojedynczej tabeli przesyłania strumieniowego. Tabela przesyłania strumieniowego, która działa jako element docelowy dla przepływu automatycznego CDC, może być używana tylko przez inne przepływy automatycznego CDC.Aby uzyskać więcej informacji na temat danych CDC, zobacz Interfejsy API AUTO CDC: Upraszczanie przechwytywania danych zmian za pomocą potoków deklaratywnych Lakeflow. |
Dodatkowe informacje
Aby uzyskać więcej informacji na temat przepływów i ich użycia, zobacz następujące tematy:
- Przykłady przepływów w deklaratywnych potokach Lakeflow
- Interfejsy API usługi AUTO CDC: upraszczanie przechwytywania zmian przy użyciu potoków deklaratywnych usługi Lakeflow
- Wypełnianie danych historycznych za pomocą deklaratywnych potoków Lakeflow
- Pisanie Deklaratywnych Potoków Lakeflow w języku Python lub SQL
- tabele przesyłania strumieniowego
- zmaterializowane widoki
- Użyj ujść do przesyłania rekordów strumieniowo do usług zewnętrznych przy użyciu deklaratywnych potoków Lakeflow