Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Użyj wbudowanego łącznika, aby zasubskrybować usługę Google Pub/Sub. Ten łącznik zapewnia semantykę przetwarzania dokładnie raz dla rekordów od subskrybenta.
Uwaga
Pub/Sub może publikować zduplikowane rekordy lub rekordy mogą docierać do subskrybenta w niewłaściwej kolejności. Napisz kod do obsługi zduplikowanych i nieaktualnych rekordów.
Konfiguracja strumienia Pub/Sub
Poniższy przykład kodu przedstawia podstawową składnię konfigurowania odczytu strumieniowego z Pub/Sub za pomocą Structured Streaming.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
Aby uzyskać więcej opcji konfiguracji, zobacz Configure options for Pub/Sub streaming read (Konfigurowanie opcji przesyłania strumieniowego pub/sub).
Konfigurowanie dostępu do usługi Pub/Sub
Skonfigurowane poświadczenia muszą mieć następujące role.
| Role | Wymagane lub opcjonalne | Jak rola jest używana |
|---|---|---|
roles/pubsub.viewer lub roles/viewer |
Wymagane | Sprawdza, czy subskrypcja istnieje i pobiera subskrypcję. |
roles/pubsub.subscriber |
Wymagane | Pobiera dane z subskrypcji. |
roles/pubsub.editor lub roles/editor |
Opcjonalnie | Umożliwia utworzenie subskrypcji, jeśli nie istnieje i umożliwia korzystanie z tej deleteSubscriptionOnStreamStop opcji w celu usunięcia subskrypcji po zakończeniu przesyłania strumieniowego. |
Usługa Databricks zaleca używanie sekretów przy podawaniu opcji autoryzacji. Do autoryzowania połączenia wymagane są następujące opcje:
clientEmailclientIdprivateKeyprivateKeyId
Omówienie schematu Pub/Sub
Schemat strumienia pasuje do rekordów pobranych z Pub/Sub, zgodnie z opisem w poniższej tabeli.
| Pole | Typ |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Konfiguracja opcji odczytu strumieniowego Pub/Sub
W poniższej tabeli opisano opcje obsługiwane przez pub/sub. Wszystkie opcje są konfigurowane w ramach odczytu za pomocą strukturyzowanego przesyłania strumieniowego przy użyciu składni .option("<optionName>", "<optionValue>").
Uwaga
Niektóre opcje konfiguracji Pub/Sub wykorzystują koncepcję pobierania zamiast mikropartii. Odzwierciedla to wewnętrzne szczegóły implementacji, a opcje działają podobnie do odpowiedników w innych łącznikach Structured Streaming, z tą różnicą, że rekordy są najpierw pobierane, a następnie przetwarzane.
| Opcja | Wartość domyślna | opis |
|---|---|---|
numFetchPartitions |
Ustaw na połowę liczby funkcji wykonawczych obecnych podczas inicjowania strumienia. | Liczba równoległych zadań platformy Spark, które pobierają rekordy z subskrypcji. |
deleteSubscriptionOnStreamStop |
false |
Jeśli true, subskrypcja przekazywana do strumienia jest usuwana po zakończeniu zadania przesyłania strumieniowego. |
maxBytesPerTrigger |
none |
Elastyczny limit rozmiaru partii do przetworzenia podczas każdej wyzwalanej mikropartii. |
maxRecordsPerFetch |
1000 |
Liczba rekordów do pobrania na każde zadanie przed ich przetworzeniem. |
maxFetchPeriod |
10s |
Czas trwania każdego zadania na pobranie danych przed przetworzeniem rekordów. Akceptuje ciąg czasu trwania, na przykład 1s przez 1 sekundę lub 1m przez 1 minutę. Usługa Databricks zaleca użycie wartości domyślnej. |
Stosowanie przyrostowego przetwarzania wsadowego z systemem Pub/Sub
Można używać Trigger.AvailableNow do przetwarzania dostępnych rekordów ze źródeł Pub/Sub jako inkrementalnej partii.
Usługa Azure Databricks rejestruje znacznik czasu, gdy rozpoczynasz odczyt z ustawieniem Trigger.AvailableNow. Rekordy przetwarzane przez partię obejmują wszystkie pobrane wcześniej dane i wszystkie nowo opublikowane rekordy z sygnaturą czasową mniejszą niż znacznik czasu rozpoczęcia zarejestrowanego strumienia. Aby uzyskać więcej informacji, zobacz AvailableNow: Przyrostowe przetwarzanie wsadowe.
Monitorowanie metryk strumieniowania Pub/Sub
Metryki postępu przesyłania strumieniowego ze strukturą raportują liczbę pobranych i gotowych do przetworzenia rekordów, rozmiar pobranych i gotowych do przetworzenia rekordów oraz liczbę duplikatów widocznych od rozpoczęcia strumienia. Poniżej przedstawiono przykład tych metryk:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Ograniczenia
Pub/Sub nie obsługuje wykonywania spekulatywnego (spark.speculation).