Udostępnij za pośrednictwem


Przesyłanie strumieniowe z pulsu Apache

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

W środowisku Databricks Runtime 14.1 lub nowszym można użyć przesyłania strumieniowego ze strukturą do strumieniowego przesyłania danych z usługi Apache Pulsar w usłudze Azure Databricks.

Przesyłanie strumieniowe ze strukturą zapewnia semantyka przetwarzania dokładnie raz dla danych odczytywanych ze źródeł Pulsar.

Przykład składni

Poniżej przedstawiono podstawowy przykład użycia przesyłania strumieniowego ze strukturą do odczytu z pulsaru:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Aby określić tematy, należy zawsze podać element service.url i jedną z następujących opcji:

  • topic
  • topics
  • topicsPattern

Aby uzyskać pełną listę opcji, zobacz Konfigurowanie opcji odczytu pulsar przesyłania strumieniowego.

Uwierzytelnianie w pulsie

Usługa Azure Databricks obsługuje uwierzytelnianie magazynu zaufania i magazynu kluczy w usłudze Pulsar. Usługa Databricks zaleca używanie wpisów tajnych podczas przechowywania szczegółów konfiguracji.

Podczas konfigurowania strumienia można ustawić następujące opcje:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Jeśli strumień używa PulsarAdminelementu , ustaw również następujące ustawienia:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

W poniższym przykładzie przedstawiono konfigurowanie opcji uwierzytelniania:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Schemat pulsu

Schemat rekordów odczytywanych z Pulsar zależy od tego, jak tematy mają zakodowane schematy.

  • W przypadku tematów ze schematem Avro lub JSON nazwy pól i typy pól są zachowywane w wynikowej ramce danych platformy Spark.
  • W przypadku tematów bez schematu lub prostego typu danych w Pulsar ładunek jest ładowany do value kolumny.
  • Jeśli czytelnik jest skonfigurowany do odczytywania wielu tematów z różnymi schematami, ustaw opcję allowDifferentTopicSchemas ładowania nieprzetworzonej value zawartości do kolumny.

Rekordy pulsarne mają następujące pola metadanych:

Kolumna Type
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Konfigurowanie opcji odczytu przesyłania strumieniowego Pulsar

Wszystkie opcje są konfigurowane w ramach odczytu przesyłania strumieniowego ze strukturą przy użyciu .option("<optionName>", "<optionValue>") składni. Uwierzytelnianie można również skonfigurować przy użyciu opcji. Zobacz Uwierzytelnianie w pulsacie.

W poniższej tabeli opisano wymagane konfiguracje pulsaru. Należy określić tylko jedną z opcji topiclub topicstopicsPattern.

Opcja Wartość domyślna Opis
service.url Brak Konfiguracja Pulsar serviceUrl dla usługi Pulsar.
topic Brak Ciąg nazwy tematu do użytku.
topics Brak Rozdzielona przecinkami lista tematów do użytku.
topicsPattern Brak Ciąg wyrażeń regularnych Języka Java, który ma być zgodny z tematami do korzystania.

W poniższej tabeli opisano inne opcje obsługiwane przez pulsar:

Opcja Wartość domyślna Opis
predefinedSubscription Brak Wstępnie zdefiniowana nazwa subskrypcji używana przez łącznik do śledzenia postępu aplikacji platformy Spark.
subscriptionPrefix Brak Prefiks używany przez łącznik do generowania losowej subskrypcji do śledzenia postępu aplikacji platformy Spark.
pollTimeoutMs 120000 Limit czasu odczytywania komunikatów z Pulsar w milisekundach.
waitingForNonExistedTopic false Czy łącznik powinien czekać na utworzenie żądanych tematów.
failOnDataLoss true Określa, czy zapytanie nie powiodło się w przypadku utraty danych (na przykład tematy są usuwane lub komunikaty są usuwane z powodu zasad przechowywania).
allowDifferentTopicSchemas false Jeśli wiele tematów z różnymi schematami jest odczytywanych, użyj tego parametru, aby wyłączyć automatyczne deserializacji wartości tematu opartego na schemacie. Tylko nieprzetworzone wartości są zwracane, gdy jest trueto .
startingOffsets latest Jeśli latestelement , czytelnik odczytuje najnowsze rekordy po uruchomieniu. Jeśli earliestelement , czytnik odczytuje od najwcześniejszego przesunięcia. Użytkownik może również określić ciąg JSON, który określa określone przesunięcie.
maxBytesPerTrigger Brak Miękki limit maksymalnej liczby bajtów, które chcemy przetworzyć na mikrobajt. Jeśli jest to określone, admin.url należy również określić.
admin.url Brak Konfiguracja Pulsar serviceHttpUrl . Wymagane tylko wtedy, gdy maxBytesPerTrigger jest określony.

Można również określić dowolną konfigurację klienta pulsar, administratora i czytelnika przy użyciu następujących wzorców:

Wzorzec Łącze do opcji konifiguracji
pulsar.client.* Konfiguracja klienta Pulsar
pulsar.admin.* Konfiguracja administratora pulsu
pulsar.reader.* Konfiguracja czytnika pulsar

Konstruowanie przesunięcia początkowego JSON

Możesz ręcznie skonstruować identyfikator komunikatu, aby określić określone przesunięcie i przekazać go jako kod JSON do startingOffsets opcji. W poniższym przykładzie kodu pokazano tę składnię:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()