Použití vodoznaků k řízení prahových hodnot zpracování dat
Tento článek představuje základní koncepty vodoznaků a poskytuje doporučení k používání vodoznaků v běžných stavových operacích streamování. Vodoznaky musíte použít u stavových operací streamování, abyste se vyhnuli nekonečnému rozšíření objemu dat uchovávaného ve stavu, což by mohlo vést k problémům s pamětí a zvýšit latenci zpracování během dlouhotrvajících operací streamování.
Co je vodoznak?
Strukturované streamování používá vodoznaky k řízení prahové hodnoty, jak dlouho se mají aktualizace pro danou entitu stavu dál zpracovávat. Mezi běžné příklady stavových entit patří:
- Agregace v časovém intervalu
- Jedinečné klíče ve spojení mezi dvěma datovými proudy
Když deklarujete vodoznak, zadáte v datovém rámci streamování prahovou hodnotu časového razítka a prahovou hodnotu meze. Při příchodu nových dat správce stavu sleduje poslední časové razítko v zadaném poli a zpracuje všechny záznamy v rámci prahové hodnoty zpoždění.
Následující příklad použije prahovou hodnotu meze 10 minut na počet oken:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
V tomto příkladu:
- Sloupec
event_time
slouží k definování 10minutového vodoznaku a 5minutového přeskakujícího okna. - Pro každý
id
pozorovaný počet se shromažďuje počet pro každé nepřekrývající se 5minutové intervaly. - Informace o stavu se zachovají pro každý počet do konce okna o 10 minut starších než poslední pozorované
event_time
údaje .
Důležité
Prahové hodnoty meze zaručují, že se záznamy přicházející do zadané prahové hodnoty zpracovávají podle sémantiky definovaného dotazu. Zpožděné příchozí záznamy přicházející mimo zadanou prahovou hodnotu se stále můžou zpracovávat pomocí metrik dotazů, ale to není zaručené.
Jaký vliv mají vodoznaky na dobu zpracování a propustnost?
Vodoznaky komunikují s výstupními režimy a řídí, kdy se data zapisují do jímky. Vzhledem k tomu, že vodoznaky snižují celkové množství informací o stavu, které se mají zpracovat, je efektivní použití vodoznaků nezbytné pro efektivní propustnost stavového streamování.
Poznámka:
Pro všechny stavové operace se nepodporují všechny režimy výstupu.
Vodoznaky a výstupní režim pro agregace s okny
Následující tabulka podrobně popisuje zpracování dotazů s agregací u časového razítka s definovaným vodoznakem:
Výstupní režim | Chování |
---|---|
Připojit | Po předání prahové hodnoty meze se řádky zapisují do cílové tabulky. Všechny zápisy jsou zpožděné na základě prahové hodnoty zpoždění. Starý stav agregace se po předání prahové hodnoty zahodí. |
Aktualizovat | Řádky se zapisují do cílové tabulky při výpočtu výsledků a dají se aktualizovat a přepsat při příchodu nových dat. Starý stav agregace se po předání prahové hodnoty zahodí. |
Dokončit | Stav agregace se nezahodí. Cílová tabulka se přepíše s každou aktivační událostí. |
Vodoznaky a výstup pro spojení stream-stream
Spojení mezi více datovými proudy podporují pouze režim připojení a odpovídající záznamy se zapisují v každé dávce, kterou se zjistí. U vnitřních spojení doporučuje Databricks nastavit prahovou hodnotu meze u každého streamovaného zdroje dat. To umožňuje zahodit informace o stavu pro staré záznamy. Bez vodoznaků se strukturované streamování pokusí spojit každý klíč z obou stran spojení s každým triggerem.
Strukturované streamování má speciální sémantiku pro podporu vnějších spojení. Vodoznaky jsou povinné pro vnější spojení, protože označuje, kdy musí být klíč zapsán s hodnotou null po ukončení. Všimněte si, že zatímco vnější spojení mohou být užitečná pro zaznamenávání záznamů, které se během zpracování dat nikdy neshodují, protože spojení zapisují pouze do tabulek jako operace připojení, tato chybějící data se nezaznamenávají, dokud se nezpozdí prahová hodnota zpoždění.
Řízení prahové hodnoty pozdních dat pomocí zásad více vodoznaků ve strukturovaném streamování
Při práci s více vstupy strukturovaného streamování můžete nastavit více vodoznaků pro řízení prahových hodnot tolerance pro pozdní příchozí data. Konfigurace vodoznaků umožňuje řídit informace o stavu a ovlivnit latenci.
Streamovací dotaz může mít více vstupních datových proudů, které jsou sjednocovány nebo spojeny dohromady. Každý ze vstupních datových proudů může mít jinou prahovou hodnotu pozdních dat, která je potřeba tolerovat pro stavové operace. Zadejte tyto prahové hodnoty pro withWatermarks("eventTime", delay)
každý ze vstupních datových proudů. Následuje příklad dotazu s spojeními stream-stream.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Při spouštění dotazu strukturované streamování jednotlivě sleduje maximální dobu události zobrazenou v každém vstupním datovém proudu, vypočítá vodoznaky na základě odpovídajícího zpoždění a zvolí jeden globální vodoznak, který se má použít pro stavové operace. Ve výchozím nastavení se jako globální vodoznak vybere minimum, protože zajistí, že se žádná data omylem nezahodí, pokud některý z datových proudů spadá za ostatními (například jeden z datových proudů přestane přijímat data kvůli nadřazeným selháním). Jinými slovy, globální vodoznak se bezpečně pohybuje rychlostí nejpomalejšího datového proudu a výstup dotazu se odpovídajícím způsobem zpozdí.
Pokud chcete získat rychlejší výsledky, můžete nastavit zásadu více vodoznaků tak, aby jako globální vodoznak zvolila maximální hodnotu tak, že nastavíte konfiguraci spark.sql.streaming.multipleWatermarkPolicy
SQL na max
hodnotu (výchozí hodnota je min
). Díky tomu se globální vodoznak pohybuje tempem nejrychlejšího datového proudu. Tato konfigurace však zahodí data z nejpomalejších datových proudů. Proto databricks doporučuje, abyste tuto konfiguraci používali uvážlivě.
Odstranění duplicit v rámci vodoznaku
Ve službě Databricks Runtime 13.3 LTS a novějších můžete záznamy v rámci prahové hodnoty meze odstranit pomocí jedinečného identifikátoru.
Strukturované streamování poskytuje záruky zpracování přesně jednou, ale neodstraňuje automaticky záznamy ze zdrojů dat. Můžete použít dropDuplicatesWithinWatermark
k odstranění duplicit záznamů u libovolného zadaného pole, což vám umožní odebrat duplicity ze streamu, i když se některá pole liší (například čas události nebo čas příjezdu).
U duplicitních záznamů, které přicházejí do zadaného vodoznaku, je zaručeno, že se zahodí. Tato záruka je striktní pouze v jednom směru a duplicitní záznamy, které přicházejí mimo zadanou prahovou hodnotu, mohou být také vyřazeny. Pokud chcete odebrat všechny duplicity, musíte nastavit prahovou hodnotu zpoždění vodoznaku delší než maximální rozdíly časových razítek mezi duplicitními událostmi.
Chcete-li použít metodu dropDuplicatesWithinWatermark
, musíte zadat vodoznak, jak je znázorněno v následujícím příkladu:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])