Vízjelek alkalmazása az adatfeldolgozási küszöbértékek szabályozásához
Ez a cikk bemutatja a vízjelezés alapvető fogalmait, és javaslatokat nyújt a vízjelek általános állapotalapú streamelési műveletekben való használatára. Vízjeleket kell alkalmaznia az állapotalapú streamelési műveletekre, hogy elkerülje az állapotban tárolt adatok végtelen bővítését, ami memóriaproblémákat okozhat, és növelheti a feldolgozási késéseket a hosszan futó streamelési műveletek során.
Mi az a vízjel?
A strukturált streamelés vízjelekkel szabályozza a küszöbértéket, hogy mennyi ideig dolgozza fel a frissítéseket egy adott állapotentitáshoz. Az állapotentitások gyakori példái a következők:
- Összesítések egy időablakban.
- Egyedi kulcsok két stream közötti illesztésben.
Vízjel deklarálásakor egy időbélyegmezőt és egy vízjel küszöbértéket kell megadnia egy streamelési DataFrame-en. Az új adatok érkezésekor az állapotkezelő nyomon követi a megadott mező legutóbbi időbélyegét, és feldolgozza a késési küszöbértéken belüli összes rekordot.
Az alábbi példa egy 10 perces vízjel küszöbértéket alkalmaz egy ablakos számra:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Ebben a példában:
- Az
event_time
oszlop egy 10 perces vízjel és egy 5 perces csúszásablak meghatározására szolgál. - A rendszer minden megfigyelt, nem átfedésben lévő 5 perces ablakhoz
id
számokat gyűjt. - Az állapotinformációk az egyes darabszámokra vonatkozóan mindaddig megmaradnak, amíg az ablak vége 10 perccel régebbi, mint a legutóbbi megfigyelt
event_time
.
Fontos
A vízjel küszöbértékei garantálják, hogy a megadott küszöbértéken belül érkező rekordok feldolgozása a megadott lekérdezés szemantikájának megfelelően történik. Előfordulhat, hogy a megadott küszöbértéken kívülre érkező késői rekordok lekérdezési metrikákkal dolgozhatók fel, de ez nem garantált.
Hogyan befolyásolják a vízjelek a feldolgozási időt és az átviteli sebességet?
A vízjelek a kimeneti módokkal együttműködve szabályozják az adatok fogadóba írásakor. Mivel a vízjelek csökkentik a feldolgozandó állapotinformációk teljes mennyiségét, a vízjelek hatékony használata elengedhetetlen a hatékony állapotalapú streamelési átviteli sebességhez.
Feljegyzés
Nem minden kimeneti mód támogatott minden állapotalapú művelethez.
Vízjelek és kimeneti mód ablakos összesítésekhez
Az alábbi táblázat a vízjelet tartalmazó időbélyegen összesítő lekérdezések feldolgozását részletezi:
Kimeneti mód | Működés |
---|---|
Hozzáfűzés | A sorokat a rendszer a vízjel küszöbértékének lejárta után írja a céltáblába. A késési küszöbérték alapján minden írás késik. A régi összesítési állapot a küszöbérték túllépése után el lesz ejtve. |
Frissítés | A sorok az eredmények kiszámításakor a céltáblába vannak írva, és új adatok érkezésekor frissíthetők és felülírhatók. A régi összesítési állapot a küszöbérték túllépése után el lesz ejtve. |
Kész | Az összesítési állapot nincs elvetve. A céltábla minden eseményindítóval újra van írva. |
Vízjelek és kimenet stream-stream illesztésekhez
A több stream közötti illesztések csak a hozzáfűzési módot támogatják, és a megfeleltetett rekordok minden felderített kötegben meg vannak írva. Belső illesztések esetén a Databricks azt javasolja, hogy minden streamelési adatforráshoz állítson be vízjel küszöbértéket. Ez lehetővé teszi az állapotinformációk elvetét a régi rekordok esetében. Vízjelek nélkül a strukturált streamelés az illesztés mindkét oldaláról kísérli meg összekapcsolni az összes kulcsot az egyes triggerekkel.
A strukturált streamelés speciális szemantikával támogatja a külső illesztéseket. A külső illesztések esetében kötelező a vízjelezés, mivel ez azt jelzi, hogy mikor kell null értékkel írni egy kulcsot, miután az nem egyezik. Vegye figyelembe, hogy bár a külső illesztések hasznosak lehetnek olyan rekordok rögzítéséhez, amelyek nem egyeznek az adatfeldolgozás során, mivel az illesztések csak hozzáfűzési műveletként írnak a táblákba, a hiányzó adatok csak a késési küszöbérték lejárta után lesznek rögzítve.
Késői adatküszöb szabályozása több vízjelszabályzattal a strukturált streamelésben
Ha több strukturált streambemeneti bemenetet használ, több vízjelet is beállíthat a késői adatok tűréshatárainak szabályozásához. A vízjelek konfigurálása lehetővé teszi az állapotinformációk szabályozását, és hatással van a késésre.
A streamelési lekérdezések több bemeneti adatfolyamot is tartalmazhatnak, amelyek egyesítve vagy egyesítve vannak. Az egyes bemeneti adatfolyamok eltérő küszöbértéket tartalmazhatnak a késői adatokhoz, amelyeket az állapotalapú műveletekhez el kell viselni. Adja meg ezeket a küszöbértékeket az egyes bemeneti adatfolyamok használatával withWatermarks("eventTime", delay)
. Az alábbiakban egy példa lekérdezést láthat stream-stream illesztésekkel.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
A lekérdezés futtatása során a strukturált stream egyenként nyomon követi az egyes bemeneti streamekben látható maximális eseményidőt, kiszámítja a vízjeleket a megfelelő késleltetés alapján, és egyetlen globális vízjelet választ ki az állapotalapú műveletekhez. Alapértelmezés szerint a minimumot a rendszer globális vízjelként választja ki, mivel biztosítja, hogy véletlenül ne essen adat túl későre, ha az egyik stream a többi mögött marad (például az egyik stream leállítja az adatok fogadását a felsőbb rétegbeli hibák miatt). Más szóval a globális vízjel biztonságosan mozog a leglassabb stream ütemében, és a lekérdezés kimenete ennek megfelelően késik.
Ha gyorsabb eredményeket szeretne elérni, beállíthatja a több vízjeles szabályzatot, hogy globális vízjelként a maximális értéket válassza ki az SQL-konfiguráció spark.sql.streaming.multipleWatermarkPolicy
max
beállításával (alapértelmezés szerint).min
Ez lehetővé teszi, hogy a globális vízjel a leggyorsabb stream ütemében mozogjon. Ez a konfiguráció azonban elveti az adatokat a leglassabb streamekből. Emiatt a Databricks azt javasolja, hogy ezt a konfigurációt megfontoltan használja.
Duplikált elemek elvetése a vízjelen belül
A Databricks Runtime 13.3 LTS és újabb verziókban egyedi azonosító használatával deduplikálhatja a vízjel küszöbértékén belüli rekordokat.
A strukturált streamelés pontosan egyszeri feldolgozási garanciákat biztosít, de nem deduplikálja automatikusan a rekordokat az adatforrásokból. Bármely megadott mező rekordjainak deduplikálására használható dropDuplicatesWithinWatermark
, így akkor is eltávolíthatja a duplikált elemeket egy streamből, ha egyes mezők eltérnek (például az esemény ideje vagy az érkezési idő).
A megadott vízjelbe érkező ismétlődő rekordok garantáltan elvesznek. Ez a garancia csak egy irányban szigorú, és a megadott küszöbértéken kívülre érkező duplikált rekordok is elvethetők. Az összes ismétlődés eltávolításához meg kell adnia a vízjel késleltetési küszöbértékét a duplikált események közötti maximális időbélyeg-különbségeknél.
A metódus használatához dropDuplicatesWithinWatermark
meg kell adnia egy vízjelet, ahogyan az alábbi példában is látható:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])