Megosztás a következőn keresztül:


A foreachBatch használata tetszőleges adatgyűjtőkbe való íráshoz

Ez a cikk a strukturált streamelés használatával foreachBatch ismerteti a streamlekérdezés kimenetének írását olyan adatforrásokhoz, amelyek nem rendelkeznek meglévő streamelt fogadóval.

A kódminta streamingDF.writeStream.foreachBatch(...) lehetővé teszi kötegelt függvények alkalmazását a streamelési lekérdezés minden mikro kötegének kimeneti adataira. A függvények két paraméterrel foreachBatch használhatók:

  • Olyan DataFrame, amely egy mikro köteg kimeneti adatait tartalmazza.
  • A mikro köteg egyedi azonosítója.

A Delta Lake-egyesítési műveleteket a strukturált streamelésben kell használnia foreachBatch . Lásd: Upsert from streaming lekérdezések foreachBatch használatával.

További DataFrame-műveletek alkalmazása

Számos DataFrame- és adathalmaz-művelet nem támogatott a streamelési DataFrame-ekben, mert a Spark ezekben az esetekben nem támogatja a növekményes tervek generálásának támogatását. Ezen foreachBatch() műveletek némelyikét alkalmazhatja az egyes mikroköteg-kimenetekre. Használhatja például foreachBath() az SQL-műveletet MERGE INTO a streamelési aggregációk kimenetének frissítési módban egy Delta-táblába való írásához. További részletekért tekintse meg a MERGE INTO című témakört.

Fontos

  • foreachBatch() csak legalább egyszer írható garanciát biztosít. A függvényhez megadott módon azonban deduplikálhatja batchId a kimenetet, és pontosan egyszeri garanciát kaphat. Mindkét esetben meg kell indokolnia a végpontok közötti szemantikát.
  • foreachBatch() nem működik a folyamatos feldolgozási móddal , mivel alapvetően egy streamelési lekérdezés mikroköteges végrehajtására támaszkodik. Ha folyamatos módban ír adatokat, használja foreach() helyette.

Egy üres adatkeret meghívható, foreachBatch() és a felhasználói kódnak rugalmasnak kell lennie a megfelelő működéshez. Íme egy példa:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

A Databricks Runtime 14.0 viselkedési változásai foreachBatch

A Databricks Runtime 14.0-s és újabb verziókban a megosztott hozzáférési móddal konfigurált számításon a következő viselkedésváltozások érvényesek:

  • print() parancsok írnak kimenetet az illesztőprogram-naplókba.
  • A függvényen belüli almodul nem érhető el dbutils.widgets .
  • A függvényben hivatkozott fájloknak, moduloknak és objektumoknak szerializálhatónak kell lenniük, és elérhetőnek kell lenniük a Sparkban.

Meglévő kötegelt adatforrások újrafelhasználása

A használatával foreachBatch()meglévő kötegelt adatírókat használhat olyan adatgyűjtőkhöz, amelyek nem rendelkeznek strukturált streamelési támogatással. Íme néhány példa:

Számos más kötegelt adatforrás is használható a forrásból foreachBatch(). Lásd: Csatlakozás adatforrásokhoz.

Írás több helyre

Ha egy streamelési lekérdezés kimenetét több helyre kell írnia, a Databricks több strukturált streamíró használatát javasolja a legjobb párhuzamosság és átviteli sebesség érdekében.

Ha foreachBatch több fogadóba ír, szerializálja a streamelési írások végrehajtását, ami növelheti az egyes mikrokötegek késését.

Ha foreachBatch több Delta-táblába is ír, tekintse meg az Idempotent tábla írásait a ForeachBatchben.