Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Usługa Lakeflow Spark Deklaratywne potoki (SDP) wprowadza kilka nowych słów kluczowych i funkcji SQL do definiowania zmaterializowanych widoków i tabel przesyłania strumieniowego w potokach. Obsługa języka SQL na potrzeby tworzenia potoków opiera się na podstawach usługi Spark SQL i dodaje obsługę funkcji przesyłania strumieniowego ze strukturą.
Użytkownicy zaznajomieni z DataFrame PySpark mogą preferować tworzenie kodu potoku w Pythonie. Język Python obsługuje bardziej rozbudowane testowanie i operacje, które są trudne do zaimplementowania przy użyciu języka SQL, takich jak operacje metaprogramowania. Zobacz Rozwijaj kod potoku za pomocą Pythona.
Aby uzyskać pełne odniesienie do składni SQL potoku, zobacz Odniesienie do języka SQL potoku.
Podstawy opracowywania potoków w języku SQL
Kod SQL tworzący zbiory danych potoku używa składni CREATE OR REFRESH do definiowania widoków zmaterializowanych i tabel strumieniowych względem wyników zapytań.
Słowo kluczowe STREAM wskazuje, czy źródło danych, do których odwołuje się klauzula SELECT, powinno być odczytywane za pomocą semantyki przesyłania strumieniowego.
Odczyty i zapisy domyślnie wykorzystują katalog i schemat określony podczas konfiguracji potoku. Zobacz Ustaw katalog docelowy i schemat.
Kod źródłowy potoku znacząco różni się od skryptów SQL: SDP ocenia wszystkie definicje zestawów danych we wszystkich plikach kodu źródłowego skonfigurowanych w potoku i tworzy graf przepływu danych przed uruchomieniem zapytań. Kolejność zapytań wyświetlanych w plikach źródłowych definiuje kolejność oceny kodu, ale nie kolejność wykonywania zapytania.
Tworzenie zmaterializowanego widoku przy użyciu języka SQL
Poniższy przykład kodu przedstawia podstawową składnię tworzenia zmaterializowanego widoku przy użyciu języka SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Tworzenie tabeli przesyłania strumieniowego za pomocą języka SQL
Poniższy przykład kodu przedstawia podstawową składnię tworzenia tabeli strumieniowej w SQL. Podczas odczytywania źródła dla tabeli przesyłania strumieniowego słowo kluczowe STREAM wskazuje na użycie semantyki przesyłania strumieniowego dla źródła. Nie używaj słowa kluczowego STREAM podczas tworzenia zmaterializowanego widoku:
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Uwaga / Notatka
Użyj słowa kluczowego STREAM, aby stosować semantykę przesyłania strumieniowego do odczytu ze źródła. Jeśli odczyt napotka zmianę lub usunięcie istniejącego rekordu, zostanie zgłoszony błąd. Odczyt ze źródeł statycznych lub dołączanych jest najbezpieczniejszy. Aby pozyskiwać dane, które mają zatwierdzenia zmian, możesz użyć Pythona i opcji SkipChangeCommits do obsługi błędów.
Ładowanie danych z magazynu obiektów
Pipelines obsługują ładowanie danych ze wszystkich formatów obsługiwanych przez Azure Databricks. Zobacz Opcje formatu danych.
Uwaga / Notatka
W tych przykładach używane są dane dostępne w /databricks-datasets, które są automatycznie zamontowane w obszarze roboczym. Databricks zaleca używanie ścieżek woluminów lub identyfikatorów URI w chmurze do odwoływania się do danych przechowywanych w magazynie obiektów w chmurze. Zobacz Czym są wolumeny Unity Catalog?.
Databricks zaleca używanie funkcji Auto Loader i tabel strumieniowych przy konfigurowaniu zadań pozyskiwania przyrostowego z danych przechowywanych w chmurowym magazynie obiektów. Zobacz Co to jest moduł automatycznego ładowania?.
SQL używa funkcji read_files do uruchamiania funkcji Auto Loader. Musisz również użyć słowa kluczowego STREAM, aby skonfigurować odczyt strumieniowy za pomocą read_files.
Poniżej opisano składnię dla read_files języka SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
Opcje automatycznego modułu ładującego to pary klucz-wartość. Aby uzyskać szczegółowe informacje na temat obsługiwanych formatów i opcji, zobacz opcje .
W poniższym przykładzie tworzona jest tabela strumieniowa z plików JSON przy użyciu Auto Loader.
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Funkcja read_files obsługuje również semantyki wsadowe w celu tworzenia zmaterializowanych widoków. W poniższym przykładzie użyto semantyki wsadowej do odczytania katalogu JSON i utworzenia zmaterializowanego widoku:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Walidacja danych względem oczekiwań
Możesz użyć oczekiwań, aby ustawić i wymusić ograniczenia dotyczące jakości danych. Zobacz Zarządzanie jakością danych przy użyciu oczekiwań dotyczących przepływu danych.
Poniższy kod definiuje oczekiwania o nazwie valid_data, które odrzucają rekordy o wartości null podczas pozyskiwania danych:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Wykonywanie zapytań dotyczących zmaterializowanych widoków i tabel strumieniowych zdefiniowanych w potoku
W poniższym przykładzie zdefiniowano cztery zestawy danych:
- Tabela strumieniowa nazwana
orders, która ładuje dane JSON. - Zmaterializowany widok o nazwie
customers, który ładuje dane CSV. - Zmaterializowany widok o nazwie
customer_orders, łączący rekordy z zestawów danychordersicustomers, przekształca znacznik czasu zamówienia na datę oraz wybiera polacustomer_id,order_number,stateiorder_date. - Zmaterializowany widok o nazwie
daily_orders_by_state, który agreguje dzienną liczbę zamówień dla każdego stanu.
Uwaga / Notatka
Podczas zapytań dotyczących widoków lub tabel w potoku danych, można określić katalog i schemat bezpośrednio lub użyć wartości domyślnych skonfigurowanych w potoku danych. W tym przykładzie tabele orders, customers i customer_orders są zapisywane i odczytywane z domyślnego wykazu i schematu skonfigurowanego dla przepływu danych.
Tryb publikowania Legacy używa schematu LIVE do wykonywania zapytań dotyczących innych zmaterializowanych widoków i tabel przesyłania strumieniowego zdefiniowanych w potoku. W nowych potokach składnia schematu LIVE jest ignorowana w trybie dyskretnym. Zobacz LIVE schema (legacy).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
Definiowanie prywatnej tabeli
Klauzulę PRIVATE można użyć podczas tworzenia zmaterializowanego widoku lub tabeli strumieniowej. Podczas tworzenia tabeli prywatnej tworzysz tabelę, ale nie tworzysz metadanych dla tabeli. Klauzula PRIVATE nakazuje, aby SDP utworzył tabelę, która jest dostępna dla potoku, ale nie powinna być dostępna poza potokiem. Aby skrócić czas przetwarzania, prywatna tabela utrzymuje się przez czas istnienia potoku, który ją tworzy, a nie tylko dla pojedynczej aktualizacji.
Tabele prywatne mogą mieć taką samą nazwę jak tabele w wykazie. Jeśli określisz niekwalifikowaną nazwę tabeli w potoku, jeśli istnieje zarówno tabela prywatna, jak i tabela wykazu o tej nazwie, zostanie użyta prywatna tabela.
Tabele prywatne były wcześniej nazywane tabelami tymczasowymi.
Trwale usuń rekordy z zmaterializowanego widoku lub tabeli strumieniowej
Aby trwale usunąć rekordy z tabeli przesyłania strumieniowego, w której włączono wektory usuwania z powodów takich jak zgodność z RODO, należy wykonać dodatkowe operacje na podstawowych tabelach delty obiektu. Aby zapewnić usunięcie rekordów z tabeli przesyłania strumieniowego, zobacz Trwałe usuwanie rekordów z tabeli przesyłania strumieniowego.
Zmaterializowane widoki zawsze odzwierciedlają dane w tabelach bazowych po ich odświeżeniu. Aby usunąć dane w zmaterializowanym widoku, musisz usunąć dane ze źródła i odświeżyć zmaterializowany widok.
Parametryzowanie wartości używanych podczas deklarowania tabel lub widoków przy użyciu języka SQL
Użyj SET, aby określić wartość konfiguracji w zapytaniu, które deklaruje tabelę lub widok, w tym konfiguracje platformy Spark. Każda tabela lub widok zdefiniowany w pliku źródłowym po SET instrukcji ma dostęp do zdefiniowanej wartości. Wszystkie konfiguracje platformy Spark określone przy użyciu instrukcji SET są używane podczas wykonywania zapytania Spark dla dowolnej tabeli lub widoku zgodnie z instrukcją SET. Aby odczytać wartość konfiguracji w zapytaniu, użyj składni interpolacji ciągów ${}. W poniższym przykładzie ustawiono wartość konfiguracji platformy Spark o nazwie startDate i użyto tej wartości w zapytaniu:
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Aby określić wiele wartości konfiguracji, użyj oddzielnej instrukcji SET dla każdej wartości.
Ograniczenia
Klauzula PIVOT nie jest obsługiwana. Operacja pivot na platformie Spark wymaga załadowania danych wejściowych, aby obliczyć schemat wyjściowy. Ta funkcja nie jest obsługiwana w potokach.
Uwaga / Notatka
Składnia CREATE OR REFRESH LIVE TABLE umożliwiająca utworzenie zmaterializowanego widoku jest przestarzała. Zamiast tego użyj CREATE OR REFRESH MATERIALIZED VIEW.