Získání streamovaných dat do Lakehouse pomocí strukturovaného streamování Sparku
Strukturované streamování je škálovatelný modul pro zpracování datových proudů odolný proti chybám založený na Sparku. Spark se stará o to, aby operace streamování běžela přírůstkově a nepřetržitě s tím, jak data přicházejí.
Strukturované streamování začalo být dostupné ve Sparku 2.2. Od té doby se jedná o doporučený přístup ke streamování dat. Základním principem strukturovaného streamu je zacházet s živým datovým proudem jako s tabulkou, kde se nová data vždy průběžně připojují, jako nový řádek v tabulce. Existuje několik definovaných integrovaných zdrojů souborů streamování, jako jsou CSV, JSON, ORC, Parquet, a integrovaná podpora služeb zasílání zpráv, jako je Kafka a Event Hubs.
Důležité
Microsoft Fabric je v současné době ve verzi PREVIEW. Tyto informace se týkají předběžné verze produktu, který může být před vydáním podstatně změněn. Společnost Microsoft neposkytuje na zde uvedené informace žádné záruky, vyjádřené ani předpokládané.
Tento článek obsahuje přehled o tom, jak optimalizovat zpracování a příjem událostí prostřednictvím streamování struktury Sparku v produkčních prostředích s vysokou propustností. Mezi navrhované přístupy patří:
- Optimalizace propustnosti streamování dat
- Optimalizace operací zápisu v tabulce Delta a
- Dávkování událostí
Definice úloh Sparku a poznámkové bloky Sparku
Poznámkové bloky Spark jsou vynikajícím nástrojem pro ověřování nápadů a provádění experimentů, abyste získali přehledy z vašich dat nebo kódu. Poznámkové bloky se široce používají při přípravě dat, vizualizaci, strojovém učení a dalších scénářích s velkými objemy dat. Definice úloh Sparku jsou neinteraktivní úlohy orientované na kód spuštěné v clusteru Spark po dlouhou dobu. Definice úloh Sparku poskytují odolnost a dostupnost.
Poznámkové bloky Spark jsou skvělým zdrojem pro otestování logiky kódu a splnění všech obchodních požadavků. Pro zachování provozu v produkčním scénáři jsou ale nejlepším řešením definice úloh Sparku s povolenými zásadami opakování.
Zásady opakování pro definice úloh Sparku
V Microsoft Fabric může uživatel nastavit zásadu opakování pro úlohy Definice úlohy Sparku. I když může být skript v úloze nekonečný, infrastruktura, ve které skript běží, může mít za následek problém vyžadující zastavení úlohy. Nebo může být úloha eliminována kvůli potřebě oprav základní infrastruktury. Zásady opakování umožňují uživateli nastavit pravidla pro automatické restartování úlohy, pokud se zastaví kvůli jakýmkoli základním problémům. Parametry určují, jak často se má úloha restartovat, až do nekonečného počtu opakování a nastavení doby mezi opakováními. Uživatelé tak můžou zajistit, aby jejich úlohy definice úloh Sparku běžely nekonečně dlouho, dokud se uživatel nerozhodne je zastavit.
Zdroje streamování
Nastavení streamování pomocí služby Event Hubs vyžaduje základní konfiguraci, která zahrnuje název oboru názvů služby Event Hubs, název centra, název sdíleného přístupového klíče a skupinu příjemců. Skupina příjemců je zobrazení celého centra událostí. Umožňuje více aplikacím, které využívají, samostatné zobrazení streamu událostí a čtení datového proudu nezávisle vlastním tempem a s jejich posunem.
Oddíly jsou základní součástí schopnosti zpracovávat velké objemy dat. Jeden procesor má omezenou kapacitu pro zpracování událostí za sekundu, zatímco více procesorů může provádět lepší úlohu, pokud se spouští paralelně. Oddíly umožňují paralelní zpracování velkých objemů událostí.
Pokud se používá příliš mnoho oddílů s nízkou rychlostí příjmu dat, čtenáři oddílů pracují s malou částí těchto dat, což způsobuje neoptimální zpracování. Ideální počet oddílů přímo závisí na požadované rychlosti zpracování. Pokud chcete škálovat zpracování událostí, zvažte přidání dalších oddílů. Pro oddíl neexistuje žádný konkrétní limit propustnosti. Agregovaná propustnost ve vašem oboru názvů je ale omezená počtem jednotek propustnosti. Při zvyšování počtu jednotek propustnosti v oboru názvů můžete chtít další oddíly, které umožní souběžným čtenářům dosáhnout maximální propustnosti.
Doporučujeme prozkoumat a otestovat nejlepší počet oddílů pro váš scénář propustnosti. Běžně se ale zobrazují scénáře s vysokou propustností s využitím 32 nebo více oddílů.
Pro připojení aplikace Spark k Azure Event Hubs se doporučuje konektor Azure Event Hubs pro Apache Spark (odkaz).
Lakehouse jako jímka streamování
Delta Lake je opensourcová vrstva úložiště, která poskytuje transakce ACID (atomicity, consistency, isolation, durability) nad řešeními úložiště Data Lake. Delta Lake také podporuje škálovatelné zpracování metadat, vývoj schémat, cestování časem (správu verzí dat), otevřený formát a další funkce.
V Datové Inženýrství Fabric se Delta Lake používá k:
- Snadné upsertování (vložení/aktualizace) a odstranění dat pomocí Spark SQL.
- Komprimujte data, abyste minimalizovali čas strávený dotazováním dat.
- Zobrazení stavu tabulek před a po provedení operací
- Načte historii operací prováděných s tabulkami.
Delta se přidá jako jeden z možných formátů jímek výstupu používaných v writeStream – další informace o existujících výstupních jímkách najdete tady.
Následující příklad ukazuje, jak je možné streamovat data do Delta Lake.
import pyspark.sql.functions as f
from pyspark.sql.types import *
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
Schema = StructType([StructField("<column_name_01>", StringType(), False),
StructField("<column_name_02>", StringType(), False),
StructField("<column_name_03>", DoubleType(), True),
StructField("<column_name_04>", LongType(), True),
StructField("<column_name_05>", LongType(), True)])
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", " Files/checkpoint") \
.outputMode("append") \
.toTable("deltaeventstable")
Kód vystřižený v příkladu:
- format() je instrukce, která definuje výstupní formát dat.
- outputMode() definuje způsob zápisu nových řádků ve streamování (tj. append, overwrite).
- toTable() zachová streamovaná data do tabulky Delta vytvořené pomocí hodnoty předané jako parametr.
Optimalizace rozdílových zápisů
Dělení dat je důležitou součástí vytváření robustního řešení streamování: dělení zlepšuje způsob uspořádání dat a také zlepšuje propustnost. Soubory se po operacích Delta snadno fragmentují, což vede k příliš mnoha malým souborům. A příliš velké soubory jsou také problém, protože jejich zápis na disk je dlouhý. Problémem při dělení dat je najít správnou rovnováhu, která má za následek optimální velikosti souborů. Spark podporuje dělení na oddíly v paměti a na disku. Správně dělená data můžou poskytovat nejlepší výkon při uchovávání dat do Delta Lake a dotazování dat z Delta Lake.
- Při dělení dat na disku můžete zvolit způsob dělení dat na základě sloupců pomocí funkce partitionBy(). partitionBy() je funkce, která slouží k rozdělení velkých datových sad do menších souborů na základě jednoho nebo více sloupců zadaných při zápisu na disk. Dělení je způsob, jak zlepšit výkon dotazů při práci s velkou datovou sadou. Vyhněte se výběru sloupce, který generuje příliš malé nebo příliš velké oddíly. Definujte oddíl založený na sadě sloupců s dobrou kardinalitou a rozdělte data na soubory s optimální velikostí.
- Dělení dat v paměti je možné provést pomocí transformací repartition() nebo coalesce(), distribuce dat na více pracovních uzlů a vytvoření několika úkolů, které mohou číst a zpracovávat data paralelně pomocí základů odolné distribuované datové sady (RDD). Umožňuje rozdělit datovou sadu na logické oddíly, které je možné vypočítat na různých uzlech clusteru.
- repartition() slouží ke zvýšení nebo snížení počtu oddílů v paměti. Opětovná rozdělení přesouvá celá data v síti a vyrovnává je napříč všemi oddíly.
- funkce coalesce() slouží pouze k efektivnímu snížení počtu oddílů. To je optimalizovaná verze funkce repartition(), kde je přesun dat napříč všemi oddíly nižší pomocí funkce coalesce().
Kombinace obou přístupů k dělení je vhodné řešení ve scénáři s vysokou propustností. repartition() vytvoří v paměti určitý počet oddílů, zatímco partitionBy() zapisuje soubory na disk pro každý oddíl paměti a sloupec dělení. Následující příklad ukazuje použití obou strategií dělení na oddíly ve stejné úloze Sparku: data se nejprve rozdělí do 48 oddílů v paměti (za předpokladu, že máme celkem 48 jader procesoru) a pak se rozdělí na disk na základě dvou existujících sloupců v datové části.
import pyspark.sql.functions as f
from pyspark.sql.types import *
import json
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", " Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
Optimalizovaný zápis
Další možností, jak optimalizovat zápisy do Delta Lake, je použití optimalizovaného zápisu. Optimalizovaný zápis je volitelná funkce, která zlepšuje způsob zápisu dat do tabulky Delta. Spark před zápisem dat sloučí nebo rozdělí oddíly, čímž maximalizuje propustnost dat zapisovaných na disk. Dochází ale k úplnému náhodnému prohazování, takže u některých úloh může způsobit snížení výkonu. Úlohy využívající funkci coalesce() nebo repartition() k dělení dat na disku je možné refaktorovat a začít místo toho používat optimalizovaný zápis.
Následující kód je příkladem použití optimalizovaného zápisu. Všimněte si, že se stále používá partitionBy().
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", true)
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", " Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
Dávkování událostí
Praktickou alternativou je dávkování událostí, aby se minimalizoval počet operací za účelem zkrácení doby strávené ingestováním dat do Delta Lake.
Triggery definují, jak často se má dotaz streamování spouštět (aktivovat) a generovat nová data. Jejich nastavení definuje periodický časový interval pro mikrobatche, kdy se data a dávkové události hromadí do několika trvalých operací, místo aby se neustále zapisovaly na disk.
Následující příklad ukazuje dotaz streamování, kde se události pravidelně zpracovávají v intervalech po jedné minutě.
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", " Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.trigger(processingTime="1 minute") \
.toTable("deltaeventstable")
Výhodou kombinování dávkování událostí v operacích zápisu tabulek Delta je to, že vytváří větší soubory Delta s větším objemem dat, aby se zabránilo malým souborům. Měli byste analyzovat množství ingestovaných dat a najít nejvhodnější dobu zpracování, abyste optimalizovali velikost souborů Parquet vytvořených knihovnou Delta.
Monitorování
Spark 3.1 a novější verze mají integrované uživatelské rozhraní strukturovaného streamování (odkaz), které obsahuje následující metriky streamování:
- Vstupní rychlost
- Rychlost zpracování
- Vstupní řádky
- Doba trvání dávky
- Doba trvání operace
Další kroky
- Získejte streamovaná data do lakehouse a získejte přístup pomocí koncového bodu SQL.