Abonneren op Google Pub/Sub
Azure Databricks biedt een ingebouwde connector om u te abonneren op Google Pub/Sub in Databricks Runtime 13.3 LTS en hoger. Deze connector biedt exact eenmaal verwerkingssemantiek voor records van de abonnee.
Notitie
Pub/Sub publiceert mogelijk dubbele records en records komen mogelijk niet in orde aan de abonnee. U moet Azure Databricks-code schrijven om dubbele en niet-orderrecords te verwerken.
Voorbeeld van syntaxis
In het volgende codevoorbeeld ziet u de basissyntaxis voor het configureren van een Structured Streaming-leesbewerking uit Pub/Sub:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Zie Opties configureren voor leesbewerkingen voor pub/substreaming voor meer configuratieopties.
Toegang tot Pub/Sub configureren
Databricks raadt het gebruik van geheimen aan bij het leveren van autorisatieopties. De volgende opties zijn vereist om een verbinding te autoriseren:
clientEmail
clientId
privateKey
privateKeyId
In de volgende tabel worden de rollen beschreven die vereist zijn voor de geconfigureerde referenties:
Rollen | Vereist of optioneel | Hoe deze wordt gebruikt |
---|---|---|
roles/pubsub.viewer of roles/viewer |
Vereist | Controleren of het abonnement bestaat en abonnement ophalen |
roles/pubsub.subscriber |
Vereist | Gegevens ophalen uit een abonnement |
roles/pubsub.editor of roles/editor |
Optioneel | Maakt het maken van een abonnement mogelijk als er geen abonnement bestaat en maakt het ook deleteSubscriptionOnStreamStop mogelijk om abonnementen te verwijderen bij streambeƫindiging |
Pub/subschema
Het schema voor de stream komt overeen met de records die zijn opgehaald uit Pub/Sub, zoals beschreven in de volgende tabel:
Veld | Type |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Opties configureren voor lezen van pub/substreaming
In de volgende tabel worden de opties beschreven die worden ondersteund voor Pub/Sub. Alle opties worden geconfigureerd als onderdeel van een Structured Streaming-leesbewerking met behulp van .option("<optionName>", "<optionValue>")
syntaxis.
Notitie
Sommige pub-/subconfiguratieopties gebruiken het concept van ophalen in plaats van microbatches. Dit weerspiegelt de details van de interne implementatie en opties werken op dezelfde manier als bij andere Structured Streaming-connectors, behalve dat records worden opgehaald en vervolgens verwerkt.
Optie | Default value | Beschrijving |
---|---|---|
numFetchPartitions |
Ingesteld op de helft van het aantal uitvoerders dat aanwezig is bij de initialisatie van de stream. | Het aantal parallelle Spark-taken waarmee records uit een abonnement worden opgehaald. |
deleteSubscriptionOnStreamStop |
false |
Als true het abonnement dat aan de stream wordt doorgegeven, wordt verwijderd wanneer de streamingtaak eindigt. |
maxBytesPerTrigger |
Geen | Een zachte limiet voor de batchgrootte die moet worden verwerkt tijdens elke geactiveerde microbatch. |
maxRecordsPerFetch |
1000 | Het aantal records dat per taak moet worden opgehaald voordat records worden verwerkt. |
maxFetchPeriod |
10 seconden | De tijdsduur voor elke taak die moet worden opgehaald voordat records worden verwerkt. Databricks raadt aan de standaardwaarde te gebruiken. |
Semantiek voor incrementele batchverwerking voor Pub/Sub
U kunt de Trigger.AvailableNow
beschikbare records uit de Pub/Sub-bronnen gebruiken als een incrementele batch.
Azure Databricks registreert de tijdstempel wanneer u begint met lezen met de Trigger.AvailableNow
instelling. Records die door de batch worden verwerkt, bevatten alle eerder opgehaalde gegevens en alle nieuw gepubliceerde records met een tijdstempel kleiner dan de vastgelegde begintijdstempel van de stream.
Zie Incrementele batchverwerking configureren.
Metrische streaminggegevens bewaken
Metrische gegevens over de voortgang van gestructureerde streaming rapporteren het aantal records dat is opgehaald en gereed voor verwerking, de grootte van de records die zijn opgehaald en gereed voor verwerking, en het aantal duplicaten dat is gezien sinds de stream wordt gestart. Hier volgt een voorbeeld van deze metrische gegevens:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Beperkingen
Speculatieve uitvoering (spark.speculation
) wordt niet ondersteund met Pub/Sub.