Udostępnij za pośrednictwem


read_pubsub funkcja przesyłania strumieniowego o wartości tabeli

Dotyczy:zaznacz pole wyboru oznaczone jako tak Databricks SQL zaznacz pole wyboru oznaczone jako tak Databricks Runtime 13.3 LTS i nowsze

Zwraca tabelę z rekordami odczytanymi z pubu/pod tematu. Obsługuje tylko zapytania przesyłane strumieniowo.

Składnia

read_pubsub( { parameter => value } [, ...])

Argumenty

read_pubsub wymaga wywołania nazwanego parametru.

Jedynymi wymaganymi argumentami są subscriptionId, projectIdi topicId. Wszystkie inne argumenty są opcjonalne.

Aby uzyskać pełne opisy argumentów, zobacz Configure options for Pub/Sub streaming read (Konfigurowanie opcji dla odczytu przesyłania strumieniowego pub/sub).

Usługa Databricks zaleca używanie wpisów tajnych podczas udostępniania opcji autoryzacji. Zobacz funkcję wpisu tajnego.

Aby uzyskać szczegółowe informacje na temat konfigurowania dostępu do pubu/podsieci, zobacz Konfigurowanie dostępu do pubu/subskrypcji.

Parametr Type Opis
subscriptionId STRING Wymagany unikatowy identyfikator przypisany do subskrypcji Pub/Sub.
projectId STRING Wymagany identyfikator projektu Google Cloud skojarzony z tematem Pub/Sub.
topicId STRING Wymagane, identyfikator lub nazwa tematu Pub/Sub do zasubskrybowania.
clientEmail STRING Adres e-mail skojarzony z kontem usługi na potrzeby uwierzytelniania.
clientId STRING Identyfikator klienta skojarzony z kontem usługi na potrzeby uwierzytelniania.
privateKeyId STRING Identyfikator klucza prywatnego skojarzonego z kontem usługi.
privateKey STRING Klucz prywatny skojarzony z kontem usługi na potrzeby uwierzytelniania.

Te argumenty są używane do dalszego dostrajania podczas odczytywania z Pub/Sub:

Parametr Type Opis
numFetchPartitions STRING Opcjonalnie z domyślną liczbą funkcji wykonawczych. Liczba równoległych zadań platformy Spark, które pobierają rekordy z subskrypcji.
deleteSubscriptionOnStreamStop BOOLEAN Opcjonalnie z domyślnym false. Jeśli ustawiono wartość true, subskrypcja przekazana do strumienia zostanie usunięta po zakończeniu zadania przesyłania strumieniowego.
maxBytesPerTrigger STRING Miękki limit rozmiaru partii, który ma być przetwarzany podczas każdej wyzwalanej mikrosadowej partii. Wartość domyślna to "none".
maxRecordsPerFetch STRING Liczba rekordów do pobrania na zadanie przed przetworzeniem rekordów. Wartość domyślna to "1000".
maxFetchPeriod STRING Czas trwania każdego zadania do pobrania przed przetworzeniem rekordów. Wartość domyślna to "10s".

Zwraca

Tabela rekordów Pub/Sub z następującym schematem. Kolumna atrybutów może mieć wartość null, ale wszystkie inne kolumny nie mają wartości null.

Nazwisko Typ danych Dopuszczający wartość null Standardowa opis
messageId STRING Nie. Unikatowy identyfikator komunikatu Pub/Sub.
payload BINARY Nie. Zawartość komunikatu Pub/Sub.
attributes STRING Tak Pary klucz-wartość reprezentujące atrybuty komunikatu Pub/Sub. Jest to ciąg zakodowany w formacie JSON.
publishTimestampInMillis BIGINT Nie. Sygnatura czasowa opublikowania wiadomości w milisekundach.
sequenceNumber BIGINT Nie. Unikatowy identyfikator rekordu w ramach jego fragmentu.

Przykłady

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

Teraz należy wykonać zapytanie dotyczące danych z elementu testing.streaming_table w celu dalszej analizy.

Błędne zapytania:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);