Abonnieren von Google Pub/Sub
Azure Databricks bietet einen integrierten Connector zum Abonnieren von Google Pub/Sub in Databricks Runtime 13.3 LTS und höher. Dieser Connector stellt genau einmal die Verarbeitungssemantik für Datensätze vom Abonnenten bereit.
Hinweis
Pub/Sub veröffentlicht möglicherweise doppelte Datensätze, und Datensätze kommen möglicherweise außerhalb der Reihenfolge an den Abonnenten. Sie sollten Azure Databricks Code schreiben, um doppelte und ungeordnete Datensätze zu behandeln.
Syntaxbeispiel
Im folgenden Codebeispiel wird die grundlegende Syntax zum Konfigurieren eines Strukturierten Streaming-Lesevorgangs aus Pub/Sub veranschaulicht:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Weitere Konfigurationsoptionen finden Sie unter Konfigurieren der Optionen für Pub/Sub-Streamingauslesung.
Konfigurieren des Zugriffs auf Pub/Sub
Databricks empfiehlt die Verwendung von Geheimnissen beim Bereitstellen von Autorisierungsoptionen. Die folgenden Optionen sind erforderlich, um eine Verbindung zu autorisieren:
clientEmail
clientId
privateKey
privateKeyId
In der folgenden Tabelle werden die Rollen beschrieben, die für die konfigurierten Anmeldeinformationen erforderlich sind:
Rollen | Erforderlich oder optional | Verwendung |
---|---|---|
roles/pubsub.viewer oder roles/viewer |
Erforderlich | Prüfen Sie, ob ein Abonnement existiert und rufen Sie es ab |
roles/pubsub.subscriber |
Erforderlich | Abrufen von Daten aus einem Abonnement |
roles/pubsub.editor oder roles/editor |
Optional | Ermöglicht die Erstellung eines Abonnements, wenn kein Abonnement vorhanden ist’, und ermöglicht auch die Verwendung von deleteSubscriptionOnStreamStop zum Löschen von Abonnements bei der Datenstromendung. |
Pub/Sub-Schema
Das Schema für den Datenstrom entspricht den Datensätzen, die aus Pub/Sub abgerufen werden, wie in der folgenden Tabelle beschrieben:
Feld | Typ |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Konfigurieren von Optionen für Pub/Sub-Streaming-Lesevorgänge
In der folgenden Tabelle werden weitere Optionen beschrieben, die für Pub/Sub unterstützt werden. Alle Optionen werden als Teil eines strukturierten Streaming-Lesevorgangs mit .option("<optionName>", "<optionValue>")
-Syntax konfiguriert.
Hinweis
Einige Pub/Sub-Konfigurationsoptionen verwenden das Konzept der Abrufe anstelle von Mikrobatches. Dies spiegelt interne Implementierungsdetails wider, und die Optionen funktionieren ähnlich wie die entsprechenden Optionen in anderen strukturierten Streaming-Connectors, mit der Ausnahme, dass die Datensätze abgerufen und dann verarbeitet werden.
Option | Standardwert | Beschreibung |
---|---|---|
numFetchPartitions |
Wird auf die Hälfte der Anzahl der bei der Stream-Initialisierung vorhandenen Executors gesetzt. | Die Anzahl der parallelen Spark-Aufgaben, die Datensätze aus einem Abonnement abrufen. |
deleteSubscriptionOnStreamStop |
false |
Wenn true , wird das an den Datenstrom übergebene Abonnement beim Beenden des Streamingauftrags gelöscht. |
maxBytesPerTrigger |
Keine | Eine weiche Grenze für die Batchgröße, die bei jedem ausgelösten Mikrobatch verarbeitet wird. |
maxRecordsPerFetch |
1000 | Die Anzahl der Datensätze, die pro Aufgabe abgerufen werden sollen, bevor Datensätze verarbeitet werden. |
maxFetchPeriod |
10 Sekunden | Die Zeitdauer für jeden Vorgang, der vor der Verarbeitung von Datensätzen abgerufen werden soll. Databricks empfiehlt die Verwendung des Standardwerts. |
Inkrementelle Batchverarbeitungsemantik für Pub/Sub
Sie können Trigger.AvailableNow
verwenden, um verfügbare Datensätze aus Pub/Sub-Quellen in einem inkrementellen Batch zu nutzen.
Azure Databricks zeichnet den Zeitstempel auf, wenn Sie einen Lesevorgang mit der Trigger.AvailableNow
Einstellung beginnen. Datensätze, die vom Batch verarbeitet werden, umfassen alle zuvor abgerufenen Daten und alle neu veröffentlichten Datensätze mit einem Zeitstempel, der kleiner als der aufgezeichnete Datenstrom-Startzeitstempel ist.
Weitere Informationen finden Sie unter Konfigurieren der inkrementellen Batchverarbeitung.
Überwachen von Streamingmetriken
Metriken für strukturiertes Streaming melden die Anzahl der abgerufenen und verarbeiteten Datensätze, die Größe der abgerufenen und verarbeiteten Datensätze sowie die Anzahl der Duplikate, die seit dem Start des Datenstroms angezeigt werden. Im Folgenden sehen Sie ein Beispiel für diese Metriken:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Begrenzungen
Spekulative Ausführung (spark.speculation
) wird mit Pub/Sub nicht unterstützt.