Dela via


Prenumerera på Google Pub/Sub

Azure Databricks tillhandahåller en inbyggd anslutningsapp för att prenumerera på Google Pub/Sub i Databricks Runtime 13.3 LTS och senare. Den här anslutningsappen tillhandahåller exakt en gång bearbetningssemantik för poster från prenumeranten.

Kommentar

Pub/sub kan publicera dubblettposter och poster kan komma till prenumeranten i ur ordning. Du bör skriva Azure Databricks-kod för att hantera dubbletter och out-of-order-poster.

Syntaxexempel

I följande kodexempel visas den grundläggande syntaxen för att konfigurera en strukturerad direktuppspelning från Pub/Sub:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Fler konfigurationsalternativ finns i Konfigurera alternativ för pub-/underströmningsläsning.

Konfigurera åtkomst till Pub/Sub

Databricks rekommenderar att du använder hemligheter när du tillhandahåller auktoriseringsalternativ. Följande alternativ krävs för att auktorisera en anslutning:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

I följande tabell beskrivs de roller som krävs för de konfigurerade autentiseringsuppgifterna:

Roller Obligatorisk eller valfri Hur den används
roles/pubsub.viewer eller roles/viewer Obligatoriskt Kontrollera om prenumerationen finns och hämta en prenumeration
roles/pubsub.subscriber Obligatoriskt Hämta data från en prenumeration
roles/pubsub.editor eller roles/editor Valfritt Gör det möjligt att skapa en prenumeration om det inte finns en prenumeration och gör det också möjligt att ta deleteSubscriptionOnStreamStop bort prenumerationer vid stream-avslutning

Pub-/underschema

Schemat för strömmen matchar de poster som hämtas från Pub/Sub enligt beskrivningen i följande tabell:

Fält Typ
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Konfigurera alternativ för pub-/underströmningsläsning

I följande tabell beskrivs de alternativ som stöds för Pub/Sub. Alla alternativ konfigureras som en del av en strukturerad strömningsläsning med hjälp av .option("<optionName>", "<optionValue>") syntax.

Kommentar

Vissa alternativ för pub-/underkonfiguration använder begreppet hämtningar i stället för mikrobatch. Detta återspeglar intern implementeringsinformation och alternativen fungerar på samma sätt som registreringar i andra anslutningsappar för strukturerad direktuppspelning, förutom att poster hämtas och sedan bearbetas.

Alternativ Standardvärde beskrivning
numFetchPartitions Ange till hälften av antalet utförare som finns vid initieringen av dataströmmen. Antalet parallella Spark-uppgifter som hämtar poster från en prenumeration.
deleteSubscriptionOnStreamStop false Om truetas prenumerationen som skickas till dataströmmen bort när strömningsjobbet slutar.
maxBytesPerTrigger inget En mjuk gräns för batchstorleken som ska bearbetas under varje utlöst mikrobatch.
maxRecordsPerFetch 1000 Antalet poster som ska hämtas per aktivitet innan poster bearbetas.
maxFetchPeriod 10 sekund Tidsåtgången för varje aktivitet som ska hämtas innan poster bearbetas. Databricks rekommenderar att du använder standardvärdet.

Inkrementell batchbearbetningssemantik för Pub/Sub

Du kan använda Trigger.AvailableNow för att använda tillgängliga poster från pub-/underkällorna en inkrementell batch.

Azure Databricks registrerar tidsstämpeln när du börjar läsa med inställningen Trigger.AvailableNow . Poster som bearbetas av batchen innehåller alla tidigare hämtade data och eventuella nyligen publicerade poster med en tidsstämpel som är mindre än den inspelade starttidsstämpeln för dataströmmen.

Se Konfigurera inkrementell batchbearbetning.

Övervaka strömningsmått

Förloppsmått för strukturerad direktuppspelning rapporterar antalet poster som hämtats och är redo att bearbetas, storleken på de poster som hämtats och är redo att bearbetas samt antalet dubbletter som setts sedan strömmen startade. Följande är ett exempel på dessa mått:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Begränsningar

Spekulativ körning (spark.speculation) stöds inte med Pub/Sub.