Oktatóanyag: Valós idejű streamelési számítási feladat futtatása

A valós idejű mód lehetővé teszi az ultra-alacsony késleltetésű streamelést, amely akár öt ezredmásodpercnyi végpontok közötti késést is biztosít, így ideális az olyan üzemeltetési számítási feladatokhoz, mint a csalások észlelése és a valós idejű személyre szabás. Ez az oktatóanyag végigvezeti az első valós idejű streamelési lekérdezés egyszerű példán keresztüli beállításán.

A valós idejű módról, a használatuk időpontjáról és a támogatott funkciókról a strukturált streamelés valós idejű módjáról olvashat. A konfigurációs követelményekről a valós idejű mód beállítása című témakörben olvashat.

Követelmények

Mielőtt hozzákezdene, győződjön meg arról, hogy rendelkezik engedéllyel egy olyan klasszikus számítási fürt létrehozásához, amely a Valós idejű beállítás módban megadott konfigurációt használja. Másik lehetőségként lépjen kapcsolatba a munkaterület rendszergazdájával, hogy ő hozzon létre egy valós idejű fürtöt önnek.

1. lépés: Jegyzetfüzet létrehozása

A jegyzetfüzetek interaktív környezetet biztosítanak a streamelési lekérdezések fejlesztéséhez és teszteléséhez. Ezzel a jegyzetfüzetel megírhatja a valós idejű lekérdezést, és folyamatosan láthatja az eredmények frissítését.

Jegyzetfüzet létrehozása:

  1. Kattintson az Oldalsáv Új gombjára, majd a Jegyzetfüzet ikonra.Jegyzetfüzet.
  2. A számítási legördülő menüben válassza ki a valós idejű módú fürtöt.
  3. Alapértelmezett nyelvként válassza a Python vagy Scala lehetőséget.

2. lépés: Valós idejű módú lekérdezés futtatása

Másolja és illessze be a következő kódot egy jegyzetfüzetcellába, és futtassa. Ez a példa egy sebességforrást használ, amely meghatározott sebességgel hoz létre sorokat, és valós időben jeleníti meg az eredményeket.

Megjegyzés:

Az display eseményindítóval rendelkező realTime függvény a Databricks Runtime 17.1-ben és újabb verziókban érhető el.

Python

inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")

Scala

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())

A kód futtatása után megjelenik egy tábla, amely valós időben frissül az új sorok létrehozásakor. A táblázat egy oszlopot timestamp és egy oszlopot value jelenít meg, amely az egyes sorokkal együtt növekszik.

A kód értelmezése

A fenti kód egy valós idejű streamelési lekérdezés alapvető összetevőit mutatja be. A következő táblázatok ismertetik a fő paramétereket és az általuk szabályozható tényezőket:

Python

Paraméter Leírás
format("rate") A sebességforrást használja, egy beépített forrást, amely konfigurálható sebességgel hoz létre sorokat. Ez hasznos külső függőségek nélküli teszteléshez.
numPartitions Beállítja a létrehozott adatok partícióinak számát.
rowsPerSecond Meghatározza, hogy másodpercenként hány sor jön létre.
realTime="5 minutes" Lehetővé teszi a valós idejű módot. Az intervallum azt határozza meg, hogy a lekérdezési ellenőrzőpontok milyen gyakran haladnak. A hosszabb időközök ritkábban ellenőrzik az ellenőrzőpontokat, de a hibák után hosszabb helyreállítási időt is jelenthetnek.
outputMode="update" A valós idejű mód frissítési kimeneti módot igényel.

Scala

Paraméter Leírás
format("rate") A sebességforrást használja, egy beépített forrást, amely konfigurálható sebességgel hoz létre sorokat. Ez hasznos külső függőségek nélküli teszteléshez.
numPartitions Beállítja a létrehozott adatok partícióinak számát.
rowsPerSecond Meghatározza, hogy másodpercenként hány sor jön létre.
Trigger.RealTime() Engedélyezi a valós idejű módot az alapértelmezett ellenőrzőpont-intervallummal. Megadhat például Trigger.RealTime("5 minutes")egy intervallumot is.
OutputMode.Update() A valós idejű mód frissítési kimeneti módot igényel.

3. lépés: Eredmények ellenőrzése

A lekérdezés futtatásakor a display függvény létrehoz egy táblát, amely valós időben frissül, amikor a sebességforrás új sorokat hoz létre. Minden sor a következőket tartalmazza:

  • Időbélyeg arra az időre, amikor a sort a rátaforrás hozta létre.
  • Egy monoton módon növekvő számláló, amely minden új sornál növekszik.

A tábla minimális késéssel folyamatosan frissül, és bemutatja, hogy a valós idejű mód hogyan dolgozza fel az adatokat, amint elérhetővé válik. Ez a valós idejű mód alapvető előnye – az adatok azonnali megtekintésének és kezelésének képessége a kötegelt feldolgozásra való várakozás helyett.

További erőforrások

Most, hogy futtatta az első valós idejű lekérdezést, fedezze fel ezeket az erőforrásokat éles streamelési alkalmazások létrehozásához a Kafka, a Kinesis és más támogatott források használatával: