Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Az állapotban tárolt adatok hatékony kezeléséhez használjon vízjeleket, amikor állapotalapú streamfeldolgozást végez a Lakeflow Spark Deklaratív folyamatokban, beleértve az aggregációkat, illesztéseket és deduplikációkat. Ez a cikk bemutatja, hogyan használhat vízjeleket a folyamat lekérdezéseiben, és példákat tartalmaz az ajánlott műveletekre.
Megjegyzés:
Annak érdekében, hogy az aggregációkat végrehajtó lekérdezések növekményesen legyenek feldolgozva, és ne legyenek teljesen újrafordítve az egyes frissítésekkel, vízjeleket kell használnia.
Mi az a vízjel?
A streamfeldolgozás során a vízjel egy Apache Spark-funkció, amely az állapotalapú műveletek, például összesítések végrehajtásakor meghatározhat egy időalapú küszöbértéket az adatok feldolgozásához. Az érkező adatok feldolgozása addig történik, amíg el nem éri a küszöbértéket, és ekkor a küszöbérték által meghatározott időablak bezárul. A vízjelekkel elkerülhetők a lekérdezések feldolgozása során felmerülő problémák, főként nagyobb adathalmazok vagy hosszú ideig futó feldolgozás esetén. Ezek a problémák magukban foglalhatják az eredmények előállításának magas késését, és akár a memóriatúlcsordulásos (OOM) hibákat is, mivel a feldolgozás során tárolt állapot adatok mennyisége miatt. Mivel a streamelési adatok eredendően rendezetlenek, a vízjelek támogatják az olyan műveletek helyes kiszámítását is, mint az időablak-összesítések.
A vízjelek streamfeldolgozásban való használatáról további információt az Apache Spark strukturált streamelési és vízjelek alkalmazása az adatfeldolgozási küszöbértékek szabályozásához című témakörben talál.
Hogyan definiálhat vízjelet?
A vízjel meghatározásához meg kell adnia egy időbélyegmezőt és egy olyan értéket, amely a késői adatok beérkezési időküszöbét jelöli. Az adatok késésnek minősülnek, ha a megadott időküszöb után érkeznek. Ha például a küszöbérték 10 perc, a 10 perces küszöbérték után érkező rekordok elvethetők.
Mivel a megadott küszöbérték után érkező rekordok elvethetők, fontos a késési és a helyességi követelményeknek megfelelő küszöbérték kiválasztása. Ha kisebb küszöbértéket választ, a rekordok hamarabb lesznek kibocsátva, de azt is jelenti, hogy a késői rekordok nagyobb valószínűséggel lesznek elvetve. A nagyobb küszöbérték hosszabb várakozást, de az adatok teljességét is jelentheti. A nagyobb állapotméret miatt a nagyobb küszöbértékek további számítási erőforrásokat is igényelhetnek. Mivel a küszöbérték az adatoktól és a feldolgozási követelményektől függ, a feldolgozás tesztelése és monitorozása fontos az optimális küszöbérték meghatározásához.
A Pythonban a withWatermark() függvény használatával definiálhat vízjelet. Az SQL-ben a WATERMARK záradék használatával definiáljon vízjelet:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Vízjelek használata stream-stream illesztésekkel
Stream stream illesztések esetén meg kell adni egy vízjelet mindkét oldalon, valamint egy időintervallum záradékot. Mivel minden illesztési forrás hiányosan tekinti meg az adatokat, az időintervallum záradék szükséges ahhoz, hogy tájékoztassa a streamelési motort, ha nem lehet további egyezéseket létrehozni. Az időintervallum záradéknak ugyanazokat a mezőket kell használnia, mint a vízjelek definiálásához.
Mivel előfordulhat, hogy az egyes streamek különböző küszöbértékeket igényelnek a vízjelekhez, a streameknek nem kell azonos küszöbértékekkel rendelkezniük. A hiányzó adatok elkerülése érdekében a streamelési motor egy globális vízjelet tart fenn a leglassabb stream alapján.
Az alábbi példa egy hirdetésmegjelenések streamjéhez kapcsolódik, valamint egy használói kattintások streamje a hirdetésekre. Ebben a példában a kattintásnak a megjelenést követő 3 percen belül kell történnie. A 3 perces időintervallum leteltét követően a már nem egyeztethető állapot sorait a rendszer elveti.
Python
from pyspark import pipelines as dp
dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Ablakos összesítések végrehajtása vízjelekkel
A streamelésből származó adatok egyik gyakori állapottal rendelkező művelete az ablakos összesítés. Az ablakos aggregációk hasonlóak a csoportosított aggregációkhoz, azzal a kivételrel, hogy a megadott ablak részét képező sorok összesítő értékei lesznek visszaadva.
Egy ablak definiálható egy adott hosszként, és az összesítési művelet az adott ablak részét képező összes sorban is végrehajtható. A Spark Streaming három ablaktípust támogat:
- Gördülő (rögzített) ablakok: Rögzített méretű, nem átfedésben lévő és egybefüggő időintervallumok sorozata. Egy bemeneti rekord csak egyetlen ablakhoz tartozik.
- Tolóablakok: Az ugróablakokhoz hasonlóan a tolóablakok rögzített méretűek, de az ablakok átfedésben lehetnek, és egy rekord több ablakba is tartozhat.
Amikor az adatok az ablak vége után, a vízjel hosszával növelt időtartamon túl érkeznek, az ablakhoz nem fogad el új adatokat a rendszer, az összegzés eredménye kibocsátásra kerül, és az ablak állapota törlődik.
Az alábbi példa egy rögzített ablak használatával 5 percenként kiszámítja a megjelenítések összegét. Ebben a példában a kijelölési záradék az aliast impressions_windowhasználja, majd maga az ablak lesz definiálva a GROUP BY záradék részeként. Az ablaknak ugyanazon időbélyegoszlopon kell alapulnia, mint a vízjelnek, az ebben a clickTimestamp példában szereplő oszlopnak.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Hasonló példa a Pythonban: nyereség kiszámítása óránkénti fix időablakok fölött.
from pyspark import pipelines as dp
@dp.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Streamelési rekordok deduplikálása
A strukturált streamelés pontosan egyszeri feldolgozási garanciával rendelkezik, de nem törli automatikusan az adatforrásokból származó rekordok duplikálását. Mivel például számos üzenetsor legalább egyszer garantálja a garanciát, ismétlődő rekordokra kell számítani az üzenetsorok egyikéből való olvasáskor. A függvény használatával dropDuplicatesWithinWatermark() bármely megadott mező rekordjait eltávolíthatja, és eltávolíthatja az ismétlődéseket a streamből, még akkor is, ha egyes mezők eltérnek (például az esemény ideje vagy az érkezési idő). Meg kell adnia egy vízjelet a dropDuplicatesWithinWatermark() függvény használatához. A vízjel által megadott időtartományon belül érkező összes duplikált adat elvetve lesz.
A rendezett adatok azért fontosak, mert a rendezetlen adatok miatt a vízjel értéke helytelenül ugrik előre. Ezután, amikor régebbi adatok érkeznek, későinek és elvetettnek minősülnek. Ezzel a withEventTimeOrder beállítással a kezdeti pillanatképet a vízjelben megadott időbélyeg alapján, sorrendben dolgozhatja fel. A withEventTimeOrder beállítás deklarálható az adathalmazt meghatározó kódban vagy a folyamatbeállítások használatával spark.databricks.delta.withEventTimeOrder.enabled. Például:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Megjegyzés:
A withEventTimeOrder beállítás csak Python esetén támogatott.
A következő példában az adatokat a rendszer clickTimestamp szerint rendezi sorba, és az egymástól 5 másodpercen belül érkező, duplikált userId és clickAdId oszlopokat tartalmazó rekordokat eldobja.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Folyamatkonfiguráció optimalizálása állapotalapú feldolgozáshoz
Az gyártási problémák és a túlzott késleltetés elkerülése érdekében a Databricks javasolja, hogy engedélyezze a RocksDB-alapú állapotkezelést az állapotalapú streamfeldolgozáshoz, különösen ha szükség van nagymennyiségű köztes állapot mentésére.
A kiszolgáló nélküli folyamatok automatikusan kezelik az állapottároló konfigurációit.
A RocksDB-alapú állapotkezelés engedélyezéséhez állítsa be a következő konfigurációt a folyamat üzembe helyezése előtt:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Ha többet szeretne megtudni a RocksDB állapottárolójáról, beleértve a RocksDB konfigurációs javaslatait, olvassa el a RocksDB-állapottároló konfigurálása az Azure Databricksben című témakört.