Abonnieren von Google Pub/Sub

Azure Databricks bietet einen integrierten Connector zum Abonnieren von Google Pub/Sub in Databricks Runtime 13.3 LTS und höher. Dieser Connector stellt genau einmal die Verarbeitungssemantik für Datensätze vom Abonnenten bereit.

Hinweis

Pub/Sub veröffentlicht möglicherweise doppelte Datensätze, und Datensätze kommen möglicherweise außerhalb der Reihenfolge an den Abonnenten. Sie sollten Azure Databricks Code schreiben, um doppelte und ungeordnete Datensätze zu behandeln.

Syntaxbeispiel

Im folgenden Codebeispiel wird die grundlegende Syntax zum Konfigurieren eines Strukturierten Streaming-Lesevorgangs aus Pub/Sub veranschaulicht:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Weitere Konfigurationsoptionen finden Sie unter Konfigurieren der Optionen für Pub/Sub-Streamingauslesung.

Konfigurieren des Zugriffs auf Pub/Sub

Databricks empfiehlt die Verwendung von Geheimnissen beim Bereitstellen von Autorisierungsoptionen. Die folgenden Optionen sind erforderlich, um eine Verbindung zu autorisieren:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

In der folgenden Tabelle werden die Rollen beschrieben, die für die konfigurierten Anmeldeinformationen erforderlich sind:

Rollen Erforderlich oder optional Verwendung
roles/pubsub.viewer oder roles/viewer Erforderlich Prüfen Sie, ob ein Abonnement existiert und rufen Sie es ab
roles/pubsub.subscriber Erforderlich Abrufen von Daten aus einem Abonnement
roles/pubsub.editor oder roles/editor Optional Ermöglicht die Erstellung eines Abonnements, wenn kein Abonnement vorhanden ist’, und ermöglicht auch die Verwendung von deleteSubscriptionOnStreamStop zum Löschen von Abonnements bei der Datenstromendung.

Pub/Sub-Schema

Das Schema für den Datenstrom entspricht den Datensätzen, die aus Pub/Sub abgerufen werden, wie in der folgenden Tabelle beschrieben:

Feld Typ
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Konfigurieren von Optionen für Pub/Sub-Streaming-Lesevorgänge

In der folgenden Tabelle werden weitere Optionen beschrieben, die für Pub/Sub unterstützt werden. Alle Optionen werden als Teil eines strukturierten Streaming-Lesevorgangs mit .option("<optionName>", "<optionValue>")-Syntax konfiguriert.

Hinweis

Einige Pub/Sub-Konfigurationsoptionen verwenden das Konzept der Abrufe anstelle von Mikrobatches. Dies spiegelt interne Implementierungsdetails wider, und die Optionen funktionieren ähnlich wie die entsprechenden Optionen in anderen strukturierten Streaming-Connectors, mit der Ausnahme, dass die Datensätze abgerufen und dann verarbeitet werden.

Option Standardwert Beschreibung
numFetchPartitions Festlegen auf die Anzahl der Executors bei der Streaminitialisierung Die Anzahl der parallelen Spark-Aufgaben, die Datensätze aus einem Abonnement abrufen.
deleteSubscriptionOnStreamStop false Wenn true, wird das an den Datenstrom übergebene Abonnement beim Beenden des Streamingauftrags gelöscht.
maxBytesPerTrigger Keine Eine weiche Grenze für die Batchgröße, die bei jedem ausgelösten Mikrobatch verarbeitet wird.
maxRecordsPerFetch 1000 Die Anzahl der Datensätze, die pro Aufgabe abgerufen werden sollen, bevor Datensätze verarbeitet werden.
maxFetchPeriod 10 Sekunden Die Zeitdauer für jeden Vorgang, der vor der Verarbeitung von Datensätzen abgerufen werden soll. Databricks empfiehlt die Verwendung des Standardwerts.

Inkrementelle Batchverarbeitungsemantik für Pub/Sub

Sie können Trigger.AvailableNow verwenden, um verfügbare Datensätze aus Pub/Sub-Quellen in einem inkrementellen Batch zu nutzen.

Azure Databricks zeichnet den Zeitstempel auf, wenn Sie einen Lesevorgang mit der Trigger.AvailableNow Einstellung beginnen. Datensätze, die vom Batch verarbeitet werden, umfassen alle zuvor abgerufenen Daten und alle neu veröffentlichten Datensätze mit einem Zeitstempel, der kleiner als der aufgezeichnete Datenstrom-Startzeitstempel ist.

Weitere Informationen finden Sie unter Konfigurieren der inkrementellen Batchverarbeitung.

Überwachen von Streamingmetriken

Metriken für strukturiertes Streaming melden die Anzahl der abgerufenen und verarbeiteten Datensätze, die Größe der abgerufenen und verarbeiteten Datensätze sowie die Anzahl der Duplikate, die seit dem Start des Datenstroms angezeigt werden. Im Folgenden sehen Sie ein Beispiel für diese Metriken:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Begrenzungen

Spekulative Ausführung (spark.speculation) wird mit Pub/Sub nicht unterstützt.