Share via


Apache Pulsar からのストリーム

重要

この機能はパブリック プレビュー段階にあります。

Databricks Runtime 14.1 以降では、構造化ストリーミングを使用して、Azure Databricks 上の Apache Pulsar からデータをストリーミングできます。

構造化ストリーミングは、Pulsar ソースから読み取られたデータに対して 1 回だけ処理のセマンティクスを提供します。

構文の例

次に、構造化ストリーミングを使用して Pulsar から読み取る基本的な例を示します。

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

トピックを指定するには、常に service.url と次のいずれかのオプションを指定する必要があります。

  • topic
  • topics
  • topicsPattern

オプションの完全な一覧については、「Pulsar ストリーミング読み取りのオプションを構成する」を参照してください。

Pulsar に対する認証

Azure Databricks では、Pulsar に対する認証として、トラストストアとキーストアがサポートされています。 Databricks では、構成の詳細を格納する際にシークレットを使用することを推奨しています。

ストリームの構成中に、次のオプションを設定できます。

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

ストリームで PulsarAdmin を使用する場合は、次の設定も行います。

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

次の例では、認証オプションの構成を示します。

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulsar スキーマ

Pulsar から読み取られるレコードのスキーマは、トピックのスキーマがどのようにエンコードされているかによって異なります。

  • Avro スキーマまたは JSON スキーマを含むトピックの場合、フィールド名とフィールド型は結果の Spark DataFrame に保持されます。
  • スキーマのないトピック、または Pulsar の単純なデータ型を使用するトピックの場合、ペイロードは value 列に読み込まれます。
  • スキーマが異なる複数のトピックを読み取るリーダーが構成されている場合は、生のコンテンツを value 列に読み込む allowDifferentTopicSchemas を設定します。

Pulsar レコードには、次のメタデータ フィールドがあります。

Type
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Pulsar ストリーミング読み取りのオプションを構成する

すべてのオプションは、.option("<optionName>", "<optionValue>") 構文を使用して構造化ストリーミング読み取りの一部として構成されます。 オプションを使用して認証を構成することもできます。 「Pulsar に対する認証」を参照してください。

次の表で、Pulsar に必要な構成について説明します。 topictopicstopicsPattern のいずれかのオプションのみを指定する必要があります。

オプション 既定値 説明
service.url なし Pulsar サービスの Pulsar serviceUrl 構成。
topic なし 使用するトピックのトピック名文字列。
topics なし 使用するトピックのコンマ区切りリスト。
topicsPattern なし 使用するトピックに一致する Java 正規表現文字列。

次の表で、Pulsar でサポートされているその他のオプションについて説明します。

オプション 既定値 説明
predefinedSubscription なし Spark アプリケーションの進行状況を追跡するためにコネクタによって使用される、定義済みのサブスクリプション名。
subscriptionPrefix なし Spark アプリケーションの進行状況を追跡するランダムなサブスクリプションを生成するために、コネクタによって使用されるプレフィックス。
pollTimeoutMs 120000 Pulsar からメッセージを読み取る際のタイムアウト (ミリ秒単位)。
waitingForNonExistedTopic false コネクタは目的のトピックが作成されるまで待機する必要があるかどうか。
failOnDataLoss true データが失われたとき (たとえば、トピックが削除された場合や、アイテム保持ポリシーのためにメッセージが削除された場合) にクエリを失敗させるかどうかを制御します。
allowDifferentTopicSchemas false 異なるスキーマを持つ複数のトピックを読み取る場合は、このパラメーターを使用して、スキーマベースのトピック値の自動逆シリアル化をオフにします。 これが true の場合は、生の値のみが返されます。
startingOffsets latest latest の場合、リーダーは実行を開始した後に最新のレコードを読み取ります。 earliest の場合、リーダーは最も早いオフセットから読み取ります。 ユーザーは、特定のオフセットを指定する JSON 文字列を指定することもできます。
maxBytesPerTrigger なし マイクロバッチごとに処理する最大バイト数のソフト制限。 これを指定する場合は、admin.url も指定する必要があります。
admin.url なし Pulsar serviceHttpUrl 構成。 maxBytesPerTrigger が指定されている場合にのみ必要です。

次のパターンを使用して、Pulsar クライアント、管理者、リーダーの構成を指定することもできます。

パターン 構成オプションへのリンク
pulsar.client.* Pulsar クライアント構成
pulsar.admin.* Pulsar 管理者構成
pulsar.reader.* Pulsar リーダー構成

開始オフセット JSON を作成する

メッセージ ID を手動で作成して特定のオフセットを指定し、これを JSON として startingOffsets オプションに渡すことができます。 次のコード例は、この構文を示しています。

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()