Megosztás a következőn keresztül:


Feliratkozás a Google Pub/Sub szolgáltatásra

Az Azure Databricks egy beépített összekötőt biztosít a Google Pub/Sub előfizetéséhez a Databricks Runtime 13.3 LTS-ben és újabb verziókban. Ez az összekötő pontosan egyszeri feldolgozási szemantikát biztosít az előfizető rekordjaihoz.

Feljegyzés

A Pub/Sub ismétlődő rekordokat tehet közzé, és előfordulhat, hogy a rekordok sorrenden kívül érkeznek az előfizetőhöz. Meg kell írnia az Azure Databricks-kódot a duplikált és a rendelésen kívüli rekordok kezeléséhez.

Példa szintaxisra

Az alábbi kódpéldából megismerheti a Strukturált streamelési olvasás konfigurálásának alapszintaxisát a Pub/Sub szolgáltatásból:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

További konfigurációs beállításokért tekintse meg a Pub/Sub streaming olvasási beállításainak konfigurálását.

Pubhoz/alhoz való hozzáférés konfigurálása

A Databricks a titkos kódok használatát javasolja az engedélyezési lehetőségek megadásakor. A kapcsolat engedélyezéséhez a következő lehetőségek szükségesek:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Az alábbi táblázat a konfigurált hitelesítő adatokhoz szükséges szerepköröket ismerteti:

Szerepkörök Kötelező vagy választható A használat menete
roles/pubsub.viewer vagy roles/viewer Kötelező Ellenőrizze, hogy létezik-e előfizetés, és szerezze be az előfizetést
roles/pubsub.subscriber Kötelező Adatok lekérése előfizetésből
roles/pubsub.editor vagy roles/editor Választható Engedélyezi az előfizetés létrehozását, ha az nem létezik, és lehetővé teszi az deleteSubscriptionOnStreamStop előfizetések törlését a stream leállításakor

Pub/Alséma

A stream sémája megegyezik a Pub/Sub szolgáltatásból lekért rekordokkal, az alábbi táblázatban leírtak szerint:

Mező Típus
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

A Pub/Sub streaming olvasási beállításainak konfigurálása

Az alábbi táblázat a Pub/Sub esetében támogatott beállításokat ismerteti. Minden beállítás konfigurálva van egy strukturált streamelési olvasás részeként szintaxis használatával .option("<optionName>", "<optionValue>") .

Feljegyzés

Egyes pub-/alkonfigurációs beállítások a lekérések fogalmát használják a mikrokötegek helyett. Ez a belső megvalósítás részleteit tükrözi, és a beállítások a többi strukturált stream-összekötőben lévő társrollokhoz hasonlóan működnek, kivéve, hogy a rekordok lekérése és feldolgozása történik.

Lehetőség Alapértelmezett érték Leírás
numFetchPartitions Állítsa be a stream inicializálása során jelen lévő végrehajtók számának felét. Az előfizetésből rekordokat lekérő párhuzamos Spark-feladatok száma.
deleteSubscriptionOnStreamStop false Ha truea streamnek átadott előfizetés a streamelési feladat befejeződésekor törlődik.
maxBytesPerTrigger Nincs Az egyes aktivált mikrokötegek során feldolgozandó köteg méretének korlátja.
maxRecordsPerFetch 1000 A rekordok feldolgozása előtt lekérendő rekordok száma tevékenységenként.
maxFetchPeriod 10 másodperc Az egyes tevékenységek beolvasásának időtartama a rekordok feldolgozása előtt. A Databricks az alapértelmezett érték használatát javasolja.

Növekményes kötegfeldolgozás szemantikája Pub/Sub esetén

Egy növekményes köteget használhat Trigger.AvailableNow a Pub/Alforrások elérhető rekordjainak felhasználásához.

Az Azure Databricks rögzíti az időbélyeget, amikor elkezd olvasni a Trigger.AvailableNow beállítással. A köteg által feldolgozott rekordok tartalmazzák az összes korábban lekért adatot és az újonnan közzétett rekordokat, és a rögzített stream kezdési időbélyegénél kisebb időbélyeggel.

Lásd a növekményes kötegfeldolgozás konfigurálását.

Streamelési metrikák monitorozása

A strukturált streamelési folyamat mérőszámai a lehívott és a feldolgozásra kész rekordok számát, a lekért és a feldolgozásra kész rekordok méretét, valamint a stream kezdete óta látott ismétlődések számát jelentik. Az alábbi példák a következő metrikákra mutatnak be példát:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Korlátozások

A spekulatív végrehajtás (spark.speculation) nem támogatott a Pub/Sub esetében.