Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Dowiedz się, jak utworzyć i wdrożyć potok ETL (wyodrębnianie, przekształcanie i ładowanie) z przechwytywaniem zmian danych (CDC) przy użyciu potoków deklaratywnych platformy Lakeflow Spark (SDP) na potrzeby orkiestracji danych oraz Auto Loader. Potok ETL implementuje kroki odczytywania danych z systemów źródłowych, przekształcania tych danych na podstawie wymagań, takich jak kontrole jakości danych i deduplikacja rekordów oraz zapisywanie danych w systemie docelowym, takim jak magazyn danych lub magazyn danych typu data lake.
W tym samouczku użyjesz danych z customers tabeli w bazie danych MySQL, aby:
- Wyodrębnij zmiany z transakcyjnej bazy danych przy użyciu narzędzia Debezium lub innego narzędzia i zapisz je w magazynie obiektów w chmurze (S3, ADLS lub GCS). W tym samouczku pominiesz konfigurowanie zewnętrznego systemu CDC i zamiast tego wygenerujesz fałszywe dane, aby uprościć samouczek.
- Użyj Auto Loadera, aby przyrostowo załadować komunikaty z magazynu obiektów w chmurze i zapisać je w surowej postaci w tabeli
customers_cdc. Moduł Auto Loader wyprowadza schemat i obsługuje ewolucję schematu. - Utwórz tabelę,
customers_cdc_cleanaby sprawdzić jakość danych przy użyciu oczekiwań. Na przykład,idnigdy nie powinien byćnullponieważ jest używany do uruchamiania operacji upsert. - Wykonaj
AUTO CDC ... INTOna oczyszczonych danych CDC, aby wprowadzić zmiany do tabeli końcowejcustomers. - Pokaż, jak potok danych może utworzyć tabelę typu 2 wolno zmieniającego się wymiaru (SCD2), aby śledzić wszystkie zmiany.
Celem jest pozyskiwanie nieprzetworzonych danych niemal w czasie rzeczywistym i tworzenie tabeli dla zespołu analityków przy jednoczesnym zapewnieniu jakości danych.
W tym samouczku użyto architektury medallion Lakehouse, gdzie nieprzetworzone dane są pozyskiwane za pośrednictwem warstwy brązowej, dane są czyszczone i weryfikowane przy użyciu warstwy srebrnej, a modelowanie wymiarowe i agregacja są stosowane przy użyciu warstwy złotej. Aby uzyskać więcej informacji, zobacz Co to jest architektura medallion lakehouse?
Zaimplementowany przepływ wygląda następująco:
Aby uzyskać więcej informacji na temat potoku, automatycznego modułu ładującego i cdC, zobacz Lakeflow Spark Deklaratywne potoki, Co to jest automatyczne ładowanie? i Co to jest przechwytywanie danych zmian (CDC)?
Requirements
Aby ukończyć ten samouczek, musisz spełnić następujące wymagania:
- Zaloguj się do obszaru roboczego usługi Azure Databricks.
- Włącz Unity Catalog dla swojego obszaru roboczego.
- Mieć włączone bezserwerowe obliczenia dla twojego konta. Bezserwerowe deklaratywne potoki Spark Lakeflow nie są dostępne we wszystkich regionach obszaru roboczego. Zobacz Funkcje z ograniczoną dostępnością regionalną dla dostępnych regionów. Jeśli dla twojego konta nie włączono przetwarzania bezserwerowego, kroki powinny działać z domyślnym obliczeniami dla obszaru roboczego.
- Mieć uprawnienia do tworzenia zasobu obliczeniowego lub dostępu do zasobu obliczeniowego.
- Mieć uprawnienia do tworzenia nowego schematu w wykazie. Wymagane uprawnienia to
ALL PRIVILEGESlubUSE CATALOGiCREATE SCHEMA. - Mieć uprawnienia do tworzenia nowego woluminu w istniejącym schemacie. Wymagane uprawnienia to
ALL PRIVILEGESlubUSE SCHEMAiCREATE VOLUME.
Przechwytywanie zmian danych w potoku ETL
Przechwytywanie zmian danych (CDC) to proces przechwytywania zmian w rekordach wprowadzonych w transakcyjnej bazie danych (na przykład MySQL lub PostgreSQL) lub magazynie danych. CDC przechwytuje operacje, takie jak usuwanie danych, dopisywanie i aktualizacje, zazwyczaj w formie strumienia, aby ponownie materializować tabele w systemach zewnętrznych. Usługa CDC umożliwia ładowanie przyrostowe, eliminując konieczność aktualizacji ładowania zbiorczego.
Uwaga / Notatka
Aby uprościć ten samouczek, pomiń konfigurowanie zewnętrznego systemu CDC. Załóżmy, że jest ono uruchomione i zapisuje dane CDC jako pliki JSON w magazynie obiektów w chmurze (S3, ADLS lub GCS). W tym samouczku użyto biblioteki Faker do wygenerowania danych wykorzystywanych w samouczku.
Przechwytywanie CDC
Dostępne są różne narzędzia CDC. Jednym z wiodących rozwiązań typu open source jest Debezium, ale inne implementacje upraszczające źródła danych istnieją, takie jak Fivetran, Qlik Replicate, StreamSets, Talend, Oracle GoldenGate i AWS DMS.
W tym samouczku używasz danych CDC z systemu zewnętrznego, takiego jak Debezium lub DMS. Debezium przechwytuje każdy zmieniony wiersz. Zazwyczaj wysyła historię zmian danych do tematów platformy Kafka lub zapisuje je jako pliki.
Należy zaimportować informacje CDC z tabeli customers (w formacie JSON), sprawdzić ich poprawność, a następnie zmaterializować tabelę klientów w usłudze Data Lakehouse.
Dane wejściowe CDC z narzędzia Debezium
Dla każdej zmiany otrzymasz komunikat JSON zawierający wszystkie pola aktualizowanego wiersza (id, firstname, lastname, email, address). Komunikat zawiera również dodatkowe metadane:
-
operation: kod operacji, zazwyczaj (DELETE,APPEND,UPDATE). -
operation_date: Data i czas dla każdego działania operacyjnego.
Narzędzia takie jak Debezium mogą tworzyć bardziej zaawansowane dane wyjściowe, takie jak wartość wiersza przed zmianą, ale w tym samouczku pominięto je dla uproszczenia.
Krok 1: Utwórz potok
Utwórz nowy potok ETL, aby wysyłać zapytania do źródła danych CDC i generować tabele w obszarze roboczym.
W obszarze roboczym kliknij
Nowy w lewym górnym rogu.
Kliknij Potok ETL.
Zmień tytuł potoku na
Pipelines with CDC tutoriallub preferowaną nazwę.W tytule wybierz katalog i schemat, dla którego masz uprawnienia do zapisu.
Ten wykaz i schemat są używane domyślnie, jeśli nie określisz katalogu ani schematu w kodzie. Kod może zapisywać do dowolnego katalogu lub schematu, poprzez podanie pełnej ścieżki. W tym samouczku używane są wartości domyślne, które tutaj określisz.
W obszarze Opcje zaawansowane wybierz pozycję Rozpocznij od pustego pliku.
Wybierz folder dla kodu. Możesz wybrać pozycję Przeglądaj , aby przeglądać listę folderów w obszarze roboczym. Możesz wybrać dowolny folder, dla którego masz uprawnienia do zapisu.
Aby użyć kontroli wersji, wybierz folder Git. Jeśli musisz utworzyć nowy folder, wybierz
Wybierz język Python lub SQL dla języka pliku na podstawie języka, którego chcesz użyć na potrzeby samouczka.
Kliknij pozycję Wybierz , aby utworzyć potok przy użyciu tych ustawień, a następnie otwórz Edytor potoków lakeflow.
Masz teraz pustą rurę z domyślnym wykazem i strukturą. Następnie skonfiguruj przykładowe dane do zaimportowania w samouczku.
Krok 2. Tworzenie przykładowych danych do zaimportowania w tym samouczku
Ten krok nie jest wymagany, jeśli importujesz własne dane z istniejącego źródła. Na potrzeby tego samouczka wygeneruj fałszywe dane jako przykład dla tego samouczka. Utwórz notatnik, aby uruchomić skrypt generowania danych w Pythonie. Ten kod należy uruchomić tylko raz, aby wygenerować przykładowe dane, więc utwórz go w folderze potoku explorations, który nie jest uruchamiany w ramach aktualizacji potoku.
Uwaga / Notatka
Ten kod używa narzędzia Faker do generowania przykładowych danych CDC. Narzędzie Faker jest dostępne do automatycznego instalowania, więc w tym samouczku jest używany program %pip install faker. Możesz również ustawić zależność od faker dla notesu. Zobacz Dodawanie zależności do notesu.
W edytorze Lakeflow Pipelines, w pasku bocznym przeglądarki zasobów po lewej stronie edytora, kliknij
Dodaj, a następnie wybierz Eksploracja.
Nadaj mu nazwę, na przykład
Setup data, wybierz pozycję Python. Możesz pozostawić domyślny folder docelowy, który jest nowymexplorationsfolderem.Kliknij pozycję Utwórz. Spowoduje to utworzenie notesu w nowym folderze.
Wprowadź następujący kod w pierwszej komórce. Musisz zmienić definicję
<my_catalog>i<my_schema>, aby dopasować do domyślnego wykazu i schematu, który został wybrany w poprzedniej procedurze.%pip install faker # Update these to match the catalog and schema # that you used for the pipeline in step 1. catalog = "<my_catalog>" schema = dbName = db = "<my_schema>" spark.sql(f'USE CATALOG `{catalog}`') spark.sql(f'USE SCHEMA `{schema}`') spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`') volume_folder = f"/Volumes/{catalog}/{db}/raw_data" try: dbutils.fs.ls(volume_folder+"/customers") except: print(f"folder doesn't exist, generating the data under {volume_folder}...") from pyspark.sql import functions as F from faker import Faker from collections import OrderedDict import uuid fake = Faker() import random fake_firstname = F.udf(fake.first_name) fake_lastname = F.udf(fake.last_name) fake_email = F.udf(fake.ascii_company_email) fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S")) fake_address = F.udf(fake.address) operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)]) fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0]) fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None) df = spark.range(0, 100000).repartition(100) df = df.withColumn("id", fake_id()) df = df.withColumn("firstname", fake_firstname()) df = df.withColumn("lastname", fake_lastname()) df = df.withColumn("email", fake_email()) df = df.withColumn("address", fake_address()) df = df.withColumn("operation", fake_operation()) df_customers = df.withColumn("operation_date", fake_date()) df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")Aby wygenerować zestaw danych używany w samouczku, wpisz Shift + Enter , aby uruchomić kod:
Opcjonalny. Aby wyświetlić podgląd danych używanych w tym samouczku, wprowadź następujący kod w następnej komórce i uruchom kod. Zaktualizuj katalog i schemat, aby był zgodny ze ścieżką z poprzedniego kodu.
# Update these to match the catalog and schema # that you used for the pipeline in step 1. catalog = "<my_catalog>" schema = "<my_schema>" display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))
Spowoduje to wygenerowanie dużego zestawu danych (z fałszywymi danymi CDC), którego można użyć w pozostałej części samouczka. W następnym kroku pozyskujemy dane przy użyciu Auto Loader.
Krok 3. Przyrostowe pozyskiwanie danych za pomocą modułu automatycznego ładowania
Następnym krokiem jest pozyskiwanie surowych danych z sfałszowanego magazynu w chmurze do warstwy brązowej.
Może to być trudne z wielu powodów, ponieważ należy wykonać następujące czynności:
- Działa na dużą skalę, potencjalnie przetwarzając miliony małych plików.
- Wnioskowanie schematu i typu JSON.
- Obsługa nieprawidłowych rekordów z nieprawidłowym schematem JSON.
- Zadbaj o ewolucję schematu (na przykład nową kolumnę w tabeli klienta).
Moduł Auto Loader upraszcza pobieranie danych, w tym wnioskowanie i ewolucję schematu, jednocześnie skalując do milionów przychodzących plików. Narzędzie Auto Loader jest dostępne w Pythonie przy użyciu cloudFiles i w SQL przy użyciu SELECT * FROM STREAM read_files(...) i może być używane z różnymi formatami (JSON, CSV, Apache Avro itp.):
Zdefiniowanie tabeli jako tabeli strumieniowej gwarantuje, że przetwarzasz tylko nowe dane przychodzące. Jeśli nie zdefiniujesz jej jako tabeli strumieniowej, skanuje i pozyskuje wszystkie dostępne dane. Aby uzyskać więcej informacji, zobacz Tabele przesyłania strumieniowego .
Aby pozyskać przychodzące dane CDC przy użyciu modułu automatycznego ładowania, skopiuj i wklej następujący kod do pliku kodu utworzonego za pomocą potoku (o nazwie
my_transformation.py). Możesz użyć języka Python lub SQL na podstawie języka wybranego podczas tworzenia potoku. Pamiętaj, aby wymienić wartości<catalog>i<schema>na te, które ustawiłeś jako domyślne dla potoku.Python
from pyspark import pipelines as dp from pyspark.sql.functions import * # Replace with the catalog and schema name that # you are using: path = "/Volumes/<catalog>/<schema>/raw_data/customers" # Create the target bronze table dp.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone") # Create an Append Flow to ingest the raw data into the bronze table @dp.append_flow( target = "customers_cdc_bronze", name = "customers_bronze_ingest_flow" ) def customers_bronze_ingest_flow(): return ( spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(f"{path}") )SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_bronze_ingest_flow AS INSERT INTO customers_cdc_bronze BY NAME SELECT * FROM STREAM read_files( -- replace with the catalog/schema you are using: "/Volumes/<catalog>/<schema>/raw_data/customers", format => "json", inferColumnTypes => "true" )Kliknij
Uruchom plik lub Uruchom potok, aby uruchomić aktualizację połączonego potoku. W przypadku tylko jednego pliku źródłowego w potoku są one funkcjonalnie równoważne.
Po zakończeniu aktualizacji edytor zostanie zaktualizowany o informacje dotyczące twojego pipeline'u.
- Wykres potoku (DAG) na pasku bocznym po prawej stronie kodu przedstawia jedną tabelę
customers_cdc_bronze. - Podsumowanie aktualizacji jest wyświetlane na górze przeglądarki zasobów pipelinu.
- Szczegóły tabeli, która została wygenerowana, są wyświetlane w dolnym okienku i można przeglądać dane z tabeli, wybierając ją.
Jest to nieprzetworzone dane warstwy brązowej importowane z magazynu w chmurze. W następnym kroku wyczyść dane, aby utworzyć tabelę warstwy srebrnej.
Krok 4. Oczyszczanie i oczekiwania dotyczące śledzenia jakości danych
Po zdefiniowaniu warstwy z brązu utwórz warstwę srebrną, dodając oczekiwania dotyczące kontroli jakości danych. Sprawdź następujące warunki:
- Identyfikator nigdy nie może mieć wartości
null. - Typ operacji CDC musi być prawidłowy.
- Kod JSON musi być poprawnie odczytany przez moduł automatycznego ładowania.
Wiersze, które nie spełniają tych warunków, są porzucane.
Aby uzyskać więcej informacji, zobacz Zarządzanie jakością danych przy użyciu oczekiwań potoku .
Na pasku bocznym przeglądarki zasobów potoku kliknij
Dodaj, a następnie Przekształcenie.
Wprowadź nazwę i wybierz język (Python lub SQL) dla pliku kodu źródłowego. Możesz łączyć i zestawiać języki w potoku, aby wybrać dowolny z nich dla tego etapu.
Aby utworzyć warstwę srebrną z oczyszczoną tabelą i narzucić ograniczenia, skopiuj i wklej następujący kod do nowego pliku (wybierz język Python lub SQL na podstawie języka pliku).
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * dp.create_streaming_table( name = "customers_cdc_clean", expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"} ) @dp.append_flow( target = "customers_cdc_clean", name = "customers_cdc_clean_flow" ) def customers_cdc_clean_flow(): return ( spark.readStream.table("customers_cdc_bronze") .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data") )SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_clean ( CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW ) COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_cdc_clean_flow AS INSERT INTO customers_cdc_clean BY NAME SELECT * FROM STREAM customers_cdc_bronze;Kliknij
Uruchom plik lub Uruchom potok, aby uruchomić aktualizację połączonego potoku.
Ponieważ istnieją teraz dwa pliki źródłowe, nie robią tego samego, ale w tym przypadku dane wyjściowe są takie same.
- Uruchamianie potoku powoduje uruchomienie całego potoku, w tym kodu z kroku 3. Jeśli dane wejściowe były aktualizowane, spowoduje to ściągnięcie wszelkich zmian z tego źródła do warstwy z brązu. Nie uruchamia to kodu z etapu konfiguracji danych, ponieważ znajduje się on w folderze eksploracji, a nie w części źródła pipeline'u.
- Uruchom plik uruchamia tylko bieżący plik źródłowy. W takim przypadku bez aktualizowania danych wejściowych generuje to srebrne dane z buforowanej tabeli z brązu. Byłoby przydatne uruchomienie tylko tego pliku w celu szybszej iteracji podczas tworzenia lub edytowania kodu potokowego.
Po zakończeniu aktualizacji zobaczysz, że wykres potoku teraz pokazuje dwie tabele (gdzie warstwa srebrna opiera się na warstwie brązowej), a panel dolny pokazuje szczegóły obu tabel. W głównym widoku przeglądarki zasobów w potoku danych teraz wyświetlany jest czas wielu uruchomień, ale tylko szczegóły dotyczące ostatniego uruchomienia są dostępne.
Następnie utwórz ostateczną wersję warstwy złotej tabeli customers.
Krok 5: Materializowanie tabeli klientów przy użyciu automatycznego przepływu CDC
Do tego momentu tabele po prostu przekazały dane CDC na każdym etapie. Teraz utwórz tabelę customers, która będzie zawierała najnowszy widok oraz będzie repliką oryginalnej tabeli, a nie listą operacji CDC, które ją utworzyły.
Nie jest trywialne do zaimplementowania ręcznie. Należy wziąć pod uwagę takie elementy, jak deduplikacja danych, aby zachować najnowszy wiersz.
Jednak potoki deklaratywne Lakeflow Spark rozwiązuje te problemy z operacją AUTO CDC.
Na pasku bocznym przeglądarki zasobów potoku kliknij
Dodawanie i przekształcanie.
Wprowadź nazwę i wybierz język (Python lub SQL) dla nowego pliku kodu źródłowego. Możesz ponownie wybrać język dla tego kroku, ale użyj poprawnego kodu poniżej.
Aby przetworzyć dane CDC przy użyciu
AUTO CDCw Potokach Deklaratywnych Lakeflow Spark, skopiuj i wklej następujący kod do nowego pliku.Python
from pyspark import pipelines as dp from pyspark.sql.functions import * dp.create_streaming_table(name="customers", comment="Clean, materialized customers") dp.create_auto_cdc_flow( target="customers", # The customer table being materialized source="customers_cdc_clean", # the incoming CDC keys=["id"], # what we'll be using to match the rows to upsert sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition except_column_list=["operation", "operation_date", "_rescued_data"], )SQL
CREATE OR REFRESH STREAMING TABLE customers; CREATE FLOW customers_cdc_flow AS AUTO CDC INTO customers FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 1;Kliknij
Uruchom plik, aby uruchomić aktualizację połączonego potoku.
Po zakończeniu aktualizacji można zauważyć, że wykres potoku przedstawia 3 tabele, przechodząc od brązowego, przez srebrny, do złotego.
Krok 6. Śledzenie historii aktualizacji z powolnym zmienianiem typu wymiaru 2 (SCD2)
Często wymagane jest utworzenie tabeli śledzącej wszystkie zmiany wynikające z APPEND, UPDATE i DELETE:
- Historia: chcesz zachować historię wszystkich zmian w tabeli.
- Możliwość śledzenia: chcesz zobaczyć, która operacja wystąpiła.
ScD2 z protokołem SDP usługi Lakeflow
Funkcja Delta obsługuje przepływ danych zmian (CDF) i table_change może wykonywać zapytania dotyczące modyfikacji tabel w usługach SQL i Python. Jednak głównym przypadkiem użycia usługi CDF jest przechwytywanie zmian w potoku, a nie tworzenie pełnego widoku zmian tabeli od początku.
Implementacja staje się szczególnie złożona, jeśli masz nieuporządkowane zdarzenia. Jeśli musisz sekwencjonować zmiany według znacznika czasu i otrzymać modyfikację, która miała miejsce w przeszłości, musisz dołączyć nowy wpis w tabeli SCD i zaktualizować poprzednie wpisy.
Protokół SDP usługi Lakeflow usuwa tę złożoność i umożliwia utworzenie oddzielnej tabeli zawierającej wszystkie modyfikacje od początku czasu. Ta tabela może być następnie używana na dużą skalę z określonymi partycjami lub kolumnami ZORDER, jeśli jest to wymagane. Pola spoza kolejności są obsługiwane w standardzie na podstawie elementu _sequence_by.
Aby utworzyć tabelę SCD2, użyj opcji STORED AS SCD TYPE 2 w języku SQL lub stored_as_scd_type="2" w języku Python.
Uwaga / Notatka
Możesz również ograniczyć kolumny, które funkcja śledzi, przy użyciu opcji: TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}
Na pasku bocznym przeglądarki zasobów potoku kliknij
Dodawanie i przekształcanie.
Wprowadź nazwę i wybierz język (Python lub SQL) dla nowego pliku kodu źródłowego.
Skopiuj i wklej następujący kod do nowego pliku.
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * # create the table dp.create_streaming_table( name="customers_history", comment="Slowly Changing Dimension Type 2 for customers" ) # store all changes as SCD2 dp.create_auto_cdc_flow( target="customers_history", source="customers_cdc_clean", keys=["id"], sequence_by=col("operation_date"), ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), except_column_list=["operation", "operation_date", "_rescued_data"], stored_as_scd_type="2", ) # Enable SCD2 and store individual updatesSQL
CREATE OR REFRESH STREAMING TABLE customers_history; CREATE FLOW customers_history_cdc AS AUTO CDC INTO customers_history FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 2;Kliknij
Uruchom plik, aby uruchomić aktualizację połączonego potoku.
Po zakończeniu aktualizacji wykres potoku zawiera nową customers_history tabelę, również uzależnioną od tabeli warstwy srebrnej, a dolny panel wyświetla szczegóły dotyczące wszystkich 4 tabel.
Krok 7. Tworzenie zmaterializowanego widoku, który śledzi, kto zmienił swoje informacje najbardziej
Tabela customers_history zawiera wszystkie zmiany historyczne wprowadzone przez użytkownika w swoich informacjach. Utwórz prosty zmaterializowany widok w złotej warstwie, który śledzi, kto dokonał największych zmian w swoich informacjach. Może to być używane do analizy wykrywania oszustw lub zaleceń użytkowników w rzeczywistym scenariuszu. Ponadto, stosowanie zmian przy użyciu mechanizmu SCD2 już usunęło duplikaty, dzięki czemu można bezpośrednio zliczyć wiersze na identyfikator użytkownika.
Na pasku bocznym przeglądarki zasobów potoku kliknij
Dodawanie i przekształcanie.
Wprowadź nazwę i wybierz język (Python lub SQL) dla nowego pliku kodu źródłowego.
Skopiuj i wklej następujący kod do nowego pliku źródłowego.
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * @dp.table( name = "customers_history_agg", comment = "Aggregated customer history" ) def customers_history_agg(): return ( spark.read.table("customers_history") .groupBy("id") .agg( count("address").alias("address_count"), count("email").alias("email_count"), count("firstname").alias("firstname_count"), count("lastname").alias("lastname_count") ) )SQL
CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS SELECT id, count("address") as address_count, count("email") AS email_count, count("firstname") AS firstname_count, count("lastname") AS lastname_count FROM customers_history GROUP BY idKliknij
Uruchom plik, aby uruchomić aktualizację połączonego potoku.
Po zakończeniu aktualizacji w wykresie potokowym znajduje się nowa tabela, która zależy od customers_history tabeli, i można ją wyświetlić w dolnym panelu. Twój pipeline jest teraz kompletny. Można go przetestować, wykonując pełny potok uruchamiania. Jedynymi pozostałymi krokami są zaplanowanie regularnego aktualizowania potoku.
Krok 8. Tworzenie zadania uruchamiania potoku ETL
Następnie utwórz workflow, aby zautomatyzować kroki pozyskiwania, przetwarzania i analizy danych w pipeline przy użyciu zadania Databricks.
- W górnej części edytora wybierz przycisk Harmonogram .
- Jeśli zostanie wyświetlone okno dialogowe Harmonogramy , wybierz pozycję Dodaj harmonogram.
- Spowoduje otwarcie okna dialogowego Nowy harmonogram, w którym można utworzyć zadanie uruchamiające potok zgodnie z harmonogramem.
- Opcjonalnie nadaj zadaniu nazwę.
- Domyślnie harmonogram jest uruchamiany raz dziennie. Możesz zaakceptować to ustawienie domyślne lub ustawić własny harmonogram. Wybranie opcji Zaawansowane umożliwia ustawienie określonego czasu, w którym zadanie zostanie uruchomione. Wybranie pozycji Więcej opcji umożliwia tworzenie powiadomień po uruchomieniu zadania.
- Wybierz pozycję Utwórz , aby zastosować zmiany i utworzyć zadanie.
Teraz zadanie będzie uruchamiane codziennie, aby zapewnić aktualność Twojego pipeline'u. Możesz ponownie wybrać pozycję Harmonogram , aby wyświetlić listę harmonogramów. Możesz zarządzać harmonogramami potoku w tym oknie dialogowym, w tym dodawać, edytować lub usuwać harmonogramy.
Kliknięcie nazwy harmonogramu (lub zadania) przeniesie Cię do strony zadania na liście Zadania i potoki danych. W tym miejscu możesz wyświetlić szczegółowe informacje na temat przebiegów zadań, w tym historię przebiegów, lub natychmiast uruchomić zadanie za pomocą przycisku Uruchom teraz .
Aby uzyskać więcej informacji na temat przebiegów zadań, zobacz Monitorowanie i obserwacja dla zadań Lakeflow.
Dodatkowe zasoby
- Potoki deklaratywne platformy Spark w usłudze Lakeflow
- Samouczek: Tworzenie potoku ETL przy użyciu deklaratywnych potoków Lakeflow Spark
- Co to jest przechwytywanie danych zmian (CDC)?
- Interfejsy API AUTO CDC: Ułatw przechwytywanie zmian danych za pomocą potoków danych
- Zamień potok na projekt Databricks Asset Bundle
- Co to jest moduł automatycznego ładowania?