Získání streamovaných dat do lakehouse se strukturovaným streamováním Sparku

Strukturované streamování je škálovatelný modul pro zpracování datových proudů odolný proti chybám založený na Sparku. Spark se postará o přírůstkové spuštění operace streamování a průběžného doručení dat.

Strukturované streamování bylo dostupné ve Sparku 2.2. Od té doby se jedná o doporučený přístup pro streamování dat. Základním principem strukturovaného datového proudu je zacházet s živým datovým proudem jako s tabulkou, kde se nová data vždy neustále připojují, jako je nový řádek v tabulce. Existuje několik definovaných předdefinovaných zdrojů streamovaných souborů, jako jsou CSV, JSON, ORC, Parquet a integrovaná podpora pro služby zasílání zpráv, jako je Kafka a Event Hubs.

Tento článek obsahuje přehled o tom, jak optimalizovat zpracování a příjem událostí prostřednictvím streamování struktury Sparku v produkčních prostředích s vysokou propustností. Mezi navrhované přístupy patří:

  • Optimalizace propustnosti streamování dat
  • Optimalizace operací zápisu v tabulce Delta a
  • Dávkování událostí

Definice úloh Sparku a poznámkové bloky Sparku

Poznámkové bloky Spark jsou vynikajícím nástrojem pro ověřování nápadů a provádění experimentů, které vám pomůžou získat přehledy z dat nebo kódu. Poznámkové bloky se běžně používají při přípravě dat, vizualizaci, strojovém učení a dalších scénářích s velkými objemy dat. Definice úloh Sparku jsou neinteraktivní úlohy orientované na kód spuštěné v clusteru Spark po dlouhou dobu. Definice úloh Sparku poskytují odolnost a dostupnost.

Poznámkové bloky Spark jsou skvělým zdrojem k otestování logiky kódu a řešení všech obchodních požadavků. Pro zachování provozu v produkčním scénáři jsou nejlepší řešením definice úloh Sparku s povolenou zásadou opakování.

Zásady opakování pro definice úloh Sparku

V Microsoft Fabric může uživatel nastavit zásady opakování pro úlohy definice úloh Sparku. I když může být skript v úloze nekonečný, může dojít k problému s tím, že infrastruktura, na které se skript spouští, vyžaduje zastavení úlohy. Nebo může být úloha odstraněna kvůli potřebám oprav základní infrastruktury. Zásady opakování umožňují uživateli nastavit pravidla pro automatické restartování úlohy, pokud se zastaví kvůli jakýmkoli podkladovým problémům. Parametry určují, jak často se má úloha restartovat, až do nekonečného opakování a nastavit čas mezi opakovanými pokusy. Uživatelé tak můžou zajistit, aby úlohy definice úloh Sparku pokračovaly v nekonečném provozu, dokud se uživatel nerozhodne je zastavit.

Zdroje streamování

Nastavení streamování pomocí služby Event Hubs vyžaduje základní konfiguraci, která zahrnuje název oboru názvů služby Event Hubs, název centra, název sdíleného přístupového klíče a skupinu příjemců. Skupina příjemců je zobrazení celého centra událostí. Umožňuje více aplikacím, které využívají, mít samostatné zobrazení datového proudu událostí a číst stream nezávisle na svém vlastním tempu a s jejich posuny.

Oddíly jsou základní součástí toho, jak zvládnout velký objem dat. Jeden procesor má omezenou kapacitu pro zpracování událostí za sekundu, zatímco několik procesorů může při paralelním spuštění provádět lepší úlohu. Oddíly umožňují paralelně zpracovávat velké objemy událostí.

Pokud se používá příliš mnoho oddílů s nízkou mírou příjmu dat, čtenáři oddílů pracují s malou částí těchto dat, což způsobuje neoptimální zpracování. Ideální počet oddílů přímo závisí na požadované rychlosti zpracování. Pokud chcete škálovat zpracování událostí, zvažte přidání dalších oddílů. Pro oddíl neexistuje žádný konkrétní limit propustnosti. Agregovaná propustnost v oboru názvů je ale omezená počtem jednotek propustnosti. Když zvýšíte počet jednotek propustnosti ve vašem oboru názvů, můžete chtít další oddíly, které umožní souběžným čtenářům dosáhnout maximální propustnosti.

Doporučujeme prozkoumat a otestovat nejlepší počet oddílů pro váš scénář propustnosti. Je ale běžné vidět scénáře s vysokou propustností s využitím 32 nebo více oddílů.

Služba Azure Event Hubs Připojení or pro Apache Spark (azure-event-hubs-spark) se doporučuje připojit aplikaci Spark ke službě Azure Event Hubs.

Lakehouse jako jímka streamování

Delta Lake je opensourcová vrstva úložiště, která poskytuje transakce ACID (atomicity, konzistence, izolace a stálosti) nad řešeními data Lake Storage. Delta Lake také podporuje škálovatelné zpracování metadat, vývoj schématu, časové cestování (správu verzí dat), otevřený formát a další funkce.

V prostředcích infrastruktury Datoví technici se Delta Lake používá k:

  • Snadné přenesení dat (vložení/aktualizace) a odstranění dat pomocí Spark SQL
  • Komprimujte data, abyste minimalizovali čas strávený dotazováním dat.
  • Zobrazte stav tabulek před a po provedení operací.
  • Načtěte historii operací prováděných s tabulkami.

Delta se přidá jako jeden z možných výstupních formátů jímek použitých v writeStream. Další informace o existujících výstupních jímkách najdete v průvodci programováním strukturovaného streamování Sparku.

Následující příklad ukazuje, jak je možné streamovat data do Delta Lake.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 

df = spark \ 
  .readStream \ 
  .format("eventhubs") \ 
  .options(**ehConf) \ 
  .load()  

Schema = StructType([StructField("<column_name_01>", StringType(), False), 
                     StructField("<column_name_02>", StringType(), False), 
                     StructField("<column_name_03>", DoubleType(), True), 
                     StructField("<column_name_04>", LongType(), True), 
                     StructField("<column_name_05>", LongType(), True)]) 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .toTable("deltaeventstable") 

Informace o vyříznutí kódu v příkladu:

  • format() je instrukce, která definuje výstupní formát dat.
  • outputMode() definuje, jakým způsobem se nové řádky ve streamování zapisují (tj. připojovat, přepsat).
  • toTable() zachová streamovaná data do tabulky Delta vytvořené pomocí hodnoty předané jako parametr.

Optimalizace zápisů Delta

Dělení dat je důležitou součástí vytváření robustního řešení streamování: dělení zlepšuje způsob uspořádání dat a také zvyšuje propustnost. Soubory se snadno fragmentují po operacích Delta, což vede k příliš velkému počtu malých souborů. A příliš velké soubory jsou také problém, a to kvůli dlouhé době jejich zápisu na disk. Výzvou při dělení dat je nalezení správné rovnováhy, která vede k optimálním velikostem souborů. Spark podporuje dělení v paměti a na disku. Správně dělená data můžou poskytovat nejlepší výkon při zachování dat do Delta Lake a dotazování dat z Delta Lake.

  • Při dělení dat na disku můžete zvolit způsob rozdělení dat na základě sloupců pomocí partitionBy(). partitionBy() je funkce použitá k rozdělení velkého sémantického modelu do menších souborů na základě jednoho nebo více sloupců zadaných při zápisu na disk. Dělení je způsob, jak zlepšit výkon dotazů při práci s velkým sémantickým modelem. Vyhněte se výběru sloupce, který generuje příliš malé nebo příliš velké oddíly. Definujte oddíl založený na sadě sloupců s dobrou kardinalitou a rozdělte data do souborů optimální velikosti.
  • Dělení dat v paměti je možné provádět pomocí transformací repartition() nebo coalesce() a distribuce dat na více pracovních uzlech a vytváření více úloh, které mohou číst a zpracovávat data paralelně pomocí základů odolné distribuované datové sady (RDD). Umožňuje rozdělit sémantický model na logické oddíly, které je možné vypočítat na různých uzlech clusteru.
    • funkce repartition() slouží ke zvýšení nebo snížení počtu oddílů v paměti. Změna rozdělení předěluje přes síť celá data a vyrovnává je napříč všemi oddíly.
    • funkce coalesce() slouží pouze k efektivnímu snížení počtu oddílů. To je optimalizovaná verze repartition(), kde přesun dat napříč všemi oddíly je nižší pomocí funkce coalesce().

Kombinace obou přístupů k dělení je dobrým řešením ve scénáři s vysokou propustností. repartition() vytvoří určitý počet oddílů v paměti, zatímco partitionBy() zapisuje soubory na disk pro každý oddíl paměti a sloupec dělení. Následující příklad znázorňuje použití obou strategií dělení na oddíly ve stejné úloze Sparku: data se nejprve rozdělí do 48 oddílů v paměti (za předpokladu, že máme celkem 48 jader procesoru) a potom jsou rozdělena na disk na základě dvou existujících sloupců v datové části.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 
import json 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Optimalizovaný zápis

Další možností optimalizace zápisů do Delta Lake je použití optimalizovaného zápisu. Optimalizovaný zápis je volitelná funkce, která zlepšuje způsob zápisu dat do tabulky Delta. Spark sloučí nebo rozdělí oddíly před zápisem dat a maximalizuje propustnost zapisovaných dat na disk. V některých úlohách ale dochází k úplnému náhodnému náhodnému prohazování, takže u některých úloh může dojít ke snížení výkonu. Úlohy využívající funkce coalesce() a/nebo repartition() pro dělení dat na disku je možné refaktorovat, aby místo toho začaly používat optimalizované zápisy.

Následující kód je příkladem použití optimalizovaného zápisu. Všimněte si, že partitionBy() se stále používá.

spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", true) 
 
rawData = df \ 
 .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Dávkové události

Aby se minimalizoval počet operací, aby se zlepšil čas strávený ingestováním dat do Delta Lake, je praktickou alternativou dávkové události.

Triggery definují, jak často se má dotaz streamování spouštět (aktivovat) a generovat nová data, jejich nastavení definuje interval pravidelného zpracování pro mikrobatchy, shromažďování dat a dávkové události do několika trvalých operací, místo aby se neustále zapisovaly na disk.

Následující příklad ukazuje streamovací dotaz, ve kterém se události pravidelně zpracovávají v intervalech jedné minuty.

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .trigger(processingTime="1 minute") \ 
  .toTable("deltaeventstable") 

Výhodou kombinování dávkování událostí v operacích zápisu tabulek Delta je, že vytváří větší soubory Delta s více daty v nich, aby nedocházelo k malým souborům. Měli byste analyzovat množství přijatých dat a najít nejvhodnější dobu zpracování pro optimalizaci velikosti souborů Parquet vytvořených knihovnou Delta.

Sledování

Spark 3.1 a vyšší verze mají integrované uživatelské rozhraní strukturovaného streamování obsahující následující metriky streamování:

  • Vstupní rychlost
  • Rychlost zpracování
  • Vstupní řádky
  • Doba trvání dávky
  • Doba trvání operace