Subskrybuj usługę Google Pub/Sub
Usługa Azure Databricks udostępnia wbudowany łącznik do subskrybowania usługi Google Pub/Sub w środowisku Databricks Runtime 13.3 LTS i nowszym. Ten łącznik zapewnia semantyka przetwarzania dokładnie raz dla rekordów od subskrybenta.
Uwaga
Pub/Sub może publikować zduplikowane rekordy, a rekordy mogą zostać dostarczone do subskrybenta z zamówienia. Należy napisać kod usługi Azure Databricks, aby obsługiwać zduplikowane i out-of-order rekordy.
Przykład składni
Poniższy przykład kodu przedstawia podstawową składnię konfigurowania przesyłania strumieniowego ze strukturą odczytu z pubu/pod:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Aby uzyskać więcej opcji konfiguracji, zobacz Configure options for Pub/Sub streaming read (Konfigurowanie opcji przesyłania strumieniowego pub/sub).
Konfigurowanie dostępu do usługi Pub/Sub
Usługa Databricks zaleca używanie wpisów tajnych podczas udostępniania opcji autoryzacji. Do autoryzowania połączenia wymagane są następujące opcje:
clientEmail
clientId
privateKey
privateKeyId
W poniższej tabeli opisano role wymagane dla skonfigurowanych poświadczeń:
Role | Wymagane lub opcjonalne | Jak jest używany |
---|---|---|
roles/pubsub.viewer lub roles/viewer |
Wymagania | Sprawdź, czy subskrypcja istnieje i uzyskaj subskrypcję |
roles/pubsub.subscriber |
Wymagania | Pobieranie danych z subskrypcji |
roles/pubsub.editor lub roles/editor |
Opcjonalnie | Umożliwia tworzenie subskrypcji, jeśli nie istnieje, a także umożliwia korzystanie z tej deleteSubscriptionOnStreamStop opcji w celu usunięcia subskrypcji po zakończeniu przesyłania strumieniowego |
Schemat pubu/podrzędnego
Schemat strumienia jest zgodny z rekordami pobranymi z pubu/pod, zgodnie z opisem w poniższej tabeli:
Pole | Typ |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Konfigurowanie opcji odczytu przesyłania strumieniowego pub/podrzędnego
W poniższej tabeli opisano opcje obsługiwane przez pub/sub. Wszystkie opcje są konfigurowane w ramach odczytu przesyłania strumieniowego ze strukturą przy użyciu .option("<optionName>", "<optionValue>")
składni.
Uwaga
Niektóre opcje konfiguracji Pub/Sub używają koncepcji pobierania zamiast mikrosadów. Odzwierciedla to szczegóły implementacji wewnętrznej i opcje działają podobnie do rejestracji w innych łącznikach przesyłania strumieniowego ze strukturą, z tą różnicą, że rekordy są pobierane, a następnie przetwarzane.
Opcja | Wartość domyślna | opis |
---|---|---|
numFetchPartitions |
Ustaw na połowę liczby funkcji wykonawczych obecnych podczas inicjowania strumienia. | Liczba równoległych zadań platformy Spark, które pobierają rekordy z subskrypcji. |
deleteSubscriptionOnStreamStop |
false |
Jeśli true subskrypcja przekazana do strumienia zostanie usunięta po zakończeniu zadania przesyłania strumieniowego. |
maxBytesPerTrigger |
Brak | Miękki limit rozmiaru partii, który ma być przetwarzany podczas każdej wyzwalanej mikrosadowej partii. |
maxRecordsPerFetch |
1000 | Liczba rekordów do pobrania na zadanie przed przetworzeniem rekordów. |
maxFetchPeriod |
10 sekund | Czas trwania każdego zadania do pobrania przed przetworzeniem rekordów. Usługa Databricks zaleca użycie wartości domyślnej. |
Semantyka przetwarzania wsadowego przyrostowego dla pubu/podrzędnego
Za pomocą Trigger.AvailableNow
polecenia można korzystać z dostępnych rekordów ze źródeł Pub/Sub w partii przyrostowej.
Usługa Azure Databricks rejestruje znacznik czasu podczas rozpoczynania odczytu z ustawieniem Trigger.AvailableNow
. Rekordy przetwarzane przez partię obejmują wszystkie pobrane wcześniej dane i wszystkie nowo opublikowane rekordy z sygnaturą czasową mniejszą niż znacznik czasu rozpoczęcia zarejestrowanego strumienia.
Zobacz Konfigurowanie przyrostowego przetwarzania wsadowego.
Monitorowanie metryk przesyłania strumieniowego
Metryki postępu przesyłania strumieniowego ze strukturą raportują liczbę pobranych i gotowych do przetworzenia rekordów, rozmiar pobranych i gotowych do przetworzenia rekordów oraz liczbę duplikatów widocznych od rozpoczęcia strumienia. Poniżej przedstawiono przykład tych metryk:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Ograniczenia
Wykonywanie spekulatywne (spark.speculation
) nie jest obsługiwane w przypadku usługi Pub/Sub.