Dela via


Använda strömmande tabeller i Databricks SQL

Databricks rekommenderar att du använder strömmande tabeller för att mata in data med Databricks SQL. En strömningstabell är en tabell som är registrerad i Unity Catalog med extra stöd för direktuppspelning eller inkrementell databehandling. En pipeline skapas automatiskt för varje strömmande tabell. Du kan använda strömmande tabeller för inkrementell datainläsning från Kafka och molnobjektlagring.

Anmärkning

Information om hur du använder Delta Lake-tabeller som strömmande källor och mottagare finns i Delta table streaming reads and writes.

Kravspecifikation

Om du vill använda strömmande tabeller måste du uppfylla följande krav.

Krav för arbetsyta:

Strömmande tabeller som skapats i Databricks SQL backas upp av serverlösa deklarativa Lakeflow-pipelines. Din arbetsyta måste ha stöd för serverlösa pipelines för att kunna använda den här funktionen.

Beräkningskrav:

Du måste använda något av följande:

  • Ett SQL-lager som använder Current kanalen.
  • Beräkning med standardåtkomstläge (tidigare delat åtkomstläge) på Databricks Runtime 13.3 LTS eller senare.
  • Beräkning med dedikerat åtkomstläge (tidigare åtkomstläge för en enskild användare) på Databricks Runtime 15.4 LTS eller senare.

    På Databricks Runtime 15.3 och nedan kan du inte använda dedikerad beräkning för att köra frågor mot strömmande tabeller som ägs av andra användare. Du kan endast använda dedikerad beräkning på Databricks Runtime 15.3 och lägre om du äger strömningstabellen. Skaparen av tabellen är ägaren.

    Databricks Runtime 15.4 LTS och senare stöder förfrågningar mot tabeller genererade av Lakeflow Deklarativa Pipelines med dedikerad beräkning, även om du inte är tabellägare. Du kan debiteras för serverlösa beräkningsresurser när du använder dedikerad beräkning för att köra datafiltreringsåtgärder. Se Detaljerad åtkomstkontroll för dedikerad beräkning.

Behörighetskrav:

  • USE CATALOG och USE SCHEMA behörigheter i katalogen och schemat där du skapar strömningstabellen.
  • Den CREATE TABLE behörigheten för schemat där du skapar strömningstabellen.
  • Behörigheter för åtkomst till tabeller eller platser som tillhandahåller källdata för din strömmande tabell.

Skapa strömmande tabeller

En strömmande tabell definieras av en SQL-fråga i Databricks SQL. När du skapar en strömmande tabell används de data som för närvarande finns i källtabellerna för att skapa strömningstabellen. Därefter uppdaterar du tabellen, vanligtvis enligt ett schema, för att hämta eventuella tillagda data i källtabellerna för att lägga till i strömningstabellen.

När du skapar en direktuppspelningstabell betraktas du som ägare till tabellen.

Om du vill skapa en strömmande tabell från en befintlig tabell använder du -instruktionenCREATE STREAMING TABLE, som i följande exempel:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT product, price FROM STREAM raw_data;

I det här fallet skapas strömningstabellen sales från specifika kolumner i raw_data tabellen, med ett schema för uppdatering varje timme. Frågan som används måste vara en direktuppspelningsfråga . Använd nyckelordet STREAM för att använda strömmande semantik för att läsa från källan.

När du skapar en strömmande tabell med instruktionen CREATE OR REFRESH STREAMING TABLE börjar den första datauppdateringen och populationen omedelbart. Dessa åtgärder använder inte DBSQL-lagerberäkning. I stället förlitar sig strömningstabellen på serverlösa deklarativa Lakeflow-pipelines för både skapande och uppdatering. En dedikerad serverlös pipeline skapas och hanteras automatiskt av systemet för varje strömmande tabell.

Läsa in filer med Auto Loader

Om du vill skapa en strömmande tabell från filer i en volym använder du Auto Loader. Använd Auto Loader med Lakeflow Deklarativa pipelines för de flesta datainmatningsuppgifter från molnobjektlagring. Auto Loader och Lakeflow Deklarativa Pipelines är utformade för att stegvis och idempotent läsa in ständigt växande data när det tas emot i molnlagringen.

Om du vill använda Auto Loader i Databricks SQL använder du read_files funktionen. Följande exempel visar hur du använder Auto Loader för att läsa en volym JSON-filer till en strömmande tabell:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT * FROM STREAM read_files(
    "/Volumes/my_catalog/my_schema/my_volume/path/to/data",
    format => "json"
  );

Om du vill läsa data från molnlagring kan du också använda Automatisk inläsning:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

Mer information om automatisk inläsning finns i Vad är automatisk inläsning?. Mer information om hur du använder automatisk inläsning i SQL finns i Läsa in data från objektlagring.

Strömmande inmatning från andra källor

Exempel på inmatning från andra källor, inklusive Kafka, finns i Ladda data med Lakeflow deklarativa pipelines.

Mata bara in ny data

Som standard läser funktionen read_files alla befintliga data i källkatalogen när tabellen skapas och bearbetar sedan nyligen ankommande poster med varje uppdatering.

Om du vill undvika att mata in data som redan finns i källkatalogen när tabellen skapas anger du alternativet includeExistingFiles till false. Det innebär att endast data som tas emot i katalogen när tabellen har skapats har bearbetats. Till exempel:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM read_files(
    '/path/to/files',
    includeExistingFiles => false
  );

Ange körningskanalen

Strömmande tabeller som skapats med SQL-lager uppdateras automatiskt med hjälp av en pipeline. Deklarativa pipelines för Lakeflow körs i current kanalen som standard. Se release notes för Lakeflow Deklarativa Pipelines och processen för versionsuppgradering för att lära dig om lanseringsprocessen.

Databricks rekommenderar att du använder current kanalen för produktionsarbetsbelastningar. Nya funktioner släpps först till preview kanalen. Du kan sätta en pipeline till förhandsgranskningskanalen Lakeflow Deklarativa Pipelines för att testa nya funktioner genom att specificera preview som en tabellegenskap. Du kan ange den här egenskapen när du skapar tabellen eller när tabellen har skapats med hjälp av en ALTER-instruktion.

I följande kodexempel visas hur du ställer in kanalen som förhandsversion i en CREATE-instruktion:

CREATE OR REFRESH STREAMING TABLE sales
  TBLPROPERTIES ('pipelines.channel' = 'preview')
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM raw_data;

Dölj känsliga data

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

Du kan använda strömmande tabeller för att dölja känsliga data från användare som kommer åt tabellen. En metod är att definiera frågan så att den utesluter känsliga kolumner eller rader helt och hållet. Du kan också använda kolumnmasker eller radfilter baserat på behörigheterna för den frågande användaren. Du kan till exempel dölja tax_id kolumnen för användare som inte finns i gruppen HumanResourcesDept. Det gör du genom att använda syntaxen ROW FILTER och MASK när du skapar strömningstabellen. Mer information finns i Radfilter och kolumnmasker.

Uppdatera en strömningstabell

Uppdateringar kan schemaläggas automatiskt när du skapar strömningstabellen. Du kan också uppdatera strömmande tabeller manuellt. Även om du har en schemalagd uppdatering kan du anropa en manuell uppdatering när som helst. Uppdateringar hanteras av samma pipeline som skapades automatiskt tillsammans med strömningstabellen.

Så här uppdaterar du en strömmande tabell:

REFRESH STREAMING TABLE sales;

Du kan kontrollera status för den senaste uppdateringen med DESCRIBE TABLE EXTENDED.

Anmärkning

Endast tabellägaren kan uppdatera en strömmande tabell för att hämta de senaste data. Användaren som skapar tabellen är ägare och ägaren kan inte ändras. Du kan behöva uppdatera strömningstabellen innan du använder frågor om tidsresor.

Så här fungerar uppdateringen

En uppdatering av strömningstabellen utvärderar bara nya rader som har anlänt sedan den senaste uppdateringen och lägger endast till nya data.

Varje uppdatering använder den aktuella definitionen av strömningstabellen för att bearbeta dessa nya data. Om du ändrar en definition för en strömmande tabell beräknas inte befintliga data automatiskt om. Om en ändring inte är kompatibel med befintliga data (t.ex. om du ändrar en datatyp) misslyckas nästa uppdatering med ett fel.

I följande exempel förklaras hur ändringar i en strömmande tabelldefinition påverkar uppdateringsbeteendet:

  • Om du tar bort ett filter bearbetas inte tidigare filtrerade rader.
  • Om du ändrar kolumnprojektioner påverkas inte hur befintliga data bearbetades.
  • Kopplingar med statiska ögonblicksbilder använder ögonblicksbildens tillstånd vid det första bearbetningstillfället. För sent ankommande data som skulle ha matchats med den uppdaterade ögonblicksbilden ignoreras. Detta kan leda till att fakta tas bort om dimensionerna är sena.
  • Om du ändrar CAST för en befintlig kolumn resulterar det i ett fel.

Om dina data ändras på ett sätt som inte kan stödjas i den befintliga strömningstabellen kan du utföra en fullständig uppdatering.

Uppdatera en strömningstabell fullständigt

Fullständiga uppdateringar bearbetar om alla data som är tillgängliga i källan med den senaste definitionen. Vi rekommenderar inte att du anropar fullständiga uppdateringar på källor som inte behåller hela datahistoriken eller har korta kvarhållningsperioder, till exempel Kafka, eftersom den fullständiga uppdateringen trunkerar befintliga data. Du kanske inte kan återställa gamla data om data inte längre är tillgängliga i källan.

Till exempel:

REFRESH STREAMING TABLE sales FULL;

Ändra schemat för en strömningstabell

Du kan ändra (eller ange) ett automatiskt uppdateringsschema för din strömmande tabell. Följande exempel visar hur du anger ett schema med hjälp av ALTER STREAMING TABLE:

ALTER STREAMING TABLE sales
  ADD SCHEDULE every 1 hour;

Exempel på uppdateringsschemafrågor finns i ALTER STREAMING TABLE.

Spåra status för en uppdatering

Du kan visa status för en uppdatering av en strömningstabell genom att visa pipelinen som hanterar strömningstabellen i Lakeflow Declarative Pipelines UI, eller genom att visa uppdateringsinformationenDESCRIBE EXTENDED som returneras av kommandot för strömningstabellen.

DESCRIBE TABLE EXTENDED <table-name>;

Alternativt kan du visa strömningstabellen i Katalogutforskaren och se uppdateringsstatusen där:

  1. Klicka på dataikonen.Katalog i sidofältet.
  2. I trädet Katalogutforskaren till vänster öppnar du katalogen och väljer det schema där din strömmande tabell finns.
  3. Öppna objektet Tabeller under det schema som du valde och klicka på den strömmande tabellen.

Härifrån kan du använda flikarna under strömningstabellens namn för att visa och redigera information om strömningstabellen, inklusive:

  • Uppdatera status och historik
  • Tabelldesign
  • Exempeldata (kräver en aktiv beräkning)
  • Behörigheter
  • Ursprung, inklusive tabeller och pipelines som den här strömmande tabellen är beroende av
  • Insikter om användning
  • Monitorer som du har skapat för den här strömningstabellen

Kontrollera åtkomsten till strömmande tabeller

Strömningstabeller har stöd för omfattande åtkomstkontroller för att stödja datadelning och samtidigt undvika att exponera potentiellt privata data. En ägare av direktuppspelningstabellen eller en användare med behörigheten MANAGE kan bevilja SELECT behörigheter till andra användare. Användare med SELECT åtkomst till strömningstabellen behöver SELECT inte åtkomst till tabellerna som refereras av strömningstabellen. Den här åtkomstkontrollen möjliggör datadelning samtidigt som åtkomsten till underliggande data kontrolleras.

Tilldela behörigheter till en strömningstabell

Om du vill bevilja åtkomst till en strömningstabell använder du -instruktionenGRANT:

GRANT <privilege_type> ON <st_name> TO <principal>;

privilege_type Kan vara:

  • SELECT – användaren kan SELECT strömma tabellen.
  • REFRESH – användaren kan REFRESH strömma tabellen. Uppdateringar körs med ägarens behörigheter.

I följande exempel skapas en strömningstabell och användare får behörighet att välja och uppdatera:

CREATE MATERIALIZED VIEW st_name AS SELECT * FROM source_table;

-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;

-- Grand read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;

Mer information om hur du beviljar behörigheter för skyddsbara objekt i Unity Catalog finns i Unity Catalog-privilegier och skyddsbara objekt.

Återkalla behörigheter från en streaming-tabell

Om du vill återkalla åtkomst från en strömningstabell använder du -instruktionenREVOKE:

REVOKE privilege_type ON <st_name> FROM principal;

När SELECT behörigheter i en källtabell återkallas från strömningstabellägaren eller någon annan användare som har beviljats MANAGE eller SELECT behörigheter i strömningstabellen, eller om källtabellen tas bort, kan ägaren eller användaren som beviljats åtkomst till strömningstabellen fortfarande köra frågor mot strömningstabellen. Följande beteende inträffar dock:

  • Den strömmande tabellägaren eller andra som har förlorat åtkomsten till en strömmande tabell kan inte längre REFRESH den strömmande tabellen, och strömmande tabellen blir inaktuell.
  • Om det automatiseras med ett schema, misslyckas nästa planerade REFRESH eller körs inte.

I följande exempel återkallas behörigheten SELECT från read_only_user:

REVOKE SELECT ON st_name FROM read_only_user;

ta bort poster från en streamingtabell permanent

Viktigt!

Stöd för REORG-instruktionen med strömmande tabeller finns i offentlig förhandsversion.

Anmärkning

  • Om du använder en REORG-instruktion med en strömmande tabell krävs Databricks Runtime 15.4 och senare.
  • Även om du kan använda -instruktionen REORG med en strömningstabell krävs den bara när du tar bort poster från en strömmande tabell med borttagningsvektorer aktiverade. Kommandot har ingen effekt när det används med en strömmande tabell utan att borttagningsvektorer har aktiverats.

För att fysiskt ta bort poster från den underliggande lagringen för en strömmande tabell med borttagningsvektorer aktiverade, till exempel för GDPR-efterlevnad, måste ytterligare åtgärder vidtas för att säkerställa att en VACUUM åtgärd körs på strömningstabellens data.

Så här tar du bort poster fysiskt från underliggande lagring:

  1. Uppdatera poster eller ta bort poster från strömningstabellen.
  2. Kör en REORG-instruktion mot strömningstabellen och ange parametern APPLY (PURGE). Till exempel REORG TABLE <streaming-table-name> APPLY (PURGE);.
  3. Vänta tills datakvarhållningsperioden för strömningstabellen har passerat. Standardperioden för datakvarhållning är sju dagar, men den kan konfigureras med egenskapen delta.deletedFileRetentionDuration tabell. Se Konfigurera datalagring för frågor rörande tidsresor.
  4. REFRESH strömningstabellen. Se även Uppdatera en streamingtabel. Inom 24 timmar efter REFRESH åtgärden körs underhållsuppgifter för Lakeflow Declarative Pipelines, inklusive den VACUUM åtgärd som krävs för att säkerställa att poster tas bort permanent.

Övervaka körningar med hjälp av frågehistorik

Du kan använda frågehistorik-sidan för att få åtkomst till detaljer om frågor och frågeprofiler som kan hjälpa dig att identifiera frågor som fungerar dåligt och flaskhalsar i Lakeflow-deklarativa pipeline-flöden som används för att köra streamingtabelluppdateringar. En översikt över vilken typ av information som är tillgänglig i frågehistorik och frågeprofiler finns i Frågehistorik och Frågeprofil.

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion. Arbetsyteadministratörer kan aktivera den här funktionen från sidan Förhandsversioner . Se Hantera förhandsversioner av Azure Databricks.

Alla instruktioner som rör strömmande tabeller visas i frågehistoriken. Du kan använda listrutan Statement för att välja valfritt kommando och granska relaterade frågor. Alla CREATE instruktioner följs av en REFRESH instruktion som körs asynkront på en pipeline. Instruktionerna REFRESH innehåller vanligtvis detaljerade frågeplaner som ger insikter om hur du optimerar prestanda.

Använd följande steg för att komma åt REFRESH instruktioner i användargränssnittet för frågehistorik:

  1. Klicka på Ikonen Historik. Öppna användargränssnittet för frågehistorik i det vänstra sidofältet.
  2. Markera kryssrutan REFRESH från filterlistrutan Statement.
  3. Klicka på namnet på frågeuttrycket för att visa sammanfattningsinformation som frågans varaktighet och aggregerade mått.
  4. Klicka på Se frågeprofil för att öppna frågeprofilen. Mer information om hur du navigerar i frågeprofilen finns i Frågeprofil .
  5. Du kan också använda länkarna i avsnittet Frågekälla för att öppna den relaterade frågan eller pipelinen.

Du kan också komma åt frågeinformation med hjälp av länkar i SQL-redigeraren eller från en notebook-fil som är kopplad till ett SQL-lager.

Ytterligare resurser