Ładowanie danych przy użyciu tabel przesyłania strumieniowego w usłudze Databricks SQL
Usługa Databricks zaleca używanie tabel przesyłania strumieniowego do pozyskiwania danych przy użyciu usługi Databricks SQL. Tabela przesyłania strumieniowego to tabela zarejestrowana w wykazie aparatu Unity z dodatkową obsługą przesyłania strumieniowego lub przyrostowego przetwarzania danych. Potok delta Live Tables jest tworzony automatycznie dla każdej tabeli przesyłania strumieniowego. Tabele przesyłania strumieniowego można używać do przyrostowego ładowania danych z magazynu obiektów platformy Kafka i chmury.
W tym artykule pokazano, jak za pomocą tabel przesyłania strumieniowego ładować dane z magazynu obiektów w chmurze skonfigurowanego jako wolumin wykazu aparatu Unity (zalecane) lub lokalizację zewnętrzną.
Uwaga
Aby dowiedzieć się, jak używać tabel usługi Delta Lake jako źródeł przesyłania strumieniowego i ujścia, zobacz Delta table streaming reads and writes (Odczyty i zapisy w tabeli delta).
Przed rozpoczęciem
Przed rozpoczęciem należy spełnić następujące wymagania.
Wymagania dotyczące obszaru roboczego:
- Konto usługi Azure Databricks z włączoną obsługą bezserwerową. Aby uzyskać więcej informacji, zobacz Włączanie bezserwerowych magazynów SQL.
- Obszar roboczy z włączonym wykazem aparatu Unity. Aby uzyskać więcej informacji, zobacz Konfigurowanie wykazu aparatu Unity i zarządzanie nim.
Wymagania dotyczące obliczeń:
Należy użyć jednej z następujących opcji:
Usługa SQL Warehouse korzystająca z kanału
Current
.Środowisko obliczeniowe z trybem dostępu współdzielonego w środowisku Databricks Runtime 13.3 LTS lub nowszym.
Środowisko obliczeniowe z trybem dostępu pojedynczego użytkownika w środowisku Databricks Runtime 15.4 LTS lub nowszym.
W środowisku Databricks Runtime 15.3 i poniżej nie można używać obliczeń pojedynczego użytkownika do wykonywania zapytań dotyczących tabel przesyłania strumieniowego należących do innych użytkowników. Możesz użyć obliczeń pojedynczego użytkownika w środowisku Databricks Runtime 15.3 i poniżej tylko wtedy, gdy jesteś właścicielem tabeli przesyłania strumieniowego. Twórcą tabeli jest właściciel.
Środowisko Databricks Runtime 15.4 LTS i nowsze obsługują zapytania dotyczące tabel generowanych przez tabele delta live w obliczeniach pojedynczego użytkownika, niezależnie od własności tabeli. Aby skorzystać z filtrowania danych dostępnego w środowisku Databricks Runtime 15.4 LTS lub nowszym, należy potwierdzić, że obszar roboczy jest włączony dla obliczeń bezserwerowych , ponieważ funkcja filtrowania danych, która obsługuje tabele generowane przez tabele delta Live, działa na bezserwerowych obliczeniach. Opłaty za zasoby obliczeniowe bezserwerowe mogą być naliczane w przypadku korzystania z obliczeń pojedynczego użytkownika do uruchamiania operacji filtrowania danych. Zobacz Szczegółowa kontrola dostępu w obliczeniach pojedynczego użytkownika.
Wymagania dotyczące uprawnień:
- Uprawnienie
READ FILES
w lokalizacji zewnętrznej wykazu aparatu Unity. Aby uzyskać informacje, zobacz Tworzenie lokalizacji zewnętrznej w celu połączenia magazynu w chmurze z usługą Azure Databricks. - Uprawnienie
USE CATALOG
do katalogu, w którym tworzysz tabelę przesyłania strumieniowego. - Uprawnienie
USE SCHEMA
do schematu, w którym tworzysz tabelę przesyłania strumieniowego. - Uprawnienie
CREATE TABLE
do schematu, w którym tworzysz tabelę przesyłania strumieniowego.
Inne wymagania:
Ścieżka do danych źródłowych.
Przykład ścieżki woluminu:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
Przykład ścieżki lokalizacji zewnętrznej:
abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis
Uwaga
W tym artykule założono, że dane, które chcesz załadować, są w lokalizacji magazynu w chmurze, która odpowiada woluminowi wykazu aparatu Unity lub lokalizacji zewnętrznej, do której masz dostęp.
Odnajdywanie i wyświetlanie podglądu danych źródłowych
Na pasku bocznym obszaru roboczego kliknij pozycję Zapytania, a następnie kliknij pozycję Utwórz zapytanie.
W edytorze zapytań wybierz magazyn SQL, który używa
Current
kanału z listy rozwijanej.Wklej następujące elementy do edytora, podstawiając wartości w nawiasach kątowych (
<>
), aby uzyskać informacje identyfikujące dane źródłowe, a następnie kliknij przycisk Uruchom.Uwaga
Podczas uruchamiania funkcji z wartością
read_files
tabeli mogą wystąpić błędy wnioskowania schematu, jeśli wartości domyślne funkcji nie mogą analizować danych. Na przykład może być konieczne skonfigurowanie trybu wielowierszowego dla wielowierszowych plików CSV lub JSON. Aby uzyskać listę opcji analizatora, zobacz read_files funkcji wartości tabeli./* Discover your data in a volume */ LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>" /* Preview your data in a volume */ SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10 /* Discover your data in an external location */ LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>" /* Preview your data */ SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
Ładowanie danych do tabeli przesyłania strumieniowego
Aby utworzyć tabelę przesyłania strumieniowego na podstawie danych w magazynie obiektów w chmurze, wklej następujący kod do edytora zapytań, a następnie kliknij przycisk Uruchom:
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')
Odświeżanie tabeli przesyłania strumieniowego przy użyciu potoku DLT
W tej sekcji opisano wzorce odświeżania tabeli przesyłania strumieniowego z najnowszymi danymi dostępnymi ze źródeł zdefiniowanych w zapytaniu.
W przypadku tabeli CREATE
REFRESH
przesyłania strumieniowego aktualizacja jest przetwarzana przy użyciu bezserwerowego potoku tabel na żywo delty. Każda zdefiniowana tabela przesyłania strumieniowego ma skojarzony potok delta live tables.
Po uruchomieniu REFRESH
polecenia zostanie zwrócony link potoku DLT. Aby sprawdzić stan odświeżania, możesz użyć linku potoku DLT.
Uwaga
Tylko właściciel tabeli może odświeżyć tabelę przesyłania strumieniowego, aby pobrać najnowsze dane. Użytkownik tworzący tabelę jest właścicielem i nie można zmienić właściciela.
Zobacz Co to jest delta live tables?.
Pozyskiwanie nowych danych tylko
Domyślnie read_files
funkcja odczytuje wszystkie istniejące dane w katalogu źródłowym podczas tworzenia tabeli, a następnie przetwarza nowo przybywające rekordy przy każdym odświeżeniu.
Aby uniknąć pozyskiwania danych, które już istnieją w katalogu źródłowym w momencie tworzenia tabeli, ustaw includeExistingFiles
opcję na false
. Oznacza to, że tylko dane, które docierają do katalogu po przetworzeniu tworzenia tabeli. Na przykład:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
includeExistingFiles => false)
W pełni odśwież tabelę przesyłania strumieniowego
Pełne odświeżenia ponownie przetwarzają wszystkie dane dostępne w źródle przy użyciu najnowszej definicji. Nie zaleca się wywoływania pełnych odświeżeń w źródłach, które nie przechowują całej historii danych lub mają krótkie okresy przechowywania, takie jak Kafka, ponieważ pełne odświeżanie obcina istniejące dane. Odzyskanie starych danych może nie być możliwe, jeśli dane nie są już dostępne w źródle.
Na przykład:
REFRESH STREAMING TABLE my_bronze_table FULL
Planowanie tabeli przesyłania strumieniowego na potrzeby automatycznego odświeżania
Aby skonfigurować tabelę przesyłania strumieniowego w celu automatycznego odświeżania na podstawie zdefiniowanego harmonogramu, wklej następujące polecenie w edytorze zapytań, a następnie kliknij przycisk Uruchom:
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
Na przykład zapytania dotyczące harmonogramu odświeżania można znaleźć w temacie ALTER STREAMING TABLE (ALTER STREAMING TABLE).
Śledzenie stanu odświeżania
Stan odświeżania tabeli przesyłania strumieniowego można wyświetlić, wyświetlając potok, który zarządza tabelą przesyłania strumieniowego w interfejsie użytkownika tabel delta Live Tables lub wyświetlając informacje odświeżania zwracane przez DESCRIBE EXTENDED
polecenie dla tabeli przesyłania strumieniowego.
DESCRIBE EXTENDED <table-name>
Pozyskiwanie strumieniowe z platformy Kafka
Aby zapoznać się z przykładem pozyskiwania przesyłania strumieniowego z platformy Kafka, zobacz read_kafka.
Udzielanie użytkownikom dostępu do tabeli przesyłania strumieniowego
Aby przyznać użytkownikom SELECT
uprawnienia do tabeli przesyłania strumieniowego, aby mogli wykonywać względem niej zapytania, wklej następujące polecenie w edytorze zapytań, a następnie kliknij przycisk Uruchom:
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
Aby uzyskać więcej informacji na temat udzielania uprawnień do zabezpieczanych obiektów wykazu aparatu Unity, zobacz Uprawnienia wykazu aparatu Unity i zabezpieczane obiekty.