共用方式為


在 Azure 事件中樞上整合 Apache Kafka Connect 支援

Apache Kafka 連線 是一種架構,可透過 Kafka 叢集連線和匯入/導出數據至任何外部系統,例如 MySQL、HDFS 和文件系統。 本教學課程會逐步引導您使用 Kafka 連線 架構搭配事件中樞。

本教學課程將逐步引導您整合 Kafka 連線 與事件中樞,以及部署基本的 FileStreamSource 和 FileStreamSink 連接器。 雖然這些連接器不適用於生產環境,但它們示範端對端Kafka 連線案例,其中Azure 事件中樞做為Kafka訊息代理程式。

注意

此範例可在 GitHub取得。

在本教學課程中,您會執行下列步驟:

  • 建立事件中樞命名空間
  • 複製範例專案
  • 設定事件中樞的 Kafka 連線
  • 執行 Kafka 連線
  • 建立連接器

必要條件

若要完成本逐步解說,請確定您具備下列必要條件:

建立事件中樞命名空間

需要事件中樞命名空間,才能從任何事件中樞服務傳送和接收。 請參閱建立事件中樞,以取得建立命名空間和事件中樞的指示。 取得事件中樞 連接字串 和完整功能變數名稱 (FQDN),以供稍後使用。 如需指示,請參閱取得事件中樞 連接字串

複製範例專案

複製 Azure 事件中樞 存放庫,並流覽至 tutorials/connect 子資料夾:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

設定事件中樞的 Kafka 連線

將 Kafka 連線 輸送量從 Kafka 重新導向至事件中樞時,需要最少的重新設定。 下列connect-distributed.properties範例說明如何設定 連線,以在事件中樞上驗證及與 Kafka 端點通訊:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} 取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

執行 Kafka 連線

在此步驟中,Kafka 連線 背景工作角色會在本機以分散式模式啟動,並使用事件中樞來維護叢集狀態。

  1. 將上述 connect-distributed.properties 檔案儲存在本機。 請務必取代大括弧中的所有值。
  2. 流覽至您電腦上的 Kafka 版本位置。
  3. 執行 ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties。 當您看到 'INFO Finished starting connectors and tasks'時,連線 背景工作角色 REST API 已準備好進行互動。

注意

Kafka 連線 使用 Kafka 管理員 Client API 來自動建立具有建議組態的主題,包括壓縮。 Azure 入口網站 中命名空間的快速檢查顯示已自動建立 連線 背景工作角色的內部主題。

Kafka 連線 內部主題必須使用壓縮。 如果內部 連線 主題設定不正確,事件中樞小組不負責修正不正確的設定。

建立連接器

本節將逐步引導您啟動 FileStreamSource 和 FileStreamSink 連接器。

  1. 建立輸入和輸出數據檔的目錄。

    mkdir ~/connect-quickstart
    
  2. 建立兩個檔案:一個檔案,其中一個檔案具有 FileStreamSource 連接器讀取的種子數據,另一個檔案會寫入 FileStreamSink 連接器。

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. 建立 FileStreamSource 連接器。 請務必以主目錄路徑取代大括弧。

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    執行上述命令之後,您應該會在事件中樞實例上看到事件中樞 connect-quickstart

  4. 檢查來源連接器的狀態。

    curl -s http://localhost:8083/connectors/file-source/status
    

    您可以選擇性地使用 服務匯流排 Explorer 來確認事件已抵達connect-quickstart主題。

  5. 建立 FileStreamSink 連線 or。 同樣地,請確定您將大括弧取代為主目錄路徑。

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. 檢查接收連接器的狀態。

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. 確認數據已在檔案之間復寫,而且數據在兩個檔案之間都相同。

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

清理

Kafka 連線 建立事件中樞主題,以儲存即使 連線 叢集關閉之後仍會保存的設定、位移和狀態。 除非需要此持續性,否則建議刪除這些主題。 您也可以刪除 connect-quickstart 本逐步解說期間建立的事件中樞。

下一步

若要深入了解適用於 Kafka 的事件中樞,請參閱下列文章: