Megosztás a következőn keresztül:


A foreachBatch használata tetszőleges adatgyűjtőkbe való íráshoz

Ez a cikk a strukturált streamelés használatával foreachBatch ismerteti a streamlekérdezés kimenetének írását olyan adatforrásokhoz, amelyek nem rendelkeznek meglévő streamelt fogadóval.

A kódminta streamingDF.writeStream.foreachBatch(...) lehetővé teszi kötegelt függvények alkalmazását a streamelési lekérdezés minden mikro kötegének kimeneti adataira. A függvények két paraméterrel foreachBatch használhatók:

  • Olyan DataFrame, amely egy mikro köteg kimeneti adatait tartalmazza.
  • A mikro köteg egyedi azonosítója.

A Delta Lake-egyesítési műveleteket a strukturált streamelésben kell használnia foreachBatch . Lásd: Upsert from streaming lekérdezések foreachBatch használatával.

További DataFrame-műveletek alkalmazása

Számos DataFrame- és adathalmaz-művelet nem támogatott a streamelési DataFrame-ekben, mert a Spark ezekben az esetekben nem támogatja a növekményes tervek generálásának támogatását. Ezen foreachBatch() műveletek némelyikét alkalmazhatja az egyes mikroköteg-kimenetekre. Használhatja például foreachBath() az SQL-műveletet MERGE INTO a streamelési aggregációk kimenetének frissítési módban egy Delta-táblába való írásához. További részletekért tekintse meg a MERGE INTO című témakört.

Fontos

  • foreachBatch() csak legalább egyszer írható garanciát biztosít. A függvényhez megadott módon azonban deduplikálhatja batchId a kimenetet, és pontosan egyszeri garanciát kaphat. Mindkét esetben meg kell indokolnia a végpontok közötti szemantikát.
  • foreachBatch() nem működik a folyamatos feldolgozási móddal , mivel alapvetően egy streamelési lekérdezés mikroköteges végrehajtására támaszkodik. Ha folyamatos módban ír adatokat, használja foreach() helyette.

Egy üres adatkeret meghívható, foreachBatch() és a felhasználói kódnak rugalmasnak kell lennie a megfelelő működéshez. Íme egy példa:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

A Databricks Runtime 14.0 viselkedési változásai foreachBatch

A Databricks Runtime 14.0-s és újabb verziója a megosztott hozzáférési móddal forEachBatch konfigurált számításon külön elkülönített Python-folyamatban fut az Apache Sparkon a REPL-környezet helyett. Szerializálja és leküldi a Sparkba, és a munkamenet időtartama alatt nem fér hozzá a globális spark objektumokhoz.

Az összes többi számítási konfigurációban ugyanabban a Python REPL-ben fut, foreachBatch amely a kód többi részét futtatja. Ennek eredményeképpen a függvény nincs szerializálva.

Ha a Databricks Runtime 14.0-s vagy újabb verzióját használja a megosztott hozzáférési móddal konfigurált számításon, a helyi DataFrame-hez hatókörrel rendelkező változót kell használnia sparkSession a Pythonban való használat foreachBatch során, ahogyan az alábbi kód példában is látható:

def example_function(df, batch_id):
  df.sparkSession.sql("<query>")

A következő viselkedésváltozások érvényesek:

  • A függvényen belülről nem férhet hozzá globális Python-változókhoz.
  • print() parancsok írnak kimenetet az illesztőprogram-naplókba.
  • A függvényben hivatkozott fájloknak, moduloknak és objektumoknak szerializálhatónak kell lenniük, és elérhetőnek kell lenniük a Sparkban.

Meglévő kötegelt adatforrások újrafelhasználása

A használatával foreachBatch()meglévő kötegelt adatírókat használhat olyan adatgyűjtőkhöz, amelyek nem rendelkeznek strukturált streamelési támogatással. Íme néhány példa:

Számos más kötegelt adatforrás is használható a forrásból foreachBatch(). Lásd az adatforrások Csatlakozás.

Írás több helyre

Ha egy streamelési lekérdezés kimenetét több helyre kell írnia, a Databricks több strukturált streamíró használatát javasolja a legjobb párhuzamosság és átviteli sebesség érdekében.

Ha foreachBatch több fogadóba ír, szerializálja a streamelési írások végrehajtását, ami növelheti az egyes mikrokötegek késését.

Ha foreachBatch több Delta-táblába is ír, tekintse meg az Idempotent tábla írásait a ForeachBatchben.