Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Databricks rekomenduje używanie tabel strumieniowych do ładowania danych przy użyciu Databricks SQL. Tabela przesyłania strumieniowego to tabela zarejestrowana w Katalogu Unity z dodatkową obsługą przesyłania strumieniowego lub przetwarzania danych przyrostowych. Dla każdej tabeli przesyłania strumieniowego automatycznie tworzony jest potok danych. Można używać tabel przesyłania strumieniowego do przyrostowego ładowania danych z systemu Kafka oraz magazynów obiektów w chmurze.
Uwaga / Notatka
Aby dowiedzieć się, jak używać tabel usługi Delta Lake jako źródeł i odbiorników przesyłania strumieniowego, zobacz artykuł Delta table streaming reads and writes.
Wymagania
Aby korzystać z tabel przesyłania strumieniowego, musisz spełnić następujące wymagania.
Wymagania dotyczące obszaru roboczego:
Tabele przesyłania strumieniowego utworzone w usłudze Databricks SQL są wspierane przez bezserwerowe potoki deklaratywne Lakeflow. Wasz obszar roboczy musi obsługiwać potoki bezserwerowe, aby wykorzystać tę funkcję.
- Konto usługi Azure Databricks z włączonym działaniem bezserwerowym. Aby uzyskać więcej informacji, zobacz Włączanie bezserwerowych magazynów SQL.
- Obszar roboczy z włączonym Unity Catalog. Aby uzyskać więcej informacji, zobacz Wprowadzenie do Unity Catalog.
wymagania dotyczące obliczeń:
Należy użyć jednej z następujących opcji:
- Magazyn SQL korzystający z kanału
Current
. - Obliczenia ze standardowym trybem dostępu (dawniej trybem dostępu współdzielonego) na środowisku Databricks Runtime 13.3 LTS lub nowszym.
Korzystaj z dedykowanego trybu dostępu (dawniej tryb dostępu pojedynczego użytkownika) na Databricks Runtime 15.4 LTS lub nowszym.
W środowisku Databricks Runtime 15.3 lub nowszym nie można używać dedykowanych obliczeń do wykonywania zapytań dotyczących tabel przesyłania strumieniowego należących do innych użytkowników. Możesz użyć dedykowanych zasobów obliczeniowych w środowisku Databricks Runtime 15.3 i nowszym tylko wtedy, gdy jesteś właścicielem tabeli przesyłania strumieniowego. Stwórcą tabeli jest właściciel.
Środowisko Databricks Runtime w wersji 15.4 LTS i nowsze obsługuje możliwość wykonywania zapytań do tabel generowanych przez Lakeflow Declarative Pipelines na dedykowanej infrastrukturze obliczeniowej, nawet jeśli nie jesteś ich właścicielem. Opłaty mogą być naliczane za zasoby obliczeniowe bezserwerowe w przypadku korzystania z dedykowanych zasobów obliczeniowych do uruchamiania operacji filtrowania danych. Zobacz Szczegółową kontrolę dostępu na dedykowanej jednostce obliczeniowej.
Wymagania dotyczące uprawnień:
-
USE CATALOG
iUSE SCHEMA
uprawnienia na katalog i schemat, w którym tworzysz tabelę przesyłania strumieniowego. - Uprawnienie
CREATE TABLE
w schemacie, w którym tworzysz tabelę przesyłania strumieniowego. - Uprawnienia dostępu do tabel lub lokalizacji dostarczających dane źródłowe dla tabeli przesyłania strumieniowego.
Tworzenie tabel przesyłania strumieniowego
Tabela strumieniowa jest definiowana przez zapytanie SQL w usłudze Databricks SQL. Podczas tworzenia tabeli strumieniowej dane w tabelach źródłowych są używane do budowania tabeli strumieniowej. Następnie odświeżasz tabelę, zwykle zgodnie z harmonogramem, aby ściągnąć wszystkie dodane dane w tabelach źródłowych, aby dołączyć je do tabeli przesyłania strumieniowego.
Podczas tworzenia tabeli przesyłania strumieniowego jesteś właścicielem tabeli.
Aby utworzyć tabelę streamingową na podstawie istniejącej tabeli, użyj CREATE STREAMING TABLE instrukcji, jak w poniższym przykładzie.
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT product, price FROM STREAM raw_data;
W takim przypadku tabela sales
przesyłania strumieniowego jest tworzona na podstawie określonych kolumn raw_data
tabeli z harmonogramem odświeżania co godzinę. Używane zapytanie musi być zapytaniem przesyłanym strumieniowo . Użyj słowa kluczowego STREAM
, aby użyć semantyki przesyłania strumieniowego do odczytu ze źródła.
Podczas tworzenia tabeli przesyłania strumieniowego przy użyciu instrukcji CREATE OR REFRESH STREAMING TABLE
początkowe odświeżanie danych i populacja zaczynają się natychmiast. Te operacje nie korzystają z obliczeń magazynu DBSQL. Zamiast tego, tabela strumieniowa opiera się na bezserwerowych Deklaratywnych Potokach Lakeflow do tworzenia i odświeżania. Dedykowany potok bezserwerowy jest automatycznie tworzony i zarządzany przez system dla każdej tabeli przesyłania strumieniowego.
Ładowanie plików za pomocą modułu ładującego automatycznego
Aby utworzyć tabelę przesyłania strumieniowego na podstawie plików w woluminie, należy użyć modułu automatycznego ładowania. Użyj Auto Loader z potokami Deklaratywnymi Lakeflow, do większości zadań związanych z pozyskiwaniem danych z chmurowego magazynu obiektów. Auto Loader i Deklaratywne Potoki Lakeflow są zaprojektowane do przyrostowego i idempotentnego ładowania danych, które stale rosną, w miarę jak napływają do magazynu w chmurze.
Aby użyć automatycznego modułu read_files
ładującego w usłudze Databricks SQL, użyj funkcji . W poniższym przykładzie pokazano użycie Auto Loader do odczytu serii plików JSON do tabeli strumieniowej.
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/path/to/data",
format => "json"
);
Aby odczytać dane z magazynu w chmurze, możesz również użyć modułu automatycznego ładowania:
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
Aby dowiedzieć się więcej na temat automatycznego modułu ładującego, zobacz Co to jest moduł automatycznego ładowania?. Aby dowiedzieć się więcej na temat używania automatycznego modułu ładującego w języku SQL, z przykładami, zobacz Ładowanie danych z magazynu obiektów.
Strumieniowe pobieranie danych z innych źródeł
Na przykład zobacz Ładowanie danych za pomocą potoków deklaratywnych usługi Lakeflow w kontekście pozyskiwania z innych źródeł, w tym z systemu Kafka.
Tylko wprowadzanie nowych danych
Domyślnie funkcja read_files
odczytuje wszystkie istniejące dane w katalogu źródłowym podczas tworzenia tabeli, a następnie przetwarza nowo przybywające rekordy przy każdym odświeżeniu.
Aby uniknąć pozyskiwania danych, które już istnieją w katalogu źródłowym w momencie tworzenia tabeli, ustaw opcję includeExistingFiles
na false
. Oznacza to, że tylko dane, które docierają do katalogu po utworzeniu tabeli, są przetwarzane. Przykład:
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'/path/to/files',
includeExistingFiles => false
);
Ustaw kanał środowiska uruchomieniowego
Tabele przesyłania strumieniowego utworzone przy użyciu usługi SQL Warehouse są automatycznie odświeżane za pomocą potoku. Potoki deklaratywne Lakeflow domyślnie używają środowiska uruchomieniowego w kanale current
. Zobacz informacje o wersji Lakeflow Deklaratywnych Potoków i procedurę aktualizacji wersji, aby zapoznać się z procesem wydawania.
Databricks rekomenduje użycie kanału current
dla obciążeń produkcyjnych. Nowe funkcje są po raz pierwszy udostępniane w preview
kanale. Potok można ustawić na kanał Lakeflow Potoki Deklaratywne w wersji próbnej, aby przetestować nowe funkcje, określając preview
jako właściwość tabeli. Tę właściwość można określić podczas tworzenia tabeli lub po utworzeniu tabeli przy użyciu instrukcji ALTER.
W poniższym przykładzie kodu pokazano, jak ustawić kanał na podgląd w instrukcji CREATE:
CREATE OR REFRESH STREAMING TABLE sales
TBLPROPERTIES ('pipelines.channel' = 'preview')
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM raw_data;
Ukrywanie poufnych danych
Ważne
Ta funkcja jest dostępna w publicznej wersji testowej.
Możesz używać tabel przesyłania strumieniowego, aby ukryć poufne dane przed użytkownikami uzyskującymi dostęp do tabeli. Jednym z podejść jest zdefiniowanie zapytania tak, aby całkowicie wykluczało poufne kolumny lub wiersze. Alternatywnie można stosować maski kolumn lub filtry wierszy na podstawie uprawnień użytkownika kwerendy. Można na przykład ukryć kolumnę tax_id
dla użytkowników, którzy nie znajdują się w grupie HumanResourcesDept
. Do tego użyj składni ROW FILTER
oraz MASK
podczas tworzenia tabeli przesyłania strumieniowego. Aby uzyskać więcej informacji, zobacz Filtry wierszy i maski kolumn.
Odświeżanie tabeli przesyłania strumieniowego
Można odświeżanie zaplanować automatycznie podczas tworzenia tabeli przesyłania strumieniowego. Możesz również ręcznie odświeżyć tabele przesyłania strumieniowego. Nawet jeśli masz zaplanowane odświeżanie, możesz wywołać odświeżanie ręczne w dowolnym momencie. Operacje odświeżania są obsługiwane przez ten sam potok, który został automatycznie utworzony wraz z tabelą przesyłania strumieniowego.
Aby zaktualizować tabelę streamingową:
REFRESH STREAMING TABLE sales;
Stan najnowszego odświeżania można sprawdzić za pomocą DESCRIBE TABLE EXTENDED.
Uwaga / Notatka
Tylko właściciel tabeli może odświeżyć tabelę przesyłaną strumieniowo, aby uzyskać najnowsze dane. Użytkownik tworzący tabelę jest właścicielem i nie można zmienić właściciela. Może być konieczne odświeżenie tabeli przesyłania strumieniowego przed użyciem zapytań podróży w czasie.
Jak działa odświeżanie
Odświeżanie tabeli przesyłania strumieniowego ocenia tylko nowe wiersze, które dotarły od ostatniej aktualizacji i dołącza tylko nowe dane.
Każde odświeżanie używa bieżącej definicji tabeli przesyłania strumieniowego do przetwarzania tych nowych danych. Modyfikowanie definicji tabeli przesyłania strumieniowego nie powoduje automatycznego ponownego obliczania istniejących danych. Jeśli modyfikacja jest niezgodna z istniejącymi danymi (np. zmiana typu danych), następne odświeżenie zakończy się niepowodzeniem z powodu błędu.
W poniższych przykładach wyjaśniono, jak zmiany definicji tabeli przesyłania strumieniowego wpływają na zachowanie odświeżania:
- Usunięcie filtru nie spowoduje ponownego przetworzenia poprzednio filtrowanych wierszy.
- Zmiana projekcji kolumn nie wpłynie na sposób przetwarzania istniejących danych.
- Połączenia ze statycznymi migawkami używają stanu migawki z momentu początkowego przetwarzania. Dane, które zostałyby dopasowane z zaktualizowaną migawką, zostaną zignorowane. Może to prowadzić do pomijania faktów, jeśli wymiary są opóźnione.
- Zmodyfikowanie rzutowania istniejącej kolumny spowoduje wystąpienie błędu.
Jeśli dane zmienią się w sposób, który nie może być obsługiwany w istniejącej tabeli przesyłania strumieniowego, możesz wykonać pełne odświeżanie.
W pełni odśwież tabelę transmisji strumieniowej
Pełne odświeżenia ponownie przetwarzają wszystkie dane dostępne w źródle przy użyciu najnowszej definicji. Nie zaleca się wywoływania pełnych odświeżeń w źródłach, które nie przechowują całej historii danych lub mają krótkie okresy przechowywania, takie jak Kafka, ponieważ pełne odświeżanie obcina istniejące dane. Odzyskanie starych danych może nie być możliwe, jeśli dane nie są już dostępne w źródle.
Przykład:
REFRESH STREAMING TABLE sales FULL;
Zmień harmonogram tabeli przesyłanej strumieniowo
Możesz zmodyfikować (lub ustawić) harmonogram automatycznego odświeżania dla tabeli przesyłania strumieniowego. W poniższych przykładach pokazano, jak ustawić harmonogram przy użyciu polecenia ALTER STREAMING TABLE:
ALTER STREAMING TABLE sales
ADD SCHEDULE every 1 hour;
Na przykład, aby zobaczyć zapytania dotyczące harmonogramu odświeżania, sprawdź ALTER STREAMING TABLE.
Śledź stan odświeżania
Stan odświeżania tabeli przesyłania strumieniowego można wyświetlić, wyświetlając potok, który zarządza tabelą przesyłania strumieniowego w interfejsie użytkownika potoków deklaratywnych lakeflow lub wyświetlając informacje odświeżania zwrócone przez DESCRIBE EXTENDED
polecenie dla tabeli przesyłania strumieniowego.
DESCRIBE TABLE EXTENDED <table-name>;
Alternatywnie możesz wyświetlić tabelę przesyłania strumieniowego w Eksploratorze wykazu i wyświetlić tam stan odświeżania:
- Kliknij
Wykaz na pasku bocznym.
- W drzewie Eksploratora wykazu po lewej stronie otwórz katalog i wybierz schemat, w którym znajduje się tabela przesyłania strumieniowego.
- Otwórz element Tabele w wybranym schemacie, a następnie kliknij tabelę przesyłania strumieniowego.
W tym miejscu możesz użyć kart w nazwie tabeli przesyłania strumieniowego, aby wyświetlić i edytować informacje o tabeli przesyłania strumieniowego, w tym:
- Odświeżanie stanu i historii
- Schemat tabeli
- Przykładowe dane (wymagają aktywnego obliczenia)
- Uprawnienia
- Pochodzenie danych, w tym tabele i potoki, od których zależy ta tabela strumieniowa
- Szczegółowe informacje o użyciu
- Monitory, które utworzyłeś dla tej tabeli przesyłania strumieniowego
Kontrolowanie dostępu do tabel przesyłania strumieniowego
Tabele przesyłania strumieniowego obsługują zaawansowane mechanizmy kontroli dostępu, aby wspierać udostępnianie danych, jednocześnie unikając ujawniania potencjalnie prywatnych danych. Właściciel tabeli przesyłania strumieniowego lub użytkownik z MANAGE
uprawnieniami może udzielić SELECT
uprawnień innym użytkownikom. Użytkownicy z dostępem SELECT
do tabeli przesyłania strumieniowego nie potrzebują SELECT
dostępu do tabel, do których odwołuje się tabela przesyłania strumieniowego. Ta kontrola dostępu umożliwia udostępnianie danych podczas kontrolowania dostępu do danych bazowych.
Przyznaj uprawnienia do tabeli streamingowej
Aby udzielić dostępu do tabeli przesyłania strumieniowego, użyj instrukcjiGRANT :
GRANT <privilege_type> ON <st_name> TO <principal>;
privilege_type
może być:
-
SELECT
— użytkownik może przesyłaćSELECT
tabelę przesyłania strumieniowego. -
REFRESH
— użytkownik może przesyłaćREFRESH
tabelę przesyłania strumieniowego. Operacje odświeżania są uruchamiane przy użyciu uprawnień właściciela.
Poniższy przykład tworzy tabelę przesyłania strumieniowego i przyznaje użytkownikom uprawnienia wyboru i odświeżania:
CREATE MATERIALIZED VIEW st_name AS SELECT * FROM source_table;
-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;
-- Grand read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;
Aby uzyskać więcej informacji na temat udzielania uprawnień do obiektów zabezpieczalnych Unity Catalogu, zobacz temat uprawnienia Unity Catalogu i obiekty zabezpieczalne.
Odwoływanie uprawnień z tabeli przesyłania strumieniowego
Aby odwołać dostęp z tabeli przesyłania strumieniowego, użyj instrukcjiREVOKE :
REVOKE privilege_type ON <st_name> FROM principal;
Jeśli uprawnienia SELECT
w tabeli źródłowej zostaną odwołane od właściciela tabeli przesyłania strumieniowego lub dowolnego innego użytkownika, który otrzymał uprawnienia MANAGE
lub SELECT
do tabeli przesyłania strumieniowego, albo tabela źródłowa zostanie usunięta, właściciel tabeli przesyłania strumieniowego lub użytkownik, któremu udzielono dostępu, nadal będą mogli wykonywać zapytania na tabeli przesyłania strumieniowego. Jednak występuje następujące zachowanie:
- Właściciel tabeli przesyłania strumieniowego lub inne osoby, które utraciły dostęp do tabeli przesyłania strumieniowego, nie mogą już
REFRESH
tej tabeli przesyłania strumieniowego, a tabela przesyłania strumieniowego stanie się nieaktualna. - Jeśli jest zautomatyzowane zgodnie z harmonogramem, kolejna zaplanowana
REFRESH
nie powiedzie się lub nie zostanie uruchomiona.
Poniższy przykład odbiera SELECT
uprawnienie dla read_only_user
.
REVOKE SELECT ON st_name FROM read_only_user;
Trwale usunąć rekordy z tabeli przesyłania strumieniowego
Ważne
Obsługa instrukcji REORG
z tabelami przesyłania strumieniowego jest dostępna w publicznej wersji zapoznawczej.
Uwaga / Notatka
- Użycie instrukcji
REORG
z tabelą strumieniową wymaga środowiska Databricks Runtime w wersji 15.4 lub nowszej. - Chociaż można użyć instrukcji
REORG
z dowolną tabelą przesyłania strumieniowego, jest ona wymagana tylko podczas usuwania rekordów z tabeli przesyłania strumieniowego z włączonymi wektorami usuwania . Polecenie nie ma wpływu po użyciu z tabelą przesyłania strumieniowego bez włączonych wektorów usuwania.
Aby fizycznie usunąć rekordy z magazynu bazowego dla tabeli przesyłania strumieniowego z włączonymi wektorami usuwania, takimi jak zgodność z RODO, należy wykonać dodatkowe kroki, aby upewnić się, że VACUUM operacja jest uruchamiana na danych tabeli przesyłania strumieniowego.
Aby fizycznie usunąć rekordy z magazynu bazowego:
- Zaktualizuj rekordy lub usuń rekordy z tabeli przesyłania strumieniowego.
- Wykonaj instrukcję
REORG
względem tabeli strumieniowej, określając parametrAPPLY (PURGE)
. Na przykład:REORG TABLE <streaming-table-name> APPLY (PURGE);
. - Poczekaj, aż minie okres przechowywania danych w tabeli przesyłania strumieniowego. Domyślny okres przechowywania danych wynosi siedem dni, ale można go skonfigurować za pomocą właściwości tabeli
delta.deletedFileRetentionDuration
. Zobacz Konfigurowanie przechowywania danych dla zapytań dotyczących podróży w czasie. -
REFRESH
tabela przesyłania strumieniowego. Zobacz Jak odświeżyć tabelę przesyłania strumieniowego. W ciągu 24 godzin odREFRESH
operacji zadania konserwacji potoków deklaratywnych Lakeflow, w tym operacji wymaganejVACUUM
do zapewnienia trwałego usunięcia rekordów, są uruchamiane automatycznie.
Monitorowanie przebiegów przy użyciu historii zapytań
Możesz użyć strony historii zapytań, aby uzyskać dostęp do szczegółów zapytań i profilów zapytań, które mogą pomóc w identyfikowaniu słabych zapytań i wąskich gardeł w Deklaratywnych Potokach Lakeflow używanych do uruchamiania aktualizacji tabel przesyłanych strumieniowo. Aby zapoznać się z omówieniem rodzaju informacji dostępnych w historiach zapytań i profilach zapytań, zobacz Historia zapytań i Profil zapytania.
Ważne
Ta funkcja jest dostępna w publicznej wersji testowej. Administratorzy obszaru roboczego mogą włączyć tę funkcję na stronie Podglądy . Zobacz Zarządzanie wersjami zapoznawczymi usługi Azure Databricks.
Wszystkie instrukcje związane z tabelami strumieniowymi występują w historii zapytań. Możesz użyć rozwijanej listy filtrującej Statement, aby wybrać dowolne polecenie i przejrzeć powiązane zapytania. Wszystkie instrukcje CREATE
są kontynuowane przez instrukcję REFRESH
, która jest wykonywana asynchronicznie w potoku. Instrukcje REFRESH
zwykle zawierają szczegółowe plany zapytań, które zapewniają wgląd w optymalizację wydajności.
Aby uzyskać dostęp do REFRESH
zapytań w historii zapytań interfejsu użytkownika, wykonaj następujące kroki:
- Kliknij
na lewym pasku bocznym, aby otworzyć interfejs użytkownika historii zapytań .
- Zaznacz pole wyboru REFRESH z rozwijanego filtra Oświadczenia.
- Kliknij nazwę instrukcji zapytania, aby wyświetlić szczegóły podsumowania, takie jak czas trwania zapytania i zagregowane metryki.
- Kliknij pozycję Zobacz profil zapytania , aby otworzyć profil zapytania. Aby uzyskać szczegółowe informacje na temat nawigowania w profilu zapytania, zobacz Profil zapytania.
- Możesz użyć linków w sekcji Źródło zapytania, aby otworzyć powiązane zapytanie lub potok danych opcjonalnie.
Możesz również uzyskać dostęp do szczegółów zapytania przy użyciu linków w edytorze SQL lub notesu dołączonego do usługi SQL Warehouse.