Události
31. 3. 23 - 2. 4. 23
Největší událost učení Fabric, Power BI a SQL. 31. března – 2. dubna. Pomocí kódu FABINSIDER uložte $400.
Zaregistrovat se ještě dnesTento prohlížeč se už nepodporuje.
Upgradujte na Microsoft Edge, abyste mohli využívat nejnovější funkce, aktualizace zabezpečení a technickou podporu.
Strukturované streamování je škálovatelný modul pro zpracování datových proudů odolný proti chybám založený na Sparku. Spark se stará o přírůstkové a průběžné provádění streamovacích operací, jak data stále přicházejí.
Strukturované streamování bylo dostupné ve Sparku 2.2. Od té doby se jedná o doporučený přístup pro streamování dat. Základním principem strukturovaného datového proudu je zacházet s živým datovým proudem jako s tabulkou, kde se nová data vždy neustále připojují, jako je nový řádek v tabulce. Existuje několik definovaných předdefinovaných zdrojů streamovaných souborů, jako jsou CSV, JSON, ORC, Parquet a integrovaná podpora pro služby zasílání zpráv, jako je Kafka a Event Hubs.
Tento článek obsahuje přehled o tom, jak optimalizovat zpracování a příjem událostí prostřednictvím strukturovaného streamování Sparku v produkčních prostředích s vysokou propustností. Mezi navrhované přístupy patří:
Poznámkové bloky Spark jsou vynikajícím nástrojem pro ověřování nápadů a provádění experimentů, které vám pomůžou získat přehledy z dat nebo kódu. Poznámkové bloky se běžně používají při přípravě dat, vizualizaci, strojovém učení a dalších scénářích s velkými objemy dat. Definice úloh Sparku jsou neinteraktivní úlohy orientované na kód spuštěné v clusteru Spark po dlouhou dobu. Definice úloh Sparku poskytují odolnost a dostupnost.
Poznámkové bloky Spark jsou skvělým zdrojem k otestování logiky kódu a řešení všech obchodních požadavků. Pro zachování provozu v produkčním scénáři jsou nejlepší řešením definice úloh Sparku s povolenou zásadou opakování.
V Microsoft Fabric může uživatel nastavit zásady opakování pro úlohy definice úloh Sparku. I když může být skript v úloze nekonečný, může dojít k problému s tím, že infrastruktura, na které se skript spouští, vyžaduje zastavení úlohy. Nebo může být úloha odstraněna kvůli potřebám oprav základní infrastruktury. Zásady opakování umožňují uživateli nastavit pravidla pro automatické restartování úlohy, pokud se zastaví kvůli jakýmkoli podkladovým problémům. Parametry určují, jak často se má úloha restartovat, až do nekonečného opakování a nastavit čas mezi opakovanými pokusy. Uživatelé tak můžou zajistit, aby úlohy definice úloh Sparku pokračovaly v nekonečném provozu, dokud se uživatel nerozhodne je zastavit.
Nastavení streamování pomocí služby Event Hubs vyžaduje základní konfiguraci, která zahrnuje název oboru názvů služby Event Hubs, název centra, název sdíleného přístupového klíče a skupinu příjemců. Spotřebitelská skupina je pohledem na celé centrum událostí. Umožňuje více koncovým aplikacím mít samostatné zobrazení streamu událostí a nezávisle číst stream vlastním tempem a s vlastními posuny.
Oddíly jsou základní součástí toho, jak zvládnout velký objem dat. Jeden procesor má omezenou kapacitu pro zpracování událostí za sekundu, zatímco několik procesorů může při paralelním spuštění provádět lepší úlohu. Oddíly umožňují paralelně zpracovávat velké objemy událostí.
Pokud se používá příliš mnoho oddílů s nízkou mírou příjmu dat, čtenáři oddílů pracují s malou částí těchto dat, což způsobuje neoptimální zpracování. Ideální počet oddílů přímo závisí na požadované rychlosti zpracování. Pokud chcete škálovat zpracování událostí, zvažte přidání dalších oddílů. Pro oddíl neexistuje žádný konkrétní limit propustnosti. Avšak agregovanou propustnost ve vašem oboru názvů omezuje počet jednotek propustnosti. Když navýšíte počet jednotek propustnosti ve svém oboru názvů, můžete chtít další oddíly tak, aby umožnily souběžným čtenářům dosáhnout maximální propustnosti.
Doporučujeme prozkoumat a odzkoušet nejlepší počet oddílů pro váš scénář s ohledem na propustnost. Je ale běžné vidět scénáře s vysokou propustností s využitím 32 nebo více oddílů.
Konektor Azure Event Hubs pro Apache Spark (azure-event-hubs-spark) se doporučuje připojit aplikaci Spark ke službě Azure Event Hubs.
Delta Lake je opensourcová vrstva úložiště, která poskytuje transakce ACID (atomicity, konzistence, izolace a stálosti) nad řešeními data Lake Storage. Delta Lake také podporuje škálovatelné zpracování metadat, vývoj schématu, časové cestování (správu verzí dat), otevřený formát a další funkce.
V datovém inženýrství se Delta Lake používá k:
Delta je přidána jako jeden z možných formátů výstupních jímek používaných ve writeStream. Další informace o existujících výstupních jímkách najdete v průvodci programováním strukturovaného streamování Sparku.
Následující příklad ukazuje, jak je možné streamovat data do Delta Lake.
import pyspark.sql.functions as f
from pyspark.sql.types import *
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
Schema = StructType([StructField("<column_name_01>", StringType(), False),
StructField("<column_name_02>", StringType(), False),
StructField("<column_name_03>", DoubleType(), True),
StructField("<column_name_04>", LongType(), True),
StructField("<column_name_05>", LongType(), True)])
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.toTable("deltaeventstable")
O úryvku kódu v příkladu:
Dělení dat je důležitou součástí vytváření robustního řešení streamování: dělení zlepšuje způsob uspořádání dat a také zvyšuje propustnost. Soubory se snadno fragmentují po operacích Delta, což vede k příliš velkému počtu malých souborů. A příliš velké soubory jsou také problém, a to kvůli dlouhé době jejich zápisu na disk. Výzvou při dělení dat je nalezení správné rovnováhy, která vede k optimálním velikostem souborů. Spark podporuje dělení v paměti a na disku. Správně rozdělená data mohou poskytovat nejlepší výkon při ukládání dat do Delta Lake a dotazování z Delta Lake.
Kombinace obou přístupů k dělení je dobrým řešením ve scénáři s vysokou propustností. repartition() vytvoří určitý počet oddílů v paměti, zatímco partitionBy() zapisuje soubory na disk pro každý oddíl paměti a sloupec dělení. Následující příklad znázorňuje použití obou strategií dělení na oddíly ve stejné úloze Sparku: data se nejprve rozdělí do 48 oddílů v paměti (za předpokladu, že máme celkem 48 jader procesoru) a potom jsou rozdělena na disk na základě dvou existujících sloupců v datové části.
import pyspark.sql.functions as f
from pyspark.sql.types import *
import json
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
Další možností optimalizace zápisů do Delta Lake je použití optimalizovaného zápisu. Optimalizovaný zápis je volitelná funkce, která zlepšuje způsob zápisu dat do tabulky Delta. Spark sloučí nebo rozdělí oddíly před zápisem dat a maximalizuje propustnost zapisovaných dat na disk. V některých úlohách ale dochází k úplnému prohazování, takže může dojít ke snížení výkonu. Úlohy využívající funkce coalesce() a/nebo repartition() pro dělení dat na disku je možné refaktorovat, aby místo toho začaly používat optimalizované zápisy.
Následující kód je příkladem použití optimalizovaného zápisu. Všimněte si, že partitionBy() se stále používá.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", true)
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
Aby se minimalizoval počet operací, aby se zlepšil čas strávený ingestováním dat do Delta Lake, je praktickou alternativou dávkové události.
Triggery definují, jak často se má dotaz streamování spouštět (aktivovat) a generovat nová data. Jejich nastavení definuje interval pravidelného zpracování mikrodávek, shromažďování dat a hromadění událostí vytvářením několika trvalých operací místo neustálého zápisu na disk.
Následující příklad ukazuje streamovací dotaz, ve kterém se události pravidelně zpracovávají v intervalech jedné minuty.
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.trigger(processingTime="1 minute") \
.toTable("deltaeventstable")
Výhodou kombinování dávkování událostí v operacích zápisu tabulek Delta je, že vytváří větší soubory Delta obsahující více dat, a vyhlo se tak tvorbě malých souborů. Měli byste analyzovat množství přijatých dat a najít nejvhodnější dobu zpracování pro optimalizaci velikosti souborů Parquet vytvořených knihovnou Delta.
Spark 3.1 a vyšší verze mají integrované uživatelské rozhraní strukturovaného streamování obsahující následující metriky streamování:
Události
31. 3. 23 - 2. 4. 23
Největší událost učení Fabric, Power BI a SQL. 31. března – 2. dubna. Pomocí kódu FABINSIDER uložte $400.
Zaregistrovat se ještě dnesŠkolení
Modul
Provádění přírůstkového zpracování pomocí strukturovaného streamování Sparku - Training
Informace o strukturovaném streamování Sparku a způsobech jeho optimalizace a použití k naplnění cílových objektů
Certifikace
Microsoft Certified: Datový inženýr Fabric Associate - Certifications
Jako datový inženýr infrastruktury byste měli mít zkušenosti se vzory načítání dat, architekturami dat a procesy orchestrace.
Dokumentace
Přidejte lakehouse jako cíl pro eventstream - Microsoft Fabric
Zjistěte, jak přidat cíl typu lakehouse do toku událostí v rámci Microsoft Fabric.
Možnosti pro načtení dat do Lakehouse - Microsoft Fabric
Zjistěte, jak načíst data do jezera prostřednictvím nahrávání souborů, knihoven Apache Sparku v kódu poznámkového bloku a nástroje pro kopírování v kanálech.
Schémata Lakehouse (Preview) - Microsoft Fabric
Co jsou schémata lakehouse a jak ho používat