Udostępnij za pośrednictwem


Subskrybuj usługę Google Pub/Sub

Usługa Azure Databricks udostępnia wbudowany łącznik do subskrybowania usługi Google Pub/Sub w środowisku Databricks Runtime 13.3 LTS i nowszym. Ten łącznik zapewnia semantyka przetwarzania dokładnie raz dla rekordów od subskrybenta.

Uwaga

Pub/Sub może publikować zduplikowane rekordy, a rekordy mogą zostać dostarczone do subskrybenta z zamówienia. Należy napisać kod usługi Azure Databricks, aby obsługiwać zduplikowane i out-of-order rekordy.

Przykład składni

Poniższy przykład kodu przedstawia podstawową składnię konfigurowania przesyłania strumieniowego ze strukturą odczytu z pubu/pod:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Aby uzyskać więcej opcji konfiguracji, zobacz Configure options for Pub/Sub streaming read (Konfigurowanie opcji przesyłania strumieniowego pub/sub).

Konfigurowanie dostępu do usługi Pub/Sub

Usługa Databricks zaleca używanie wpisów tajnych podczas udostępniania opcji autoryzacji. Do autoryzowania połączenia wymagane są następujące opcje:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

W poniższej tabeli opisano role wymagane dla skonfigurowanych poświadczeń:

Role Wymagane lub opcjonalne Jak jest używany
roles/pubsub.viewer lub roles/viewer Wymagania Sprawdź, czy subskrypcja istnieje i uzyskaj subskrypcję
roles/pubsub.subscriber Wymagania Pobieranie danych z subskrypcji
roles/pubsub.editor lub roles/editor Opcjonalnie Umożliwia tworzenie subskrypcji, jeśli nie istnieje, a także umożliwia korzystanie z tej deleteSubscriptionOnStreamStop opcji w celu usunięcia subskrypcji po zakończeniu przesyłania strumieniowego

Schemat pubu/podrzędnego

Schemat strumienia jest zgodny z rekordami pobranymi z pubu/pod, zgodnie z opisem w poniższej tabeli:

Pole Typ
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Konfigurowanie opcji odczytu przesyłania strumieniowego pub/podrzędnego

W poniższej tabeli opisano opcje obsługiwane przez pub/sub. Wszystkie opcje są konfigurowane w ramach odczytu przesyłania strumieniowego ze strukturą przy użyciu .option("<optionName>", "<optionValue>") składni.

Uwaga

Niektóre opcje konfiguracji Pub/Sub używają koncepcji pobierania zamiast mikrosadów. Odzwierciedla to szczegóły implementacji wewnętrznej i opcje działają podobnie do rejestracji w innych łącznikach przesyłania strumieniowego ze strukturą, z tą różnicą, że rekordy są pobierane, a następnie przetwarzane.

Opcja Wartość domyślna opis
numFetchPartitions Ustaw na połowę liczby funkcji wykonawczych obecnych podczas inicjowania strumienia. Liczba równoległych zadań platformy Spark, które pobierają rekordy z subskrypcji.
deleteSubscriptionOnStreamStop false Jeśli truesubskrypcja przekazana do strumienia zostanie usunięta po zakończeniu zadania przesyłania strumieniowego.
maxBytesPerTrigger Brak Miękki limit rozmiaru partii, który ma być przetwarzany podczas każdej wyzwalanej mikrosadowej partii.
maxRecordsPerFetch 1000 Liczba rekordów do pobrania na zadanie przed przetworzeniem rekordów.
maxFetchPeriod 10 sekund Czas trwania każdego zadania do pobrania przed przetworzeniem rekordów. Usługa Databricks zaleca użycie wartości domyślnej.

Semantyka przetwarzania wsadowego przyrostowego dla pubu/podrzędnego

Za pomocą Trigger.AvailableNow polecenia można korzystać z dostępnych rekordów ze źródeł Pub/Sub w partii przyrostowej.

Usługa Azure Databricks rejestruje znacznik czasu podczas rozpoczynania odczytu z ustawieniem Trigger.AvailableNow . Rekordy przetwarzane przez partię obejmują wszystkie pobrane wcześniej dane i wszystkie nowo opublikowane rekordy z sygnaturą czasową mniejszą niż znacznik czasu rozpoczęcia zarejestrowanego strumienia.

Zobacz Konfigurowanie przyrostowego przetwarzania wsadowego.

Monitorowanie metryk przesyłania strumieniowego

Metryki postępu przesyłania strumieniowego ze strukturą raportują liczbę pobranych i gotowych do przetworzenia rekordów, rozmiar pobranych i gotowych do przetworzenia rekordów oraz liczbę duplikatów widocznych od rozpoczęcia strumienia. Poniżej przedstawiono przykład tych metryk:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Ograniczenia

Wykonywanie spekulatywne (spark.speculation) nie jest obsługiwane w przypadku usługi Pub/Sub.