Share via


Abonneren op Google Pub/Sub

Azure Databricks biedt een ingebouwde connector om u te abonneren op Google Pub/Sub in Databricks Runtime 13.3 LTS en hoger. Deze connector biedt exact eenmaal verwerkingssemantiek voor records van de abonnee.

Notitie

Pub/Sub publiceert mogelijk dubbele records en records komen mogelijk niet in orde aan de abonnee. U moet Azure Databricks-code schrijven om dubbele en niet-orderrecords te verwerken.

Voorbeeld van syntaxis

In het volgende codevoorbeeld ziet u de basissyntaxis voor het configureren van een Structured Streaming-leesbewerking uit Pub/Sub:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Zie Opties configureren voor leesbewerkingen voor pub/substreaming voor meer configuratieopties.

Toegang tot Pub/Sub configureren

Databricks raadt het gebruik van geheimen aan bij het leveren van autorisatieopties. De volgende opties zijn vereist om een verbinding te autoriseren:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

In de volgende tabel worden de rollen beschreven die vereist zijn voor de geconfigureerde referenties:

Rollen Vereist of optioneel Hoe deze wordt gebruikt
roles/pubsub.viewer of roles/viewer Vereist Controleren of het abonnement bestaat en abonnement ophalen
roles/pubsub.subscriber Vereist Gegevens ophalen uit een abonnement
roles/pubsub.editor of roles/editor Optioneel Maakt het maken van een abonnement mogelijk als er geen abonnement bestaat en maakt het ook deleteSubscriptionOnStreamStop mogelijk om abonnementen te verwijderen bij streambeƫindiging

Pub/subschema

Het schema voor de stream komt overeen met de records die zijn opgehaald uit Pub/Sub, zoals beschreven in de volgende tabel:

Veld Type
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Opties configureren voor lezen van pub/substreaming

In de volgende tabel worden de opties beschreven die worden ondersteund voor Pub/Sub. Alle opties worden geconfigureerd als onderdeel van een Structured Streaming-leesbewerking met behulp van .option("<optionName>", "<optionValue>") syntaxis.

Notitie

Sommige pub-/subconfiguratieopties gebruiken het concept van ophalen in plaats van microbatches. Dit weerspiegelt de details van de interne implementatie en opties werken op dezelfde manier als bij andere Structured Streaming-connectors, behalve dat records worden opgehaald en vervolgens verwerkt.

Optie Default value Beschrijving
numFetchPartitions Ingesteld op de helft van het aantal uitvoerders dat aanwezig is bij de initialisatie van de stream. Het aantal parallelle Spark-taken waarmee records uit een abonnement worden opgehaald.
deleteSubscriptionOnStreamStop false Als truehet abonnement dat aan de stream wordt doorgegeven, wordt verwijderd wanneer de streamingtaak eindigt.
maxBytesPerTrigger Geen Een zachte limiet voor de batchgrootte die moet worden verwerkt tijdens elke geactiveerde microbatch.
maxRecordsPerFetch 1000 Het aantal records dat per taak moet worden opgehaald voordat records worden verwerkt.
maxFetchPeriod 10 seconden De tijdsduur voor elke taak die moet worden opgehaald voordat records worden verwerkt. Databricks raadt aan de standaardwaarde te gebruiken.

Semantiek voor incrementele batchverwerking voor Pub/Sub

U kunt de Trigger.AvailableNow beschikbare records uit de Pub/Sub-bronnen gebruiken als een incrementele batch.

Azure Databricks registreert de tijdstempel wanneer u begint met lezen met de Trigger.AvailableNow instelling. Records die door de batch worden verwerkt, bevatten alle eerder opgehaalde gegevens en alle nieuw gepubliceerde records met een tijdstempel kleiner dan de vastgelegde begintijdstempel van de stream.

Zie Incrementele batchverwerking configureren.

Metrische streaminggegevens bewaken

Metrische gegevens over de voortgang van gestructureerde streaming rapporteren het aantal records dat is opgehaald en gereed voor verwerking, de grootte van de records die zijn opgehaald en gereed voor verwerking, en het aantal duplicaten dat is gezien sinds de stream wordt gestart. Hier volgt een voorbeeld van deze metrische gegevens:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Beperkingen

Speculatieve uitvoering (spark.speculation) wordt niet ondersteund met Pub/Sub.