分享方式:


在 Azure 事件中樞上整合 Apache Kafka Connect 支援

Apache Kafka Connect 是一種類架構,可連線至任何外部系統 (例如 MySQL、HDFS 以及通過 Kafka 叢集的檔案系統) 和來回匯入/匯出資料。 本文會逐步引導您搭配使用 Kafka Connect 架構與事件中樞。

本文會逐步引導您整合 Kafka Connect 與事件中樞,以及部署基本的 FileStreamSourceFileStreamSink 連接器。 雖然這些連接器並非用於生產環境,但可示範 Azure 事件中樞會作為 Kafka 訊息代理程式的端對端 Kafka Connect 案例。

注意

您可在 GitHub 上取得此範例。

必要條件

若要完成本逐步解說,請確定您具有下列必要條件:

建立事件中樞命名空間

您需要事件中樞命名空間,才能從任何事件中樞服務傳送和接收。 請參閱建立事件中樞,以取得建立命名空間和事件中樞的指示。 請取得事件中樞連接字串和完整網域名稱 (FQDN) 以供稍後使用。 如需相關指示,請參閱取得事件中樞連接字串

複製範例專案

請複製 Azure 事件中樞存放庫,並瀏覽至 tutorials/connect 子資料夾:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

設定事件中樞的 Kafka Connect

將 Kafka Connect 輸送量從 Kafka 重新導向至事件中樞時,需要稍微重新設定。 下列 connect-distributed.properties 範例說明如何設定 Connect,以向事件中樞上的 Kafka 端點進行驗證與通訊:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs 

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} 取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

執行 Kafka Connect

在此步驟中,Kafka Connect 背景工作角色會以分散模式在本機啟動,使用事件中樞來維持叢集的狀態。

  1. 在本機儲存 connect-distributed.properties 檔案。 請務必取代括號中的所有值。
  2. 瀏覽至機器上的 Kafka 版本位置。
  3. 執行 ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties。 當您看到 'INFO Finished starting connectors and tasks' 時,Connect 背景工作角色 REST API 便可供互動。

注意

Kafka Connect 會使用 Kafka AdminClient API,以使用建議的設定 (包括壓縮) 自動建立主題。 在 Azure 入口網站中快速檢查命名空間,會顯露出 Connect 背景工作角色的內部主題已自動建立。

Kafka Connect 內部主題必須使用壓縮。 若未正確設定 [內部連線] 主題,事件中樞小組將不會負責修正不當的設定。

建立連接器

本節會逐步引導您完成 FileStreamSourceFileStreamSink 連接器的建立。

  1. 建立輸入和輸出資料檔案的目錄。

    mkdir ~/connect-quickstart
    
  2. 建立兩個檔案:一個檔案具有種子資料可供 FileStreamSource 連接器讀取,另一個檔案則可供 FileStreamSink 連接器寫入。

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. 建立 FileStreamSource 連接器。 請務必將大括號取代為主目錄路徑。

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    執行命令之後,您應該就會在事件中樞執行個體上看到事件中樞 connect-quickstart

  4. 檢查來源連接器的狀態。

    curl -s http://localhost:8083/connectors/file-source/status
    

    或者,您也可以使用 Service Bus Explorer (英文) 來驗證事件已抵達 connect-quickstart 主題。

  5. 建立 FileStreamSink 連接器。 同樣地,請務必將大括號取代為主目錄路徑。

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. 檢查接收連接器的狀態。

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. 確認已在檔案之間複寫資料,且該資料在這兩個檔案是相同的。

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

清理

Kafka Connect 會建立事件中樞主題,以便儲存即使在 Connect 叢集關閉之後仍會保存下來的設定、位移及狀態。 除非需要此持續性,否則建議您刪除這些主題。 也建議刪除此逐步教學期間建立的 connect-quickstart 事件中樞。

若要深入了解適用於 Kafka 的事件中樞,請參閱下列文章: