Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Verwenden Sie den integrierten Connector, um Google Pub/Sub zu abonnieren. Dieser Connector verfügt über Exactly-once-Verarbeitungssemantik für Zeilen aus dem Abonnenten.
Hinweis
Pub/Sub veröffentlicht möglicherweise doppelte Zeilen, oder Zeilen kommen möglicherweise außerhalb der Reihenfolge an den Abonnenten. Sie müssen Code schreiben, um doppelte und nicht sortierte Zeilen zu behandeln.
Konfigurieren eines Pub/Sub-Streams
Im folgenden Codebeispiel wird gezeigt, wie Sie einen strukturierten Streaming-Lesevorgang aus Pub/Sub konfigurieren und sich mit privaten Schlüsseln authentifizieren.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Weitere Konfigurationsoptionen finden Sie unter Konfigurationsoptionen für Pub/Sub-Streaming-Lesen.
Konfigurieren des Zugriffs auf Pub/Sub
Ihren Zugangsdaten müssen die folgenden Rollen zugewiesen sein:
| Rollen | Erforderlich oder optional | Wie die Rolle verwendet wird |
|---|---|---|
roles/pubsub.viewer oder roles/viewer |
Erforderlich | Überprüft, ob ein Abonnement vorhanden ist und ein Abonnement erhält. |
roles/pubsub.subscriber |
Erforderlich | Ruft Daten aus einem Abonnement ab. |
roles/pubsub.editor oder roles/editor |
Wahlfrei | Ermöglicht die Erstellung eines Abonnements, wenn keines vorhanden ist, und ermöglicht die Verwendung der deleteSubscriptionOnStreamStop zum Löschen von Abonnements bei der Stream-Beendigung. |
Databricks empfiehlt, geheime Schlüssel bei der Verwendung von Schlüsseln zu verwenden. Die folgenden Optionen sind erforderlich, um eine Verbindung zu autorisieren:
clientEmailclientIdprivateKeyprivateKeyId
Grundlegendes zum Pub/Sub-Schema
Das Schema für den Datenstrom entspricht den Zeilen, die aus Pub/Sub abgerufen werden, wie in der folgenden Tabelle beschrieben:
| Feld | Typ |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Konfigurieren von Optionen für den Pub/Sub-Streaming-Lesevorgang
In der folgenden Tabelle werden weitere Optionen beschrieben, die für Pub/Sub unterstützt werden. Alle Optionen werden mit .option("<optionName>", "<optionValue>") auf Ihrem Stream-Reader konfiguriert.
Hinweis
Einige Pub/Sub-Konfigurationsoptionen verwenden das Konzept der Abrufe anstelle von Mikrobatches. Dies ist ein internes Implementierungsdetail, und die Optionen funktionieren ähnlich wie andere Strukturierte Streaming-Connectors, mit der Ausnahme, dass Zeilen abgerufen und dann verarbeitet werden.
| Schlüssel | Standardwert | Beschreibung |
|---|---|---|
numFetchPartitions |
Wird auf die Hälfte der Anzahl der bei der Stream-Initialisierung vorhandenen Executors gesetzt. | Die Anzahl der parallelen Spark-Aufgaben, die Zeilen aus einem Abonnement abrufen. |
deleteSubscriptionOnStreamStop |
false |
Wenn true, wird das an den Datenstrom übergebene Abonnement beim Beenden des Streamingauftrags gelöscht. |
maxBytesPerTrigger |
none |
Eine weiche Grenze für die Batchgröße, die bei jedem ausgelösten Mikrobatch verarbeitet wird. |
maxRecordsPerFetch |
1000 |
Die Anzahl der Zeilen, die pro Aufgabe abgerufen werden sollen, bevor Zeilen verarbeitet werden. |
maxFetchPeriod |
10s |
Die Dauer, die jeder Aufgabe zum Abrufen vor der Verarbeitung von Zeilen zur Verfügung steht. Akzeptiert eine Dauerzeichenfolge, 1s z. B. 1 Sekunde oder 1m 1 Minute. Databricks empfiehlt die Verwendung des Standardwerts. |
Verwenden der inkrementellen Batchverarbeitung mit Pub/Sub
Sie können Trigger.AvailableNow verwenden, um verfügbare Zeilen aus den Pub/Sub-Quellen als inkrementellen Batch zu verarbeiten.
Azure Databricks zeichnet den Zeitstempel auf, wenn Sie einen Lesevorgang mit der Trigger.AvailableNow Einstellung beginnen. Vom Batch verarbeitete Zeilen umfassen alle zuvor abgerufenen Daten und alle neu veröffentlichten Zeilen mit einem Zeitstempel, der kleiner als der aufgezeichnete Startzeitstempel ist. Weitere Informationen finden Sie unter AvailableNow: Inkrementelle Batchverarbeitung.
Überwachen von Pub/Sub Streaming-Metriken
Metriken für strukturiertes Streaming melden die Anzahl der abgerufenen und verarbeiteten Zeilen, die Größe der abgerufenen und verarbeiteten Zeilen sowie die Anzahl der Duplikate, die seit dem Start des Datenstroms angezeigt werden.
Nachfolgend sehen Sie ein Beispiel für Pub/Sub-Metriken:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Begrenzungen
Pub/Sub unterstützt keine spekulative Ausführung mit spark.speculation.