Vízjelek alkalmazása az adatfeldolgozási küszöbértékek szabályozásához

Ez a cikk bemutatja a vízjelezés alapvető fogalmait, és javaslatokat nyújt a vízjelek általános állapotalapú streamelési műveletekben való használatára. Vízjeleket kell alkalmaznia az állapotalapú streamelési műveletekre, hogy elkerülje az állapotban tárolt adatok végtelen bővítését, ami memóriaproblémákat okozhat, és növelheti a feldolgozási késéseket a hosszan futó streamelési műveletek során.

Mi az a vízjel?

A strukturált streamelés vízjelekkel szabályozza a küszöbértéket, hogy mennyi ideig dolgozza fel a frissítéseket egy adott állapotentitáshoz. Az állapotentitások gyakori példái a következők:

  • Összesítések egy időablakban.
  • Egyedi kulcsok két stream közötti illesztésben.

Vízjel deklarálásakor egy időbélyegmezőt és egy vízjel küszöbértéket kell megadnia egy streamelési DataFrame-en. Az új adatok érkezésekor az állapotkezelő nyomon követi a megadott mező legutóbbi időbélyegét, és feldolgozza a késési küszöbértéken belüli összes rekordot.

Az alábbi példa egy 10 perces vízjel küszöbértéket alkalmaz egy ablakos számra:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

Ebben a példában:

  • Az event_time oszlop egy 10 perces vízjel és egy 5 perces csúszásablak meghatározására szolgál.
  • A rendszer minden megfigyelt, nem átfedésben lévő 5 perces ablakhoz id számokat gyűjt.
  • Az állapotinformációk az egyes darabszámokra vonatkozóan mindaddig megmaradnak, amíg az ablak vége 10 perccel régebbi, mint a legutóbbi megfigyelt event_time.

Fontos

A vízjel küszöbértékei garantálják, hogy a megadott küszöbértéken belül érkező rekordok feldolgozása a megadott lekérdezés szemantikájának megfelelően történik. Előfordulhat, hogy a megadott küszöbértéken kívülre érkező késői rekordok lekérdezési metrikákkal dolgozhatók fel, de ez nem garantált.

Hogyan befolyásolják a vízjelek a feldolgozási időt és az átviteli sebességet?

A vízjelek a kimeneti módokkal együttműködve szabályozják az adatok fogadóba írásakor. Mivel a vízjelek csökkentik a feldolgozandó állapotinformációk teljes mennyiségét, a vízjelek hatékony használata elengedhetetlen a hatékony állapotalapú streamelési átviteli sebességhez.

Feljegyzés

Nem minden kimeneti mód támogatott minden állapotalapú művelethez.

Vízjelek és kimeneti mód ablakos összesítésekhez

Az alábbi táblázat a vízjelet tartalmazó időbélyegen összesítő lekérdezések feldolgozását részletezi:

Kimeneti mód Működés
Hozzáfűzés A sorokat a rendszer a vízjel küszöbértékének lejárta után írja a céltáblába. A késési küszöbérték alapján minden írás késik. A régi összesítési állapot a küszöbérték túllépése után el lesz ejtve.
Frissítés A sorok az eredmények kiszámításakor a céltáblába vannak írva, és új adatok érkezésekor frissíthetők és felülírhatók. A régi összesítési állapot a küszöbérték túllépése után el lesz ejtve.
Kész Az összesítési állapot nincs elvetve. A céltábla minden eseményindítóval újra van írva.

Vízjelek és kimenet stream-stream illesztésekhez

A több stream közötti illesztések csak a hozzáfűzési módot támogatják, és a megfeleltetett rekordok minden felderített kötegben meg vannak írva. Belső illesztések esetén a Databricks azt javasolja, hogy minden streamelési adatforráshoz állítson be vízjel küszöbértéket. Ez lehetővé teszi az állapotinformációk elvetét a régi rekordok esetében. Vízjelek nélkül a strukturált streamelés az illesztés mindkét oldaláról kísérli meg összekapcsolni az összes kulcsot az egyes triggerekkel.

A strukturált streamelés speciális szemantikával támogatja a külső illesztéseket. A külső illesztések esetében kötelező a vízjelezés, mivel ez azt jelzi, hogy mikor kell null értékkel írni egy kulcsot, miután az nem egyezik. Vegye figyelembe, hogy bár a külső illesztések hasznosak lehetnek olyan rekordok rögzítéséhez, amelyek nem egyeznek az adatfeldolgozás során, mivel az illesztések csak hozzáfűzési műveletként írnak a táblákba, a hiányzó adatok csak a késési küszöbérték lejárta után lesznek rögzítve.

Késői adatküszöb szabályozása több vízjelszabályzattal a strukturált streamelésben

Ha több strukturált streambemeneti bemenetet használ, több vízjelet is beállíthat a késői adatok tűréshatárainak szabályozásához. A vízjelek konfigurálása lehetővé teszi az állapotinformációk szabályozását, és hatással van a késésre.

A streamelési lekérdezések több bemeneti adatfolyamot is tartalmazhatnak, amelyek egyesítve vagy egyesítve vannak. Az egyes bemeneti adatfolyamok eltérő küszöbértéket tartalmazhatnak a késői adatokhoz, amelyeket az állapotalapú műveletekhez el kell viselni. Adja meg ezeket a küszöbértékeket az egyes bemeneti adatfolyamok használatával withWatermarks("eventTime", delay) . Az alábbiakban egy példa lekérdezést láthat stream-stream illesztésekkel.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

A lekérdezés futtatása során a strukturált stream egyenként nyomon követi az egyes bemeneti streamekben látható maximális eseményidőt, kiszámítja a vízjeleket a megfelelő késleltetés alapján, és egyetlen globális vízjelet választ ki az állapotalapú műveletekhez. Alapértelmezés szerint a minimumot a rendszer globális vízjelként választja ki, mivel biztosítja, hogy véletlenül ne essen adat túl későre, ha az egyik stream a többi mögött marad (például az egyik stream leállítja az adatok fogadását a felsőbb rétegbeli hibák miatt). Más szóval a globális vízjel biztonságosan mozog a leglassabb stream ütemében, és a lekérdezés kimenete ennek megfelelően késik.

Ha gyorsabb eredményeket szeretne elérni, beállíthatja a több vízjeles szabályzatot, hogy globális vízjelként a maximális értéket válassza ki az SQL-konfiguráció spark.sql.streaming.multipleWatermarkPolicymax beállításával (alapértelmezés szerint).min Ez lehetővé teszi, hogy a globális vízjel a leggyorsabb stream ütemében mozogjon. Ez a konfiguráció azonban elveti az adatokat a leglassabb streamekből. Emiatt a Databricks azt javasolja, hogy ezt a konfigurációt megfontoltan használja.

Duplikált elemek elvetése a vízjelen belül

A Databricks Runtime 13.3 LTS és újabb verziókban egyedi azonosító használatával deduplikálhatja a vízjel küszöbértékén belüli rekordokat.

A strukturált streamelés pontosan egyszeri feldolgozási garanciákat biztosít, de nem deduplikálja automatikusan a rekordokat az adatforrásokból. Bármely megadott mező rekordjainak deduplikálására használható dropDuplicatesWithinWatermark , így akkor is eltávolíthatja a duplikált elemeket egy streamből, ha egyes mezők eltérnek (például az esemény ideje vagy az érkezési idő).

A megadott vízjelbe érkező ismétlődő rekordok garantáltan elvesznek. Ez a garancia csak egy irányban szigorú, és a megadott küszöbértéken kívülre érkező duplikált rekordok is elvethetők. Az összes ismétlődés eltávolításához meg kell adnia a vízjel késleltetési küszöbértékét a duplikált események közötti maximális időbélyeg-különbségeknél.

A metódus használatához dropDuplicatesWithinWatermark meg kell adnia egy vízjelet, ahogyan az alábbi példában is látható:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")