在 Azure 事件中樞上整合 Apache Kafka Connect 支援
Apache Kafka Connect 是一種類架構,可連線至任何外部系統 (例如 MySQL、HDFS 以及通過 Kafka 叢集的檔案系統) 和來回匯入/匯出資料。 本文會逐步引導您搭配使用 Kafka Connect 架構與事件中樞。
本文會逐步引導您整合 Kafka Connect 與事件中樞,以及部署基本的 FileStreamSource
和 FileStreamSink
連接器。 雖然這些連接器並非用於生產環境,但可示範 Azure 事件中樞會作為 Kafka 訊息代理程式的端對端 Kafka Connect 案例。
注意
您可在 GitHub 上取得此範例。
必要條件
若要完成本逐步解說,請確定您具有下列必要條件:
- Azure 訂閱。 如果您沒有訂用帳戶,請建立免費帳戶。
- Git
- Linux/MacOS
- 從 kafka.apache.org (英文) 取得 Kafka 的最新版本
- 請參閱適用於 Apache Kafka 的事件中樞簡介文章
建立事件中樞命名空間
您需要事件中樞命名空間,才能從任何事件中樞服務傳送和接收。 請參閱建立事件中樞,以取得建立命名空間和事件中樞的指示。 請取得事件中樞連接字串和完整網域名稱 (FQDN) 以供稍後使用。 如需相關指示,請參閱取得事件中樞連接字串。
複製範例專案
請複製 Azure 事件中樞存放庫,並瀏覽至 tutorials/connect 子資料夾:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
設定事件中樞的 Kafka Connect
將 Kafka Connect 輸送量從 Kafka 重新導向至事件中樞時,需要稍微重新設定。 下列 connect-distributed.properties
範例說明如何設定 Connect,以向事件中樞上的 Kafka 端點進行驗證與通訊:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
重要
將 {YOUR.EVENTHUBS.CONNECTION.STRING}
取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
執行 Kafka Connect
在此步驟中,Kafka Connect 背景工作角色會以分散模式在本機啟動,使用事件中樞來維持叢集的狀態。
- 在本機儲存
connect-distributed.properties
檔案。 請務必取代括號中的所有值。 - 瀏覽至機器上的 Kafka 版本位置。
- 執行
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
。 當您看到'INFO Finished starting connectors and tasks'
時,Connect 背景工作角色 REST API 便可供互動。
注意
Kafka Connect 會使用 Kafka AdminClient API,以使用建議的設定 (包括壓縮) 自動建立主題。 在 Azure 入口網站中快速檢查命名空間,會顯露出 Connect 背景工作角色的內部主題已自動建立。
Kafka Connect 內部主題必須使用壓縮。 若未正確設定 [內部連線] 主題,事件中樞小組將不會負責修正不當的設定。
建立連接器
本節會逐步引導您完成 FileStreamSource
和 FileStreamSink
連接器的建立。
建立輸入和輸出資料檔案的目錄。
mkdir ~/connect-quickstart
建立兩個檔案:一個檔案具有種子資料可供
FileStreamSource
連接器讀取,另一個檔案則可供FileStreamSink
連接器寫入。seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
建立
FileStreamSource
連接器。 請務必將大括號取代為主目錄路徑。curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
執行命令之後,您應該就會在事件中樞執行個體上看到事件中樞
connect-quickstart
。檢查來源連接器的狀態。
curl -s http://localhost:8083/connectors/file-source/status
或者,您也可以使用 Service Bus Explorer (英文) 來驗證事件已抵達
connect-quickstart
主題。建立 FileStreamSink 連接器。 同樣地,請務必將大括號取代為主目錄路徑。
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
檢查接收連接器的狀態。
curl -s http://localhost:8083/connectors/file-sink/status
確認已在檔案之間複寫資料,且該資料在這兩個檔案是相同的。
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
清理
Kafka Connect 會建立事件中樞主題,以便儲存即使在 Connect 叢集關閉之後仍會保存下來的設定、位移及狀態。 除非需要此持續性,否則建議您刪除這些主題。 也建議刪除此逐步教學期間建立的 connect-quickstart
事件中樞。
相關內容
若要深入了解適用於 Kafka 的事件中樞,請參閱下列文章: