Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Strukturované streamování je škálovatelný modul pro zpracování datových proudů odolný proti chybám založený na Sparku. Zachází s živým datovým proudem jako s tabulkou, ke které se nové řádky průběžně připojují. Strukturované streamování podporuje integrované zdroje souborů, jako jsou CSV, JSON, ORC a Parquet, spolu se službami zasílání zpráv, jako je Kafka a Azure Event Hubs.
Tento článek popisuje nastavení zdroje streamování, jako je Azure Event Hubs, příjem streamovaných dat do tabulky Lakehouse Delta, optimalizace výkonu zápisu pomocí dělení a dávkování událostí a spolehlivé spouštění úloh streamování v produkčním prostředí.
Nastavení zdroje streamování
Pokud chcete streamovat data do jezera, nejprve nakonfigurujte připojení ke zdroji streamování. Azure Event Hubs je běžnou volbou. Pomocí konektoru Azure Event Hubs pro Apache Spark připojte aplikaci Spark ke službě Azure Event Hubs.
Základní konfigurace služby Event Hubs vyžaduje název oboru názvů služby Event Hubs, název centra, název sdíleného přístupového klíče a skupinu příjemců.
Spotřebitelská skupina je pohledem na celé centrum událostí. Skupiny příjemců umožňují, aby každá z nich mohla využívat více aplikací, měla samostatné zobrazení streamu a mohli stream číst nezávisle na svém vlastním tempu a s vlastními posuny.
Oddíly ve službě Event Hubs umožňují paralelně zpracovávat velké objemy událostí. Jeden procesor má omezenou kapacitu pro zpracování událostí za sekundu, zatímco několik procesorů může paralelně fungovat napříč oddíly.
Pokud se používá příliš mnoho oddílů s nízkou mírou příjmu dat, čtenáři oddílů pracují s malou částí dat, což způsobuje neoptimální zpracování. Ideální počet oddílů závisí na požadované rychlosti zpracování. Když zvýšíte počet jednotek propustnosti ve vašem oboru názvů, můžete chtít další oddíly, které umožní souběžným čtenářům dosáhnout maximální propustnosti.
Otestujte nejlepší počet partí pro váš scénář propustnosti. Scénáře s vysokou propustností běžně využívají 32 nebo více oddílů.
Tabulka Delta jako cíl pro streamování
Delta Lake je opensourcová vrstva úložiště, která poskytuje transakce ACID (atomicity, konzistence, izolace a stálosti) nad službou Data Lake Storage. V rámci Fabric Data Engineering podporuje Delta Lake upserty, komprese dat, časové procházení, vývoj schématu a úložiště s otevřeným formátem.
Ve delta formátu výstupu streamovaná writeStreamdata proudí přímo do tabulky Delta. Následující příklad čte ze služby Event Hubs, analyzuje text zprávy a zapisuje do tabulky Delta:
import pyspark.sql.functions as f
from pyspark.sql.types import *
df = (
spark.readStream
.format("eventhubs")
.options(**ehConf)
.load()
)
Schema = StructType([
StructField("<column_name_01>", StringType(), False),
StructField("<column_name_02>", StringType(), False),
StructField("<column_name_03>", DoubleType(), True),
StructField("<column_name_04>", LongType(), True),
StructField("<column_name_05>", LongType(), True),
])
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.toTable("deltaeventstable")
)
V kódu format("delta") nastaví Delta jako výstupní formát, outputMode("append") zapíše do tabulky pouze nové řádky a toTable("deltaeventstable") zachová streamovaná data do spravované tabulky Delta.
Optimalizace výkonu streamování
Jakmile základní příjem dat streamování funguje, můžete zlepšit propustnost a organizaci souborů pomocí technik optimalizace v následujících částech.
Rozčlenění dat pro zápisy
Pokud chcete optimalizovat propustnost, rozdělte data efektivně. Dělení zlepšuje propustnost zápisu i výkon podřízených dotazů. Data můžete rozdělit do paměti, na disk nebo obojí.
Na disku – slouží partitionBy() k uspořádání dat do podadresářů na základě hodnot sloupců. Zvolte sloupce s dobrou kardinalitou, které vytvářejí soubory s optimální velikostí. Vyhněte se sloupcům, které vytvářejí příliš mnoho malých oddílů nebo příliš málo velkých oddílů.
V paměti – Před zápisem použijte repartition() nebo coalesce() distribuujte data mezi pracovní uzly:
-
repartition()zvyšuje nebo snižuje oddíly s úplným náhodným prohazováním, vyrovnáváním dat rovnoměrně. -
coalesce()snižuje pouze oddíly a minimalizuje přesun dat.
Kombinace obou přístupů funguje dobře pro scénáře s vysokou propustností. Následující příklad rozdělí data do 48 oddílů v paměti (odpovídajících dostupných jader procesoru) a potom rozdělí oddíly na disku podle dvou sloupců:
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.repartition(48)
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.partitionBy("<column_name_01>", "<column_name_02>")
.toTable("deltaeventstable")
)
Použití optimalizovaného zápisu
Jako alternativu k ručnímu dělení optimalizovaný zápis před zápisem sloučí nebo rozdělí oddíly, čímž se maximalizuje propustnost disku bez ručních repartition() nebo coalesce() volání. Povolte ji pomocí konfigurace Sparku:
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)
S povoleným optimalizovaným zápisem můžete odebrat repartition() nebo coalesce() z kódu a nechat Spark zpracovávat změnu velikosti oddílů. Stále můžete použít partitionBy() pro organizaci na úrovni disku.
Dávkové události se spouštěči
Pokud chcete dále optimalizovat výkon zápisu, zpracovávejte události v dávkách před jejich zápisem na disk. Spark ve výchozím nastavení zpracovává každý mikrobatch hned po dokončení předchozího. Nastavení intervalu aktivační události shromažďuje data za určité časové období a zapisuje je do menšího počtu větších operací. Větší dávky vytvářejí větší soubory Delta a snižují režijní náklady na malé soubory.
Následující příklad zpracovává události v minutových intervalech:
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.partitionBy("<column_name_01>", "<column_name_02>")
.trigger(processingTime="1 minute")
.toTable("deltaeventstable")
)
Analyzujte objem příchozích dat a zvolte interval zpracování, který vytváří dobře velké soubory Parquet v tabulce Delta.
Spouštění úloh streamování v produkčním prostředí
Poznámkové bloky Spark jsou účinným nástrojem pro vývoj a testování logiky streamování. Pro produkční úlohy, které je potřeba spouštět nepřetržitě, ale místo toho použijte definice úloh Sparku. Definice úloh Sparku jsou neinteraktivní úlohy orientované na kód, které běží v clusteru Spark a poskytují větší odolnost a dostupnost.
Infrastruktura, na které běží úloha streamování, může narazit na problémy, které zastavují úlohu, jako jsou selhání hardwaru nebo opravy infrastruktury. Zásada opakování automaticky restartuje úlohu, když se neočekávaně zastaví. Nakonfigurujte zásadu opakování v definici úlohy Sparku, abyste určili, kolikrát se má úloha restartovat (až do nekonečného opakování) a časový interval mezi opakovanými pokusy. Když je zásada opakování povolená, vaše úloha streamování bude dál běžet, dokud ji explicitně nezastavíte.
Centrum monitorování prostředí obsahuje kartu Structured Streaming s metrikami, včetně vstupní rychlosti, rychlosti zpracování, vstupních řádků, doby trvání dávky a doby trvání operace.