Azure Event Hubs で Apache Kafka Connect のサポートを統合する

Apache Kafka Connect は、Kafka クラスターを通じて、MySQL や HDFS、ファイル システムなどの外部システムに接続し、それらとの間でデータをインポート/エクスポートするためのフレームワークです。 このチュートリアルでは、Event Hubs と共に Kafka Connect フレームワークを使用する方法について説明します。

このチュートリアルでは、イベント ハブに Kafka Connect を統合し、基本的な FileStreamSource および FileStreamSink コネクタをデプロイする方法について説明します。 これらのコネクタは運用環境での使用を想定したものではありませんが、Azure Event Hubs が Kafka ブローカーとして機能する Kafka Connect のシナリオをエンド ツー エンドで示しています。

Note

このサンプルは GitHub で入手できます。

このチュートリアルでは、次の手順を実行します。

  • Event Hubs 名前空間を作成します
  • サンプル プロジェクトを複製する
  • Event Hubs 用に Kafka Connect を構成する
  • Kafka Connect を実行する
  • コネクタを作成する

前提条件

このチュートリアルを完了するには、次の前提条件を満たしている必要があります。

Event Hubs 名前空間を作成します

Event Hubs サービスとの間で送受信を行うには、イベント ハブの名前空間が必要です。 名前空間とイベント ハブを作成する手順については、イベント ハブの作成に関するページを参照してください。 Event Hubs の接続文字列と完全修飾ドメイン名 (FQDN) を、後で使用するために取得します。 手順については、「Get an Event Hubs connection string (Event Hubs の接続文字列を取得する)」を参照してください。

サンプル プロジェクトを複製する

Azure Event Hubs リポジトリを複製し、tutorials/connect サブフォルダーに移動します。

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Event Hubs 用に Kafka Connect を構成する

Kafka Connect のスループットを Kafka から Event Hubs にリダイレクトする際に、最小限の再構成が必要となります。 次の connect-distributed.properties サンプルは、Event Hubs 上の Kafka エンドポイントに対して認証と通信を行うように Connect を構成する方法を示しています。

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} を Event Hubs 名前空間への接続文字列に置き換えます。 接続文字列を取得する手順については、「Event Hubs の接続文字列の取得」を参照してください。 構成の例には、sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX"; などがあります。

Kafka Connect を実行する

この手順では、Event Hubs を使用し、Kafka Connect ワーカーをローカルから分散モードで開始して、クラスターの状態を維持します。

  1. 前出の connect-distributed.properties ファイルをローカルに保存します。 中かっこで囲んだ値はすべて置き換えてください。
  2. お使いのマシン上にある Kafka リリースの場所に移動します。
  3. ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties を実行します。 'INFO Finished starting connectors and tasks' が表示されれば、Connect ワーカーの REST API は対話可能な状態です。

Note

Kafka Connect は、Kafka AdminClient API を使用して、圧縮などの推奨される構成を含むトピックを自動的に作成します。 Azure portal で名前空間をざっとチェックすると、Connect ワーカーの内部的なトピックが自動的に作成されていることがわかります。

Kafka Connect の内部トピックでは、圧縮を使用する必要があります。 Connect の内部トピックが正しく構成されていない場合、Event Hubs チームでは不適切な構成を修正する責任を負いません。

コネクタを作成する

このセクションでは、FileStreamSource コネクタと FileStreamSink コネクタを立ち上げる方法について説明します。

  1. 入力データ ファイルと出力データ ファイル用のディレクトリを作成します。

    mkdir ~/connect-quickstart
    
  2. 2 つのファイルを作成します。FileStreamSource コネクタの読み取り元となる、シード データを含んだファイルと、この例の FileStreamSink コネクタの書き込み先となるファイルです。

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. FileStreamSource コネクタを作成します。 中かっこ内の値は、実際のホーム ディレクトリのパスに置き換えてください。

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    上記のコマンドを実行すると、Event Hubs インスタンスにイベント ハブ connect-quickstart が確認できます。

  4. ソース コネクタの状態を確認します。

    curl -s http://localhost:8083/connectors/file-source/status
    

    必要に応じて Service Bus エクスプローラーを使用して、イベントが connect-quickstart トピックに到着したことを確認できます。

  5. FileStreamSink コネクタを作成します。 こちらも、中かっこ内の値は、実際のホーム ディレクトリのパスに置き換えてください。

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. シンク コネクタの状態を確認します。

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. ファイル間でデータがレプリケートされていることと、そのデータが両方のファイル間で一致していることを確認します。

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

クリーンアップ

Kafka Connect は、イベント ハブのトピックを作成することによって、Connect クラスターが停止した後も永続的に存在する構成、オフセット、状態を格納します。 この永続化が不要である場合は、これらのトピックを削除することをお勧めします。 また、このチュートリアルで作成された connect-quickstart Event Hubs を削除することもできます。

次のステップ

Kafka 用 Event Hubs の詳細については、次の記事を参照してください。