Udostępnij za pośrednictwem


Używanie tabel przesyłania strumieniowego w usłudze Databricks SQL

Databricks rekomenduje używanie tabel strumieniowych do ładowania danych przy użyciu Databricks SQL. Tabela przesyłania strumieniowego to tabela zarejestrowana w Katalogu Unity z dodatkową obsługą przesyłania strumieniowego lub przetwarzania danych przyrostowych. Dla każdej tabeli przesyłania strumieniowego automatycznie tworzony jest potok danych. Można używać tabel przesyłania strumieniowego do przyrostowego ładowania danych z systemu Kafka oraz magazynów obiektów w chmurze.

Uwaga / Notatka

Aby dowiedzieć się, jak używać tabel usługi Delta Lake jako źródeł i odbiorników przesyłania strumieniowego, zobacz artykuł Delta table streaming reads and writes.

Wymagania

Aby korzystać z tabel przesyłania strumieniowego, musisz spełnić następujące wymagania.

Wymagania dotyczące obszaru roboczego:

Tabele przesyłania strumieniowego utworzone w usłudze Databricks SQL są wspierane przez bezserwerowe potoki deklaratywne Lakeflow. Wasz obszar roboczy musi obsługiwać potoki bezserwerowe, aby wykorzystać tę funkcję.

wymagania dotyczące obliczeń:

Należy użyć jednej z następujących opcji:

  • Magazyn SQL korzystający z kanału Current.
  • Obliczenia ze standardowym trybem dostępu (dawniej trybem dostępu współdzielonego) na środowisku Databricks Runtime 13.3 LTS lub nowszym.
  • Korzystaj z dedykowanego trybu dostępu (dawniej tryb dostępu pojedynczego użytkownika) na Databricks Runtime 15.4 LTS lub nowszym.

    W środowisku Databricks Runtime 15.3 lub nowszym nie można używać dedykowanych obliczeń do wykonywania zapytań dotyczących tabel przesyłania strumieniowego należących do innych użytkowników. Możesz użyć dedykowanych zasobów obliczeniowych w środowisku Databricks Runtime 15.3 i nowszym tylko wtedy, gdy jesteś właścicielem tabeli przesyłania strumieniowego. Stwórcą tabeli jest właściciel.

    Środowisko Databricks Runtime w wersji 15.4 LTS i nowsze obsługuje możliwość wykonywania zapytań do tabel generowanych przez Lakeflow Declarative Pipelines na dedykowanej infrastrukturze obliczeniowej, nawet jeśli nie jesteś ich właścicielem. Opłaty mogą być naliczane za zasoby obliczeniowe bezserwerowe w przypadku korzystania z dedykowanych zasobów obliczeniowych do uruchamiania operacji filtrowania danych. Zobacz Szczegółową kontrolę dostępu na dedykowanej jednostce obliczeniowej.

Wymagania dotyczące uprawnień:

  • USE CATALOG i USE SCHEMA uprawnienia na katalog i schemat, w którym tworzysz tabelę przesyłania strumieniowego.
  • Uprawnienie CREATE TABLE w schemacie, w którym tworzysz tabelę przesyłania strumieniowego.
  • Uprawnienia dostępu do tabel lub lokalizacji dostarczających dane źródłowe dla tabeli przesyłania strumieniowego.

Tworzenie tabel przesyłania strumieniowego

Tabela strumieniowa jest definiowana przez zapytanie SQL w usłudze Databricks SQL. Podczas tworzenia tabeli strumieniowej dane w tabelach źródłowych są używane do budowania tabeli strumieniowej. Następnie odświeżasz tabelę, zwykle zgodnie z harmonogramem, aby ściągnąć wszystkie dodane dane w tabelach źródłowych, aby dołączyć je do tabeli przesyłania strumieniowego.

Podczas tworzenia tabeli przesyłania strumieniowego jesteś właścicielem tabeli.

Aby utworzyć tabelę streamingową na podstawie istniejącej tabeli, użyj CREATE STREAMING TABLE instrukcji, jak w poniższym przykładzie.

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT product, price FROM STREAM raw_data;

W takim przypadku tabela sales przesyłania strumieniowego jest tworzona na podstawie określonych kolumn raw_data tabeli z harmonogramem odświeżania co godzinę. Używane zapytanie musi być zapytaniem przesyłanym strumieniowo . Użyj słowa kluczowego STREAM , aby użyć semantyki przesyłania strumieniowego do odczytu ze źródła.

Podczas tworzenia tabeli przesyłania strumieniowego przy użyciu instrukcji CREATE OR REFRESH STREAMING TABLE początkowe odświeżanie danych i populacja zaczynają się natychmiast. Te operacje nie korzystają z obliczeń magazynu DBSQL. Zamiast tego, tabela strumieniowa opiera się na bezserwerowych Deklaratywnych Potokach Lakeflow do tworzenia i odświeżania. Dedykowany potok bezserwerowy jest automatycznie tworzony i zarządzany przez system dla każdej tabeli przesyłania strumieniowego.

Ładowanie plików za pomocą modułu ładującego automatycznego

Aby utworzyć tabelę przesyłania strumieniowego na podstawie plików w woluminie, należy użyć modułu automatycznego ładowania. Użyj Auto Loader z potokami Deklaratywnymi Lakeflow, do większości zadań związanych z pozyskiwaniem danych z chmurowego magazynu obiektów. Auto Loader i Deklaratywne Potoki Lakeflow są zaprojektowane do przyrostowego i idempotentnego ładowania danych, które stale rosną, w miarę jak napływają do magazynu w chmurze.

Aby użyć automatycznego modułu read_files ładującego w usłudze Databricks SQL, użyj funkcji . W poniższym przykładzie pokazano użycie Auto Loader do odczytu serii plików JSON do tabeli strumieniowej.

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT * FROM STREAM read_files(
    "/Volumes/my_catalog/my_schema/my_volume/path/to/data",
    format => "json"
  );

Aby odczytać dane z magazynu w chmurze, możesz również użyć modułu automatycznego ładowania:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

Aby dowiedzieć się więcej na temat automatycznego modułu ładującego, zobacz Co to jest moduł automatycznego ładowania?. Aby dowiedzieć się więcej na temat używania automatycznego modułu ładującego w języku SQL, z przykładami, zobacz Ładowanie danych z magazynu obiektów.

Strumieniowe pobieranie danych z innych źródeł

Na przykład zobacz Ładowanie danych za pomocą potoków deklaratywnych usługi Lakeflow w kontekście pozyskiwania z innych źródeł, w tym z systemu Kafka.

Tylko wprowadzanie nowych danych

Domyślnie funkcja read_files odczytuje wszystkie istniejące dane w katalogu źródłowym podczas tworzenia tabeli, a następnie przetwarza nowo przybywające rekordy przy każdym odświeżeniu.

Aby uniknąć pozyskiwania danych, które już istnieją w katalogu źródłowym w momencie tworzenia tabeli, ustaw opcję includeExistingFiles na false. Oznacza to, że tylko dane, które docierają do katalogu po utworzeniu tabeli, są przetwarzane. Przykład:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM read_files(
    '/path/to/files',
    includeExistingFiles => false
  );

Ustaw kanał środowiska uruchomieniowego

Tabele przesyłania strumieniowego utworzone przy użyciu usługi SQL Warehouse są automatycznie odświeżane za pomocą potoku. Potoki deklaratywne Lakeflow domyślnie używają środowiska uruchomieniowego w kanale current. Zobacz informacje o wersji Lakeflow Deklaratywnych Potoków i procedurę aktualizacji wersji, aby zapoznać się z procesem wydawania.

Databricks rekomenduje użycie kanału current dla obciążeń produkcyjnych. Nowe funkcje są po raz pierwszy udostępniane w preview kanale. Potok można ustawić na kanał Lakeflow Potoki Deklaratywne w wersji próbnej, aby przetestować nowe funkcje, określając preview jako właściwość tabeli. Tę właściwość można określić podczas tworzenia tabeli lub po utworzeniu tabeli przy użyciu instrukcji ALTER.

W poniższym przykładzie kodu pokazano, jak ustawić kanał na podgląd w instrukcji CREATE:

CREATE OR REFRESH STREAMING TABLE sales
  TBLPROPERTIES ('pipelines.channel' = 'preview')
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM raw_data;

Ukrywanie poufnych danych

Ważne

Ta funkcja jest dostępna w publicznej wersji testowej.

Możesz używać tabel przesyłania strumieniowego, aby ukryć poufne dane przed użytkownikami uzyskującymi dostęp do tabeli. Jednym z podejść jest zdefiniowanie zapytania tak, aby całkowicie wykluczało poufne kolumny lub wiersze. Alternatywnie można stosować maski kolumn lub filtry wierszy na podstawie uprawnień użytkownika kwerendy. Można na przykład ukryć kolumnę tax_id dla użytkowników, którzy nie znajdują się w grupie HumanResourcesDept. Do tego użyj składni ROW FILTER oraz MASK podczas tworzenia tabeli przesyłania strumieniowego. Aby uzyskać więcej informacji, zobacz Filtry wierszy i maski kolumn.

Odświeżanie tabeli przesyłania strumieniowego

Można odświeżanie zaplanować automatycznie podczas tworzenia tabeli przesyłania strumieniowego. Możesz również ręcznie odświeżyć tabele przesyłania strumieniowego. Nawet jeśli masz zaplanowane odświeżanie, możesz wywołać odświeżanie ręczne w dowolnym momencie. Operacje odświeżania są obsługiwane przez ten sam potok, który został automatycznie utworzony wraz z tabelą przesyłania strumieniowego.

Aby zaktualizować tabelę streamingową:

REFRESH STREAMING TABLE sales;

Stan najnowszego odświeżania można sprawdzić za pomocą DESCRIBE TABLE EXTENDED.

Uwaga / Notatka

Tylko właściciel tabeli może odświeżyć tabelę przesyłaną strumieniowo, aby uzyskać najnowsze dane. Użytkownik tworzący tabelę jest właścicielem i nie można zmienić właściciela. Może być konieczne odświeżenie tabeli przesyłania strumieniowego przed użyciem zapytań podróży w czasie.

Jak działa odświeżanie

Odświeżanie tabeli przesyłania strumieniowego ocenia tylko nowe wiersze, które dotarły od ostatniej aktualizacji i dołącza tylko nowe dane.

Każde odświeżanie używa bieżącej definicji tabeli przesyłania strumieniowego do przetwarzania tych nowych danych. Modyfikowanie definicji tabeli przesyłania strumieniowego nie powoduje automatycznego ponownego obliczania istniejących danych. Jeśli modyfikacja jest niezgodna z istniejącymi danymi (np. zmiana typu danych), następne odświeżenie zakończy się niepowodzeniem z powodu błędu.

W poniższych przykładach wyjaśniono, jak zmiany definicji tabeli przesyłania strumieniowego wpływają na zachowanie odświeżania:

  • Usunięcie filtru nie spowoduje ponownego przetworzenia poprzednio filtrowanych wierszy.
  • Zmiana projekcji kolumn nie wpłynie na sposób przetwarzania istniejących danych.
  • Połączenia ze statycznymi migawkami używają stanu migawki z momentu początkowego przetwarzania. Dane, które zostałyby dopasowane z zaktualizowaną migawką, zostaną zignorowane. Może to prowadzić do pomijania faktów, jeśli wymiary są opóźnione.
  • Zmodyfikowanie rzutowania istniejącej kolumny spowoduje wystąpienie błędu.

Jeśli dane zmienią się w sposób, który nie może być obsługiwany w istniejącej tabeli przesyłania strumieniowego, możesz wykonać pełne odświeżanie.

W pełni odśwież tabelę transmisji strumieniowej

Pełne odświeżenia ponownie przetwarzają wszystkie dane dostępne w źródle przy użyciu najnowszej definicji. Nie zaleca się wywoływania pełnych odświeżeń w źródłach, które nie przechowują całej historii danych lub mają krótkie okresy przechowywania, takie jak Kafka, ponieważ pełne odświeżanie obcina istniejące dane. Odzyskanie starych danych może nie być możliwe, jeśli dane nie są już dostępne w źródle.

Przykład:

REFRESH STREAMING TABLE sales FULL;

Zmień harmonogram tabeli przesyłanej strumieniowo

Możesz zmodyfikować (lub ustawić) harmonogram automatycznego odświeżania dla tabeli przesyłania strumieniowego. W poniższych przykładach pokazano, jak ustawić harmonogram przy użyciu polecenia ALTER STREAMING TABLE:

ALTER STREAMING TABLE sales
  ADD SCHEDULE every 1 hour;

Na przykład, aby zobaczyć zapytania dotyczące harmonogramu odświeżania, sprawdź ALTER STREAMING TABLE.

Śledź stan odświeżania

Stan odświeżania tabeli przesyłania strumieniowego można wyświetlić, wyświetlając potok, który zarządza tabelą przesyłania strumieniowego w interfejsie użytkownika potoków deklaratywnych lakeflow lub wyświetlając informacje odświeżania zwrócone przez DESCRIBE EXTENDED polecenie dla tabeli przesyłania strumieniowego.

DESCRIBE TABLE EXTENDED <table-name>;

Alternatywnie możesz wyświetlić tabelę przesyłania strumieniowego w Eksploratorze wykazu i wyświetlić tam stan odświeżania:

  1. Kliknij ikonę Dane.Wykaz na pasku bocznym.
  2. W drzewie Eksploratora wykazu po lewej stronie otwórz katalog i wybierz schemat, w którym znajduje się tabela przesyłania strumieniowego.
  3. Otwórz element Tabele w wybranym schemacie, a następnie kliknij tabelę przesyłania strumieniowego.

W tym miejscu możesz użyć kart w nazwie tabeli przesyłania strumieniowego, aby wyświetlić i edytować informacje o tabeli przesyłania strumieniowego, w tym:

  • Odświeżanie stanu i historii
  • Schemat tabeli
  • Przykładowe dane (wymagają aktywnego obliczenia)
  • Uprawnienia
  • Pochodzenie danych, w tym tabele i potoki, od których zależy ta tabela strumieniowa
  • Szczegółowe informacje o użyciu
  • Monitory, które utworzyłeś dla tej tabeli przesyłania strumieniowego

Kontrolowanie dostępu do tabel przesyłania strumieniowego

Tabele przesyłania strumieniowego obsługują zaawansowane mechanizmy kontroli dostępu, aby wspierać udostępnianie danych, jednocześnie unikając ujawniania potencjalnie prywatnych danych. Właściciel tabeli przesyłania strumieniowego lub użytkownik z MANAGE uprawnieniami może udzielić SELECT uprawnień innym użytkownikom. Użytkownicy z dostępem SELECT do tabeli przesyłania strumieniowego nie potrzebują SELECT dostępu do tabel, do których odwołuje się tabela przesyłania strumieniowego. Ta kontrola dostępu umożliwia udostępnianie danych podczas kontrolowania dostępu do danych bazowych.

Przyznaj uprawnienia do tabeli streamingowej

Aby udzielić dostępu do tabeli przesyłania strumieniowego, użyj instrukcjiGRANT :

GRANT <privilege_type> ON <st_name> TO <principal>;

privilege_type może być:

  • SELECT — użytkownik może przesyłać SELECT tabelę przesyłania strumieniowego.
  • REFRESH — użytkownik może przesyłać REFRESH tabelę przesyłania strumieniowego. Operacje odświeżania są uruchamiane przy użyciu uprawnień właściciela.

Poniższy przykład tworzy tabelę przesyłania strumieniowego i przyznaje użytkownikom uprawnienia wyboru i odświeżania:

CREATE MATERIALIZED VIEW st_name AS SELECT * FROM source_table;

-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;

-- Grand read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;

Aby uzyskać więcej informacji na temat udzielania uprawnień do obiektów zabezpieczalnych Unity Catalogu, zobacz temat uprawnienia Unity Catalogu i obiekty zabezpieczalne.

Odwoływanie uprawnień z tabeli przesyłania strumieniowego

Aby odwołać dostęp z tabeli przesyłania strumieniowego, użyj instrukcjiREVOKE :

REVOKE privilege_type ON <st_name> FROM principal;

Jeśli uprawnienia SELECT w tabeli źródłowej zostaną odwołane od właściciela tabeli przesyłania strumieniowego lub dowolnego innego użytkownika, który otrzymał uprawnienia MANAGE lub SELECT do tabeli przesyłania strumieniowego, albo tabela źródłowa zostanie usunięta, właściciel tabeli przesyłania strumieniowego lub użytkownik, któremu udzielono dostępu, nadal będą mogli wykonywać zapytania na tabeli przesyłania strumieniowego. Jednak występuje następujące zachowanie:

  • Właściciel tabeli przesyłania strumieniowego lub inne osoby, które utraciły dostęp do tabeli przesyłania strumieniowego, nie mogą już REFRESH tej tabeli przesyłania strumieniowego, a tabela przesyłania strumieniowego stanie się nieaktualna.
  • Jeśli jest zautomatyzowane zgodnie z harmonogramem, kolejna zaplanowana REFRESH nie powiedzie się lub nie zostanie uruchomiona.

Poniższy przykład odbiera SELECT uprawnienie dla read_only_user.

REVOKE SELECT ON st_name FROM read_only_user;

Trwale usunąć rekordy z tabeli przesyłania strumieniowego

Ważne

Obsługa instrukcji REORG z tabelami przesyłania strumieniowego jest dostępna w publicznej wersji zapoznawczej.

Uwaga / Notatka

  • Użycie instrukcji REORG z tabelą strumieniową wymaga środowiska Databricks Runtime w wersji 15.4 lub nowszej.
  • Chociaż można użyć instrukcji REORG z dowolną tabelą przesyłania strumieniowego, jest ona wymagana tylko podczas usuwania rekordów z tabeli przesyłania strumieniowego z włączonymi wektorami usuwania . Polecenie nie ma wpływu po użyciu z tabelą przesyłania strumieniowego bez włączonych wektorów usuwania.

Aby fizycznie usunąć rekordy z magazynu bazowego dla tabeli przesyłania strumieniowego z włączonymi wektorami usuwania, takimi jak zgodność z RODO, należy wykonać dodatkowe kroki, aby upewnić się, że VACUUM operacja jest uruchamiana na danych tabeli przesyłania strumieniowego.

Aby fizycznie usunąć rekordy z magazynu bazowego:

  1. Zaktualizuj rekordy lub usuń rekordy z tabeli przesyłania strumieniowego.
  2. Wykonaj instrukcję REORG względem tabeli strumieniowej, określając parametr APPLY (PURGE). Na przykład: REORG TABLE <streaming-table-name> APPLY (PURGE);.
  3. Poczekaj, aż minie okres przechowywania danych w tabeli przesyłania strumieniowego. Domyślny okres przechowywania danych wynosi siedem dni, ale można go skonfigurować za pomocą właściwości tabeli delta.deletedFileRetentionDuration. Zobacz Konfigurowanie przechowywania danych dla zapytań dotyczących podróży w czasie.
  4. REFRESH tabela przesyłania strumieniowego. Zobacz Jak odświeżyć tabelę przesyłania strumieniowego. W ciągu 24 godzin od REFRESH operacji zadania konserwacji potoków deklaratywnych Lakeflow, w tym operacji wymaganej VACUUM do zapewnienia trwałego usunięcia rekordów, są uruchamiane automatycznie.

Monitorowanie przebiegów przy użyciu historii zapytań

Możesz użyć strony historii zapytań, aby uzyskać dostęp do szczegółów zapytań i profilów zapytań, które mogą pomóc w identyfikowaniu słabych zapytań i wąskich gardeł w Deklaratywnych Potokach Lakeflow używanych do uruchamiania aktualizacji tabel przesyłanych strumieniowo. Aby zapoznać się z omówieniem rodzaju informacji dostępnych w historiach zapytań i profilach zapytań, zobacz Historia zapytań i Profil zapytania.

Ważne

Ta funkcja jest dostępna w publicznej wersji testowej. Administratorzy obszaru roboczego mogą włączyć tę funkcję na stronie Podglądy . Zobacz Zarządzanie wersjami zapoznawczymi usługi Azure Databricks.

Wszystkie instrukcje związane z tabelami strumieniowymi występują w historii zapytań. Możesz użyć rozwijanej listy filtrującej Statement, aby wybrać dowolne polecenie i przejrzeć powiązane zapytania. Wszystkie instrukcje CREATE są kontynuowane przez instrukcję REFRESH, która jest wykonywana asynchronicznie w potoku. Instrukcje REFRESH zwykle zawierają szczegółowe plany zapytań, które zapewniają wgląd w optymalizację wydajności.

Aby uzyskać dostęp do REFRESH zapytań w historii zapytań interfejsu użytkownika, wykonaj następujące kroki:

  1. Kliknij ikonę Historia. na lewym pasku bocznym, aby otworzyć interfejs użytkownika historii zapytań .
  2. Zaznacz pole wyboru REFRESH z rozwijanego filtra Oświadczenia.
  3. Kliknij nazwę instrukcji zapytania, aby wyświetlić szczegóły podsumowania, takie jak czas trwania zapytania i zagregowane metryki.
  4. Kliknij pozycję Zobacz profil zapytania , aby otworzyć profil zapytania. Aby uzyskać szczegółowe informacje na temat nawigowania w profilu zapytania, zobacz Profil zapytania.
  5. Możesz użyć linków w sekcji Źródło zapytania, aby otworzyć powiązane zapytanie lub potok danych opcjonalnie.

Możesz również uzyskać dostęp do szczegółów zapytania przy użyciu linków w edytorze SQL lub notesu dołączonego do usługi SQL Warehouse.

Dodatkowe zasoby