Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tato stránka popisuje základní koncepty vodoznaků a poskytuje doporučení k používání vodoznaků v běžných stavových operacích streamování. Vodoznaky musíte použít u stavových operací streamování, abyste se vyhnuli nekonečnému rozšíření objemu dat uchovávaného ve stavu, což může vést k problémům s pamětí nebo ke zvýšení latencí zpracování během dlouhotrvajících operací streamování.
Co je vodoznak?
Strukturované streamování používá časové značky k určování prahové hodnoty, jak dlouho se mají aktualizace pro určenou stavovou entitu dál zpracovávat. Mezi běžné příklady stavových entit patří:
- Agregace v časovém intervalu
- Jedinečné klíče v propojení mezi dvěma datovými proudy
Když deklarujete vodoznak, zadáte pole časového razítka a prahovou hodnotu vodoznaku v streamovacím datovém rámci. Při příchodu nových dat správce stavu sleduje poslední časové razítko v zadaném poli a zpracuje všechny záznamy v rámci prahové hodnoty zpoždění.
Následující příklad použije prahovou hodnotu meze 10 minut na počet oken:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
V tomto příkladu:
- Sloupec
event_timeslouží k definování 10minutového vodoznaku a 5minutového přeskakujícího okna. - Počet se shromažďuje pro každý
idpozorovaný během nepřekrývajících se 5minutových intervalů. - Informace o stavu se uchovávají pro každý počet, dokud konec okna není o 10 minut starší než nejnovější pozorované
event_time.
Důležité
Prahové hodnoty meze zaručují, že se záznamy přicházející do zadané prahové hodnoty zpracovávají podle sémantiky definovaného dotazu. Zpožděné příchozí záznamy přicházející mimo zadanou prahovou hodnotu se stále můžou zpracovávat pomocí metrik dotazů, ale to není zaručené.
Důležité
Běžně se odkazuje na sloupec jako na pole datového rámce. Ale v groupBy() a window() operaci může dojít k odkazování na sloupec bez značky času události. Pokud se tomu chcete vyhnout, použijte "<colName>", col("<colName>")nebo $colName v jazyce Scala.
Jaký vliv mají vodoznaky na dobu zpracování a propustnost?
Vodoznaky komunikují s výstupními režimy a řídí, kdy se data zapisují do jímky. Vzhledem k tomu, že vodoznaky snižují celkové množství informací o stavu, které se mají zpracovat, je efektivní použití vodoznaků nezbytné pro efektivní propustnost stavového streamování.
Poznámka:
Pro všechny stavové operace se nepodporují všechny režimy výstupu.
Vodoznaky a výstupní režim pro okenní agregace
Následující tabulka podrobně popisuje zpracování dotazů s agregací u časového razítka s definovaným vodoznakem:
| Výstupní režim | Chování |
|---|---|
| Připojit | Po překročení prahové hodnoty se řádky zapisují do cílové tabulky. Všechny zápisy jsou zpožděné na základě prahové hodnoty zpoždění. Starý stav agregace se po uplynutí prahové hodnoty zahodí. |
| Aktualizace | Řádky se zapisují do cílové tabulky při výpočtu výsledků a dají se aktualizovat a přepsat při příchodu nových dat. Starý stav agregace se po uplynutí prahové hodnoty zahodí. |
| Dokončit | Stav agregace se nezahodí. Cílová tabulka se přepíše s každou aktivační událostí. |
Vodoznaky a výstup pro propojování stream-stream
Spojení mezi více datovými proudy podporují pouze režim připojování a odpovídající záznamy se zapisují v každé dávce, kdy jsou zjištěny. U vnitřních spojení doporučuje Databricks nastavit prahovou hodnotu meze u každého streamovaného zdroje dat. To umožňuje odstranit informace o stavu pro staré záznamy. Bez vodoznaků se strukturované streamování pokusí spojit každý klíč z obou stran spojení s každým triggerem.
Strukturované streamování má speciální sémantiku pro podporu vnějších spojení. Vodoznak je povinný pro vnější spojení, protože označuje, kdy musí být klíč zapsán s hodnotou null, pokud se nezhoduje. I když vnější spojení mohou být užitečná pro zaznamenání záznamů, které se během zpracování dat nikdy neshodují, protože spojení zapisují do tabulek pouze jako připojené operace, tato chybějící data se nezaznamenávají, dokud není překročena prahová hodnota zpoždění.
Řízení prahové hodnoty pozdních dat pomocí více zásad vodoznaku ve strukturovaném streamování
Při práci s více vstupy strukturovaného streamování můžete nastavit více vodoznaků pro řízení prahových hodnot tolerance pro pozdní příchozí data. Konfigurace vodoznaků umožňuje řídit informace o stavu a ovlivnit latenci.
Streamovací dotaz může mít více vstupních datových proudů, které jsou sjednocovány nebo spojeny dohromady. Každý ze vstupních datových proudů může mít jinou prahovou hodnotu pozdních dat, která je potřeba tolerovat pro stavové operace. Zadejte tyto prahové hodnoty pomocí withWatermarks("eventTime", delay) na každém ze vstupních datových proudů. Následuje příklad dotazu s spojeními stream-stream.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Při spouštění dotazu strukturované streamování jednotlivě sleduje maximální dobu události zobrazenou v každém vstupním datovém proudu, vypočítá vodoznaky na základě odpovídajícího zpoždění a zvolí jeden globální vodoznak, který se má použít pro stavové operace. Ve výchozím nastavení je jako globální vodoznak zvoleno to minimum, protože zajišťuje, že data nebudou náhodně považována za příliš pozdní, pokud některý z datových proudů zaostává za ostatními (například když jeden z datových proudů přestane přijímat data kvůli selhání v nadřazeném systému). Jinými slovy, globální vodoznak se bezpečně pohybuje rychlostí nejpomalejšího datového proudu a výstup dotazu se odpovídajícím způsobem zpozdí.
Pokud chcete získat rychlejší výsledky, můžete nastavit zásadu více vodoznaků tak, aby jako globální vodoznak zvolila maximální hodnotu tak, že nastavíte spark.sql.streaming.multipleWatermarkPolicy konfigurace SQL na max (výchozí hodnota je min). Díky tomu se globální vodoznak pohybuje tempem nejrychlejšího datového proudu. Tato konfigurace však zahodí data z nejpomalejších datových proudů. Databricks doporučuje používat tuto konfiguraci uvážlivě.
Použití vodoznaků u jedinečných operací
Operace distinct je stavový operátor, který vyžaduje watermarky, aby se zabránilo nevázanému růstu stavu. Bez vodoznaků se strukturované streamování pokusí sledovat každý jedinečný záznam po neomezenou dobu, což může vést k problémům s pamětí nebo vyšší latenci zpracování.
Pokud použijete distinct na streamovaný datový rámec, musíte zadat vodoznak v poli časového razítka. Vodoznak určuje, jak dlouho správce stavů vede záznamy pro odstranění duplicit. Po uplynutí prahové hodnoty se staré záznamy odeberou ze stavu systému.
Následující příklad použije vodoznak na distinct operaci:
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
V tomto příkladu se z datového proudu odeberou duplicitní záznamy přicházející do 1 hodiny od posledního pozorování eventTime . Informace o stavu odstranění duplicitních dat se po uplynutí prahové hodnoty zahodí.
Důležité
Pokud potřebujete odstranit duplicitní data u konkrétních sloupců místo všech sloupců, použijte dropDuplicates() nebo dropDuplicatesWithinWatermark() místo distinct. Podrobnosti naleznete v další části.
Přetažení duplicit v rámci vodoznaku
V Databricks Runtime 13.3 LTS nebo novějším můžete odstranit duplicitní záznamy v rámci prahové hodnoty časového filtru pomocí jedinečného identifikátoru.
Strukturované streamování poskytuje záruky přesně jednoho zpracování, ale automaticky neodstraňuje záznamy ze zdrojů dat. Můžete použít dropDuplicatesWithinWatermark k odstranění duplicitních dat u libovolného zadaného pole, což vám umožní odebrat duplicity ze streamu i v případě, že se některá pole liší (například čas události nebo čas příjezdu).
U duplicitních záznamů, které přicházejí do zadaného časového limitu, je garantováno, že budou zahozena. Tato záruka je striktní pouze v jednom směru a duplicitní záznamy, které přicházejí mimo zadanou prahovou hodnotu, mohou být také vyřazeny. Pokud chcete odebrat všechny duplicity, musíte nastavit prahovou hodnotu zpoždění vodoznaku delší než maximální rozdíly časových razítek mezi duplicitními událostmi.
Musíte zadat vodoznak, který použije metodu dropDuplicatesWithinWatermark, jak je znázorněno v následujícím příkladu:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))