Popis strukturovaného streamování Sparku

Dokončeno

Strukturované streamování Sparku je oblíbená platforma pro zpracování v paměti. Má jednotné paradigma pro dávkové a streamování. Cokoli, co se naučíte a používáte pro dávku, můžete použít ke streamování, takže je snadné růst od dávkování dat až po streamování dat. Spark Streaming je jednoduše modul, který běží nad Apache Sparkem.

What is Spark structured streaming

Strukturované streamování vytvoří dlouhotrvající dotaz, během kterého použijete operace na vstupní data, jako je výběr, projekce, agregace, okna a spojení streamovaného datového rámce s odkazem na datové rámce. Dále vypíšete výsledky do úložiště souborů (objekty blob služby Azure Storage nebo Data Lake Storage) nebo do jakéhokoli úložiště dat pomocí vlastního kódu (například SQL Database nebo Power BI). Strukturované streamování také poskytuje výstup do konzoly pro místní ladění a do tabulky v paměti, abyste viděli data vygenerovaná pro ladění ve službě HDInsight.

Toky jako tabulky

Strukturované streamování Sparku představuje datový proud jako tabulku, která není do hloubky nevázaná, to znamená, že se tabulka stále roste, jak dorazí nová data. Tato vstupní tabulka se průběžně zpracovává dlouhotrvajícím dotazem a výsledky odeslané do výstupní tabulky:

How Spark structured streaming represents data as tables

Ve strukturovaném streamování data přicházejí do systému a okamžitě se ingestují do vstupní tabulky. Zapisujete dotazy (pomocí rozhraní API datového rámce a datové sady), které provádějí operace s touto vstupní tabulkou. Výstup dotazu vrátí jinou tabulku, tabulku výsledků. Tabulka výsledků obsahuje výsledky dotazu, ze kterých nakreslíte data pro externí úložiště dat, jako je relační databáze. Načasování zpracování dat ze vstupní tabulky je řízeno intervalem aktivační události. Ve výchozím nastavení je interval triggeru nulový, takže strukturované streamování se pokusí zpracovat data, jakmile dorazí. V praxi to znamená, že jakmile se dokončí zpracování spuštění předchozího dotazu strukturovaného streamování, spustí se další spuštění zpracování proti nově přijatým datům. Trigger můžete nakonfigurovat tak, aby běžel v intervalu, aby se streamovaná data zpracovávala v dávkách založených na čase.

Data v tabulkách výsledků můžou obsahovat pouze data, která jsou nová od posledního zpracování dotazu (režim připojení) nebo se tabulka může aktualizovat pokaždé, když jsou nová data, aby tabulka obsahovala všechna výstupní data od začátku streamovacího dotazu (úplný režim).

Režim připojení

V režimu připojení se do tabulky výsledků zapisují pouze řádky přidané do tabulky výsledků od posledního spuštění dotazu a zapisují se do externího úložiště. Nejjednodušší dotaz například jenom zkopíruje všechna data ze vstupní tabulky do tabulky výsledků, která není zadaná. Při každém uplynutí intervalu aktivační události se nová data zpracovávají a řádky představující nová data se zobrazí v tabulce výsledků.

Představte si scénář, ve kterém zpracováváte data cen akcií. Předpokládejme, že první trigger zpracovával jednu událost v okamžiku 00:01 pro akcie MSFT s hodnotou 95 dolarů. V prvním triggeru dotazu se v tabulce výsledků zobrazí pouze řádek s časem 00:01. V okamžiku 00:02, kdy přijde jiná událost, je jediným novým řádkem řádek s časem 00:02, takže tabulka výsledků by obsahovala pouze tento jeden řádek.

How Spark structured streaming in append mode

Při použití režimu připojení by dotaz použil projekce (výběr sloupců, o které se stará), filtrování (výběr pouze řádků, které splňují určité podmínky) nebo spojení (rozšíření dat o data ze statické vyhledávací tabulky). Režim připojení usnadňuje nabízení jenom relevantních nových datových bodů do externího úložiště.

Režim dokončení

Představte si stejný scénář, tentokrát pomocí režimu dokončení. V úplném režimu se celá výstupní tabulka aktualizuje na každém triggeru, takže tabulka obsahuje data nejen z posledního spuštění triggeru, ale ze všech spuštění. Úplný režim můžete použít ke zkopírování dat, která se nedají ze vstupní tabulky z tabulky výsledků zkopírovat. Při každém aktivovaném spuštění se nové řádky výsledků zobrazí společně se všemi předchozími řádky. Tabulka výsledků výstupu nakonec uloží všechna shromážděná data od začátku dotazu a nakonec dojde k nedostatku paměti. Úplný režim je určený pro agregační dotazy, které nějakým způsobem shrnují příchozí data, takže při každé aktivaci se tabulka výsledků aktualizuje novým souhrnem.

Předpokládejme, že zatím jsou data za pět sekund již zpracována a je čas zpracovat data za šestou sekundu. Vstupní tabulka obsahuje události pro čas 00:01 a čas 00:03. Cílem tohoto ukázkového dotazu je dát průměrnou cenu akcií každých pět sekund. Implementace tohoto dotazu použije agregaci, která vezme všechny hodnoty, které spadají do každého 5sekundového okna, zprůměruje cenu akcií a vytvoří řádek pro průměrnou cenu akcií v daném intervalu. Na konci prvního 5sekundového okna jsou dvě řazené kolekce členů: (00:01, 1, 95) a (00:03, 1, 98). Takže pro okno 00:00-00:05 agregace vytvoří řazenou kolekci členů s průměrnou cenou akcií 96,50 USD. V dalším 5sekundovém okně je v čase 00:06 pouze jeden datový bod, takže výsledná cena akcií je 98 USD. V době 00:10 má tabulka výsledků řádky pro obě okna 00:00-00:05 a 00:05-00:00:10, protože dotaz vypíše všechny agregované řádky, nejen nové řádky. Proto se tabulka výsledků i nadále zvětšuje s tím, jak se přidávají nová okna.

How Spark structured streaming in complete mode

Ne všechny dotazy používající úplný režim způsobují, že se tabulka zvětší bez hranic. V předchozím příkladu zvažte, že místo průměrování ceny akcií na okno se průměruje podle akcií. Výsledná tabulka obsahuje pevný počet řádků (jeden na sklad) s průměrnou cenou akcií pro akcie ve všech datových bodech přijatých z tohoto zařízení. Při přijetí nových cen akcií se tabulka výsledků aktualizuje tak, aby průměry v tabulce byly vždy aktuální.

Jaké jsou výhody strukturovaného streamování Sparku?

Být ve finančním sektoru, načasování transakcí je velmi důležité. Například na burze je rozdíl mezi tím, kdy se obchod s akciemi děje na burze nebo když obdržíte transakci, nebo když jsou data přečtená všechna důležitá. U finančních institucí závisí na těchto důležitých datech a na načasování, které s nimi souvisí.

Čas události, pozdní data a vodoznaky

Strukturované streamování Sparku zná rozdíl mezi časem události a časem zpracování události systémem. Každá událost je řádek v tabulce a čas události je hodnota sloupce v řádku. To umožňuje agregace založené na oknech (například počet událostí každou minutu) být pouze seskupením a agregací ve sloupci času události – každé časové okno je skupina a každý řádek může patřit do více oken/skupin. Proto je možné agregační dotazy založené na časových obdobích událostí definovat konzistentně u statické datové sady i datového proudu, což usnadňuje život datového inženýra.

Tento model navíc přirozeně zpracovává data, která přišla později, než se očekávalo, na základě času události. Spark má plnou kontrolu nad aktualizací starých agregací v případě pozdních dat a také vyčištěním starých agregací, aby se omezila velikost dat zprostředkujícího stavu. Vzhledem k tomu, že Spark 2.1 podporuje vodoznaky, což umožňuje zadat prahovou hodnotu pozdních dat a umožňuje modulu odpovídajícím způsobem vyčistit starý stav.

Flexibilita při nahrávání posledních dat nebo všech dat

Jak je popsáno v předchozí lekci, můžete při práci se strukturovaným streamováním Sparku použít režim připojení nebo režim Dokončení, aby tabulka výsledků obsahovala pouze nejnovější data nebo všechna data.

Podporuje přesun z mikrodávek na průběžné zpracování.

Změnou typu triggeru dotazu Sparku můžete přejít ze zpracování mikrodávek na průběžné zpracování bez dalších změn v rozhraní. Tady jsou různé druhy triggerů, které Spark podporuje.

  • Není zadáno, toto je výchozí hodnota. Pokud není explicitně nastavená žádná aktivační událost, dotaz se spustí v mikro dávkách a bude zpracován nepřetržitě.
  • Pevná intervalová mikrodávka. Dotaz se spustí v opakovaných intervalech nastavených uživatelem. Pokud se nepřijdou žádná nová data, nespustí se žádný mikrodávkový proces.
  • Jednorázová mikrodávka. Dotaz spustí jednu mikrodávku a pak se zastaví. To je užitečné, pokud chcete zpracovat všechna data od předchozí mikrodávkové dávky a můžete ušetřit náklady na úlohy, které nemusí běžet nepřetržitě.
  • Nepřetržitě s pevným intervalem kontrolního bodu. Dotaz se spouští v novém režimu průběžného zpracování s nízkou latencí, který umožňuje nízkou (~1 ms) latenci s alespoň jednou zárukou odolnosti proti chybám. To se podobá výchozímu nastavení, které může dosáhnout přesně jednou záruk, ale pouze dosahuje latencí přibližně 100 ms v nejlepším případě.

Kombinování dávkových a streamovacích úloh

Kromě zjednodušení přechodu z dávky na úlohy streamování můžete také kombinovat dávkové a streamované úlohy. To je užitečné zejména v případě, že chcete použít dlouhodobá historická data k předpovídání budoucích trendů při zpracování informací v reálném čase. U akcií se můžete chtít podívat na cenu akcií za posledních 5 let a předpovědět změny provedené kolem oznámení o ročních nebo čtvrtletních výnosech.

Časové intervaly událostí

Data můžete zachytávat v oknech, jako je vysoká cena akcií a nízká cena akcií během jednoho dne nebo jednominutové okno – ať už se rozhodnete, jakýkoli interval a strukturované streamování Sparku to podporuje. Podporují se také překrývající se okna.

Vytváření kontrolních bodů pro zotavení po selhání

V případě selhání nebo úmyslného vypnutí můžete obnovit předchozí průběh a stav předchozího dotazu a pokračovat tam, kde skončil. To se provádí pomocí kontrolních bodů a protokolů s předstihem pro zápis. Dotaz můžete nakonfigurovat s umístěním kontrolního bodu a dotaz uloží všechny informace o průběhu (tj. rozsah posunů zpracovaných v jednotlivých triggerech) a spuštěné agregace do umístění kontrolního bodu. Toto umístění kontrolního bodu musí být cesta v systému souborů kompatibilním s HDFS a při spuštění dotazu lze nastavit jako možnost v objektu DataStreamWriter.