A foreachBatch használata tetszőleges adatgyűjtőkbe való íráshoz
Ez a cikk a strukturált streamelés használatával foreachBatch
ismerteti a streamlekérdezés kimenetének írását olyan adatforrásokhoz, amelyek nem rendelkeznek meglévő streamelt fogadóval.
A kódminta streamingDF.writeStream.foreachBatch(...)
lehetővé teszi kötegelt függvények alkalmazását a streamelési lekérdezés minden mikro kötegének kimeneti adataira. A függvények két paraméterrel foreachBatch
használhatók:
- Olyan DataFrame, amely egy mikro köteg kimeneti adatait tartalmazza.
- A mikro köteg egyedi azonosítója.
A Delta Lake-egyesítési műveleteket a strukturált streamelésben kell használnia foreachBatch
. Lásd: Upsert from streaming lekérdezések foreachBatch használatával.
További DataFrame-műveletek alkalmazása
Számos DataFrame- és adathalmaz-művelet nem támogatott a streamelési DataFrame-ekben, mert a Spark ezekben az esetekben nem támogatja a növekményes tervek generálásának támogatását. Ezen foreachBatch()
műveletek némelyikét alkalmazhatja az egyes mikroköteg-kimenetekre. Használhatja például foreachBath()
az SQL-műveletet MERGE INTO
a streamelési aggregációk kimenetének frissítési módban egy Delta-táblába való írásához. További részletekért tekintse meg a MERGE INTO című témakört.
Fontos
foreachBatch()
csak legalább egyszer írható garanciát biztosít. A függvényhez megadott módon azonban deduplikálhatjabatchId
a kimenetet, és pontosan egyszeri garanciát kaphat. Mindkét esetben meg kell indokolnia a végpontok közötti szemantikát.foreachBatch()
nem működik a folyamatos feldolgozási móddal , mivel alapvetően egy streamelési lekérdezés mikroköteges végrehajtására támaszkodik. Ha folyamatos módban ír adatokat, használjaforeach()
helyette.
Egy üres adatkeret meghívható, foreachBatch()
és a felhasználói kódnak rugalmasnak kell lennie a megfelelő működéshez. Íme egy példa:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
A Databricks Runtime 14.0 viselkedési változásai foreachBatch
A Databricks Runtime 14.0-s és újabb verziója a megosztott hozzáférési móddal forEachBatch
konfigurált számításon külön elkülönített Python-folyamatban fut az Apache Sparkon a REPL-környezet helyett. Szerializálja és leküldi a Sparkba, és a munkamenet időtartama alatt nem fér hozzá a globális spark
objektumokhoz.
Az összes többi számítási konfigurációban ugyanabban a Python REPL-ben fut, foreachBatch
amely a kód többi részét futtatja. Ennek eredményeképpen a függvény nincs szerializálva.
Ha a Databricks Runtime 14.0-s vagy újabb verzióját használja a megosztott hozzáférési móddal konfigurált számításon, a helyi DataFrame-hez hatókörrel rendelkező változót kell használnia sparkSession
a Pythonban való használat foreachBatch
során, ahogyan az alábbi kód példában is látható:
def example_function(df, batch_id):
df.sparkSession.sql("<query>")
A következő viselkedésváltozások érvényesek:
- A függvényen belülről nem férhet hozzá globális Python-változókhoz.
print()
parancsok írnak kimenetet az illesztőprogram-naplókba.- A függvényben hivatkozott fájloknak, moduloknak és objektumoknak szerializálhatónak kell lenniük, és elérhetőnek kell lenniük a Sparkban.
Meglévő kötegelt adatforrások újrafelhasználása
A használatával foreachBatch()
meglévő kötegelt adatírókat használhat olyan adatgyűjtőkhöz, amelyek nem rendelkeznek strukturált streamelési támogatással. Íme néhány példa:
Számos más kötegelt adatforrás is használható a forrásból foreachBatch()
. Lásd az adatforrások Csatlakozás.
Írás több helyre
Ha egy streamelési lekérdezés kimenetét több helyre kell írnia, a Databricks több strukturált streamíró használatát javasolja a legjobb párhuzamosság és átviteli sebesség érdekében.
Ha foreachBatch
több fogadóba ír, szerializálja a streamelési írások végrehajtását, ami növelheti az egyes mikrokötegek késését.
Ha foreachBatch
több Delta-táblába is ír, tekintse meg az Idempotent tábla írásait a ForeachBatchben.
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: