A foreachBatch használata tetszőleges adatgyűjtőkbe való íráshoz
Ez a cikk a strukturált streamelés használatával foreachBatch
ismerteti a streamlekérdezés kimenetének írását olyan adatforrásokhoz, amelyek nem rendelkeznek meglévő streamelt fogadóval.
A kódminta streamingDF.writeStream.foreachBatch(...)
lehetővé teszi kötegelt függvények alkalmazását a streamelési lekérdezés minden mikro kötegének kimeneti adataira. A függvények két paraméterrel foreachBatch
használhatók:
- Olyan DataFrame, amely egy mikro köteg kimeneti adatait tartalmazza.
- A mikro köteg egyedi azonosítója.
A Delta Lake-egyesítési műveleteket a strukturált streamelésben kell használnia foreachBatch
. Lásd: Upsert from streaming lekérdezések foreachBatch használatával.
További DataFrame-műveletek alkalmazása
Számos DataFrame- és adathalmaz-művelet nem támogatott a streamelési DataFrame-ekben, mert a Spark ezekben az esetekben nem támogatja a növekményes tervek generálásának támogatását. Ezen foreachBatch()
műveletek némelyikét alkalmazhatja az egyes mikroköteg-kimenetekre. Használhatja például foreachBath()
az SQL-műveletet MERGE INTO
a streamelési aggregációk kimenetének frissítési módban egy Delta-táblába való írásához. További részletekért tekintse meg a MERGE INTO című témakört.
Fontos
foreachBatch()
csak legalább egyszer írható garanciát biztosít. A függvényhez megadott módon azonban deduplikálhatjabatchId
a kimenetet, és pontosan egyszeri garanciát kaphat. Mindkét esetben meg kell indokolnia a végpontok közötti szemantikát.foreachBatch()
nem működik a folyamatos feldolgozási móddal , mivel alapvetően egy streamelési lekérdezés mikroköteges végrehajtására támaszkodik. Ha folyamatos módban ír adatokat, használjaforeach()
helyette.
Egy üres adatkeret meghívható, foreachBatch()
és a felhasználói kódnak rugalmasnak kell lennie a megfelelő működéshez. Íme egy példa:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
A Databricks Runtime 14.0 viselkedési változásai foreachBatch
A Databricks Runtime 14.0-s és újabb verziókban a megosztott hozzáférési móddal konfigurált számításon a következő viselkedésváltozások érvényesek:
print()
parancsok írnak kimenetet az illesztőprogram-naplókba.- A függvényen belüli almodul nem érhető el
dbutils.widgets
. - A függvényben hivatkozott fájloknak, moduloknak és objektumoknak szerializálhatónak kell lenniük, és elérhetőnek kell lenniük a Sparkban.
Meglévő kötegelt adatforrások újrafelhasználása
A használatával foreachBatch()
meglévő kötegelt adatírókat használhat olyan adatgyűjtőkhöz, amelyek nem rendelkeznek strukturált streamelési támogatással. Íme néhány példa:
Számos más kötegelt adatforrás is használható a forrásból foreachBatch()
. Lásd: Csatlakozás adatforrásokhoz.
Írás több helyre
Ha egy streamelési lekérdezés kimenetét több helyre kell írnia, a Databricks több strukturált streamíró használatát javasolja a legjobb párhuzamosság és átviteli sebesség érdekében.
Ha foreachBatch
több fogadóba ír, szerializálja a streamelési írások végrehajtását, ami növelheti az egyes mikrokötegek késését.
Ha foreachBatch
több Delta-táblába is ír, tekintse meg az Idempotent tábla írásait a ForeachBatchben.