Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Pokud chcete efektivně spravovat data uložená ve stavu, použijte vodoznaky při provádění stavového zpracování datových proudů v deklarativních kanálech Lakeflow Sparku, včetně agregací, spojení a odstranění duplicitních dat. Tento článek popisuje, jak používat vodoznaky v dotazech datového potrubí a obsahuje příklady doporučených operací.
Poznámka:
Aby se zajistilo, že dotazy, které provádějí agregace, se zpracovávají přírůstkově a nejsou plně přepočítané při každé aktualizaci, musíte použít vodoznaky.
Co je vodoznak?
Při zpracování datových proudů je vodoznak funkcí Apache Sparku, která může definovat prahovou hodnotu založenou na čase pro zpracování dat při provádění stavových operací, jako jsou agregace. Příchozí data se zpracovávají, dokud nedosáhnete prahové hodnoty. V tomto okamžiku je časové období definované prahovou hodnotou uzavřeno. Vodoznaky se dají použít k zabránění problémům při zpracování dotazů, zejména při zpracování větších datových sad nebo dlouhotrvajícího zpracování. Tyto problémy můžou zahrnovat vysokou latenci při vytváření výsledků a dokonce i chyby typu nedostatek paměti (OOM) kvůli množství dat, která se během zpracování uchovávají ve stavu. Vzhledem k tomu, že streamovaná data jsou ze své podstaty neuspořádaná, vodoznaky také podporují správné výpočty operací, jako jsou agregace časových intervalů.
Další informace o používání vodoznaků při zpracování datových proudů najdete v tématu Vodoznaky ve strukturovaném streamování Apache Sparku a použití vodoznaků pro řízení prahových hodnot zpracování dat.
Jak definujete vodoznak?
Vodoznak definujete tak, že zadáte pole časového razítka a hodnotu představující prahovou hodnotu času pro pozdní doručení dat . Data se považují za opožděná, pokud dorazí po definované prahové hodnotě času. Pokud je například prahová hodnota definována jako 10 minut, můžou se záznamy přicházející po 10minutové prahové hodnotě vynechat.
Vzhledem k tomu, že záznamy, které přicházejí po definované prahové hodnotě, mohou být vyřazeny, je důležité vybrat prahovou hodnotu, která splňuje vaše latence a požadavky na správnost. Pokud zvolíte menší prahovou hodnotu, vygenerují se záznamy dříve, ale také to znamená, že opožděné záznamy budou pravděpodobně vynechány. Větší prahová hodnota znamená delší čekání, ale možná větší úplnost dat. Kvůli větší velikosti stavu může větší prahová hodnota vyžadovat také další výpočetní prostředky. Vzhledem k tomu, že prahová hodnota závisí na požadavcích na data a zpracování, je důležité určit optimální prahovou hodnotu testováním a monitorováním zpracování.
Pomocí funkce v Pythonu withWatermark() definujete vodoznak. V SQL použijte WATERMARK klauzuli k definování vodoznaku:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Používejte vodoznaky pro spojení stream-stream
U spojení stream-stream musíte definovat vodoznak na obou stranách spojení a klauzuli časového intervalu. Vzhledem k tomu, že každý zdroj spojení obsahuje neúplné zobrazení dat, je nutné, aby klauzule časového intervalu řekla modulu streamování, když není možné provést žádné další shody. Klauzule časového intervalu musí používat stejná pole, která slouží k definování vodoznaků.
Každý datový proud může v určitých případech vyžadovat různé prahové hodnoty pro vodoznaky, takže datové proudy nemusí mít vždy stejné prahové hodnoty. Aby se zabránilo chybějícím datům, modul streamování udržuje jeden globální vodoznak založený na nejpomalejším datovém proudu.
Následující příklad spojuje stream reklamních impresí a stream kliknutí uživatelů na reklamy. V tomto příkladu musí dojít ke kliknutí do 3 minut od zobrazení. Řádky ze stavu, které už nelze shodovat, se odstraní po uplynutí 3minutového časového intervalu.
Python
from pyspark import pipelines as dp
dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Provádění agregací ve formě oken pomocí vodoznaků
Běžnou stavovou operací na streamovaných datech je agregace s časovými okny. Agregace s okny jsou podobné seskupeným agregacím s tím rozdílem, že agregační hodnoty se vrátí pro sadu řádků, které jsou součástí definovaného okna.
Okno lze definovat jako určitou délku a operaci agregace lze provést na všech řádcích, které jsou součástí tohoto okna. Streamování Sparku podporuje tři typy oken:
- Posuvná (pevná) okna: Řada časových intervalů s pevnou velikostí, které se nepřekrývají a jsou souvislé. Vstupní záznam patří pouze do jednoho okna.
- Posuvná okna: Podobně jako padající okna jsou posuvná okna pevné velikosti, ale okna se můžou překrývat a záznam může spadat do více oken.
Když data dorazí po skončení okna plus délka vodoznaku, nebudou pro toto okno přijata žádná nová data, výsledek agregace se vygeneruje a stav okna se vymaže.
Následující příklad vypočítá součet impresí každých 5 minut pomocí pevného okna. V tomto příkladu klauzule select používá alias impressions_windowa pak samotné okno je definováno jako součást GROUP BY klauzule. Okno musí být založené na stejném sloupci časového razítka jako vodoznak, tedy na sloupci clickTimestamp v tomto příkladu.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Podobný příklad v Pythonu pro výpočet zisku v pevných hodinových oknech:
from pyspark import pipelines as dp
@dp.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Deduplikovat streamované záznamy
Strukturované streamování má záruky na zpracování přesně jednou, ale automaticky nededuplikuje záznamy ze zdrojů dat. Protože například mnoho front zpráv má alespoň jednou záruku, měly by se při čtení z jedné z těchto front zpráv očekávat duplicitní záznamy. Funkci můžete použít dropDuplicatesWithinWatermark() k odstranění duplicitních záznamů u libovolného zadaného pole, odebrání duplicit ze streamu i v případě, že se některá pole liší (například čas události nebo čas příjezdu). Chcete-li použít dropDuplicatesWithinWatermark() funkci, musíte zadat vodoznak. Všechna duplicitní data, která přicházejí v časovém rozsahu určeném vodoznakem, se zahodí.
Seřazená data jsou důležitá, protože data mimo pořadí způsobí nesprávné přeskakování hodnoty meze. Když přijdou starší data, považují se za opožděná a vyřazená. Použijte možnost withEventTimeOrder k zpracování počátečního snímku v pořadí podle časové známky zadané ve vodoznaku. Možnost withEventTimeOrder lze deklarovat v kódu definujícím datovou sadu nebo v nastavení kanálu pomocí spark.databricks.delta.withEventTimeOrder.enabled. Například:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Poznámka:
Tato withEventTimeOrder možnost se podporuje jenom v Pythonu.
V následujícím příkladu se data zpracovávají seřazená podle clickTimestampzáznamů a záznamy přicházející do 5 sekund od sebe, které obsahují duplicitní userId a clickAdId sloupce, se zahodí.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Optimalizace konfigurace pipeline pro stavové zpracování
Aby se zabránilo problémům v produkčním prostředí a nadměrné latenci, doporučuje Databricks povolit správu stavu založené na RocksDB pro zpracování stavových datových proudů, zejména pokud zpracování vyžaduje úsporu velkého zprostředkujícího stavu.
Bezserverové kanály automaticky spravují konfigurace úložiště stavu.
Správu stavu na základě RocksDB můžete povolit nastavením následující konfigurace před nasazením kanálu:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Další informace o úložišti stavů RocksDB, včetně doporučení konfigurace pro RocksDB, najdete v tématu Konfigurace úložiště stavů RocksDB v Azure Databricks.