Abonnieren Sie Google Pub/Sub

Verwenden Sie den integrierten Connector, um Google Pub/Sub zu abonnieren. Dieser Connector verfügt über Exactly-once-Verarbeitungssemantik für Zeilen aus dem Abonnenten.

Hinweis

Pub/Sub veröffentlicht möglicherweise doppelte Zeilen, oder Zeilen kommen möglicherweise außerhalb der Reihenfolge an den Abonnenten. Sie müssen Code schreiben, um doppelte und nicht sortierte Zeilen zu behandeln.

Konfigurieren eines Pub/Sub-Streams

Im folgenden Codebeispiel wird gezeigt, wie Sie einen strukturierten Streaming-Lesevorgang aus Pub/Sub konfigurieren und sich mit privaten Schlüsseln authentifizieren.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Weitere Konfigurationsoptionen finden Sie unter Konfigurationsoptionen für Pub/Sub-Streaming-Lesen.

Konfigurieren des Zugriffs auf Pub/Sub

Ihren Zugangsdaten müssen die folgenden Rollen zugewiesen sein:

Rollen Erforderlich oder optional Wie die Rolle verwendet wird
roles/pubsub.viewer oder roles/viewer Erforderlich Überprüft, ob ein Abonnement vorhanden ist und ein Abonnement erhält.
roles/pubsub.subscriber Erforderlich Ruft Daten aus einem Abonnement ab.
roles/pubsub.editor oder roles/editor Wahlfrei Ermöglicht die Erstellung eines Abonnements, wenn keines vorhanden ist, und ermöglicht die Verwendung der deleteSubscriptionOnStreamStop zum Löschen von Abonnements bei der Stream-Beendigung.

Databricks empfiehlt, geheime Schlüssel bei der Verwendung von Schlüsseln zu verwenden. Die folgenden Optionen sind erforderlich, um eine Verbindung zu autorisieren:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Grundlegendes zum Pub/Sub-Schema

Das Schema für den Datenstrom entspricht den Zeilen, die aus Pub/Sub abgerufen werden, wie in der folgenden Tabelle beschrieben:

Feld Typ
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Konfigurieren von Optionen für den Pub/Sub-Streaming-Lesevorgang

In der folgenden Tabelle werden weitere Optionen beschrieben, die für Pub/Sub unterstützt werden. Alle Optionen werden mit .option("<optionName>", "<optionValue>") auf Ihrem Stream-Reader konfiguriert.

Hinweis

Einige Pub/Sub-Konfigurationsoptionen verwenden das Konzept der Abrufe anstelle von Mikrobatches. Dies ist ein internes Implementierungsdetail, und die Optionen funktionieren ähnlich wie andere Strukturierte Streaming-Connectors, mit der Ausnahme, dass Zeilen abgerufen und dann verarbeitet werden.

Schlüssel Standardwert Beschreibung
numFetchPartitions Wird auf die Hälfte der Anzahl der bei der Stream-Initialisierung vorhandenen Executors gesetzt. Die Anzahl der parallelen Spark-Aufgaben, die Zeilen aus einem Abonnement abrufen.
deleteSubscriptionOnStreamStop false Wenn true, wird das an den Datenstrom übergebene Abonnement beim Beenden des Streamingauftrags gelöscht.
maxBytesPerTrigger none Eine weiche Grenze für die Batchgröße, die bei jedem ausgelösten Mikrobatch verarbeitet wird.
maxRecordsPerFetch 1000 Die Anzahl der Zeilen, die pro Aufgabe abgerufen werden sollen, bevor Zeilen verarbeitet werden.
maxFetchPeriod 10s Die Dauer, die jeder Aufgabe zum Abrufen vor der Verarbeitung von Zeilen zur Verfügung steht. Akzeptiert eine Dauerzeichenfolge, 1s z. B. 1 Sekunde oder 1m 1 Minute. Databricks empfiehlt die Verwendung des Standardwerts.

Verwenden der inkrementellen Batchverarbeitung mit Pub/Sub

Sie können Trigger.AvailableNow verwenden, um verfügbare Zeilen aus den Pub/Sub-Quellen als inkrementellen Batch zu verarbeiten.

Azure Databricks zeichnet den Zeitstempel auf, wenn Sie einen Lesevorgang mit der Trigger.AvailableNow Einstellung beginnen. Vom Batch verarbeitete Zeilen umfassen alle zuvor abgerufenen Daten und alle neu veröffentlichten Zeilen mit einem Zeitstempel, der kleiner als der aufgezeichnete Startzeitstempel ist. Weitere Informationen finden Sie unter AvailableNow: Inkrementelle Batchverarbeitung.

Überwachen von Pub/Sub Streaming-Metriken

Metriken für strukturiertes Streaming melden die Anzahl der abgerufenen und verarbeiteten Zeilen, die Größe der abgerufenen und verarbeiteten Zeilen sowie die Anzahl der Duplikate, die seit dem Start des Datenstroms angezeigt werden.

Nachfolgend sehen Sie ein Beispiel für Pub/Sub-Metriken:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Begrenzungen

Pub/Sub unterstützt keine spekulative Ausführung mit spark.speculation.