Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
A valós idejű mód lehetővé teszi az ultra-alacsony késleltetésű streamelést, amely akár öt ezredmásodpercnyi végpontok közötti késést is biztosít, így ideális az olyan üzemeltetési számítási feladatokhoz, mint a csalások észlelése és a valós idejű személyre szabás. Ez az oktatóanyag végigvezeti az első valós idejű streamelési lekérdezés egyszerű példán keresztüli beállításán.
A valós idejű módról, a használatuk időpontjáról és a támogatott funkciókról a strukturált streamelés valós idejű módjáról olvashat. A konfigurációs követelményekről a valós idejű mód beállítása című témakörben olvashat.
Követelmények
Mielőtt hozzákezdene, győződjön meg arról, hogy rendelkezik engedéllyel egy olyan klasszikus számítási fürt létrehozásához, amely a Valós idejű beállítás módban megadott konfigurációt használja. Másik lehetőségként lépjen kapcsolatba a munkaterület rendszergazdájával, hogy ő hozzon létre egy valós idejű fürtöt önnek.
1. lépés: Jegyzetfüzet létrehozása
A jegyzetfüzetek interaktív környezetet biztosítanak a streamelési lekérdezések fejlesztéséhez és teszteléséhez. Ezzel a jegyzetfüzetel megírhatja a valós idejű lekérdezést, és folyamatosan láthatja az eredmények frissítését.
Jegyzetfüzet létrehozása:
- Kattintson az Oldalsáv Új gombjára, majd a
Jegyzetfüzet.
- A számítási legördülő menüben válassza ki a valós idejű módú fürtöt.
- Alapértelmezett nyelvként válassza a Python vagy Scala lehetőséget.
2. lépés: Valós idejű módú lekérdezés futtatása
Másolja és illessze be a következő kódot egy jegyzetfüzetcellába, és futtassa. Ez a példa egy sebességforrást használ, amely meghatározott sebességgel hoz létre sorokat, és valós időben jeleníti meg az eredményeket.
Megjegyzés:
Az display eseményindítóval rendelkező realTime függvény a Databricks Runtime 17.1-ben és újabb verziókban érhető el.
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())
A kód futtatása után megjelenik egy tábla, amely valós időben frissül az új sorok létrehozásakor. A táblázat egy oszlopot timestamp és egy oszlopot value jelenít meg, amely az egyes sorokkal együtt növekszik.
A kód értelmezése
A fenti kód egy valós idejű streamelési lekérdezés alapvető összetevőit mutatja be. A következő táblázatok ismertetik a fő paramétereket és az általuk szabályozható tényezőket:
Python
| Paraméter | Leírás |
|---|---|
format("rate") |
A sebességforrást használja, egy beépített forrást, amely konfigurálható sebességgel hoz létre sorokat. Ez hasznos külső függőségek nélküli teszteléshez. |
numPartitions |
Beállítja a létrehozott adatok partícióinak számát. |
rowsPerSecond |
Meghatározza, hogy másodpercenként hány sor jön létre. |
realTime="5 minutes" |
Lehetővé teszi a valós idejű módot. Az intervallum azt határozza meg, hogy a lekérdezési ellenőrzőpontok milyen gyakran haladnak. A hosszabb időközök ritkábban ellenőrzik az ellenőrzőpontokat, de a hibák után hosszabb helyreállítási időt is jelenthetnek. |
outputMode="update" |
A valós idejű mód frissítési kimeneti módot igényel. |
Scala
| Paraméter | Leírás |
|---|---|
format("rate") |
A sebességforrást használja, egy beépített forrást, amely konfigurálható sebességgel hoz létre sorokat. Ez hasznos külső függőségek nélküli teszteléshez. |
numPartitions |
Beállítja a létrehozott adatok partícióinak számát. |
rowsPerSecond |
Meghatározza, hogy másodpercenként hány sor jön létre. |
Trigger.RealTime() |
Engedélyezi a valós idejű módot az alapértelmezett ellenőrzőpont-intervallummal. Megadhat például Trigger.RealTime("5 minutes")egy intervallumot is. |
OutputMode.Update() |
A valós idejű mód frissítési kimeneti módot igényel. |
3. lépés: Eredmények ellenőrzése
A lekérdezés futtatásakor a display függvény létrehoz egy táblát, amely valós időben frissül, amikor a sebességforrás új sorokat hoz létre. Minden sor a következőket tartalmazza:
- Időbélyeg arra az időre, amikor a sort a rátaforrás hozta létre.
- Egy monoton módon növekvő számláló, amely minden új sornál növekszik.
A tábla minimális késéssel folyamatosan frissül, és bemutatja, hogy a valós idejű mód hogyan dolgozza fel az adatokat, amint elérhetővé válik. Ez a valós idejű mód alapvető előnye – az adatok azonnali megtekintésének és kezelésének képessége a kötegelt feldolgozásra való várakozás helyett.
További erőforrások
Most, hogy futtatta az első valós idejű lekérdezést, fedezze fel ezeket az erőforrásokat éles streamelési alkalmazások létrehozásához a Kafka, a Kinesis és más támogatott források használatával: