在 Azure 事件中樞上整合 Apache Kafka Connect 支援
Apache Kafka 連線 是一種架構,可透過 Kafka 叢集連線和匯入/導出數據至任何外部系統,例如 MySQL、HDFS 和文件系統。 本教學課程會逐步引導您使用 Kafka 連線 架構搭配事件中樞。
本教學課程將逐步引導您整合 Kafka 連線 與事件中樞,以及部署基本的 FileStreamSource 和 FileStreamSink 連接器。 雖然這些連接器不適用於生產環境,但它們示範端對端Kafka 連線案例,其中Azure 事件中樞做為Kafka訊息代理程式。
注意
此範例可在 GitHub 上取得。
在本教學課程中,您會執行下列步驟:
- 建立事件中樞命名空間
- 複製範例專案
- 設定事件中樞的 Kafka 連線
- 執行 Kafka 連線
- 建立連接器
必要條件
若要完成本逐步解說,請確定您具備下列必要條件:
- Azure 訂閱。 如果您沒有訂用帳戶,請建立免費帳戶。
- Git
- Linux/MacOS
- 可從 kafka.apache.org 取得 的最新 Kafka 版本
- 閱讀 Apache Kafka 事件中樞簡介文章
建立事件中樞命名空間
需要事件中樞命名空間,才能從任何事件中樞服務傳送和接收。 請參閱建立事件中樞,以取得建立命名空間和事件中樞的指示。 取得事件中樞 連接字串 和完整功能變數名稱 (FQDN),以供稍後使用。 如需指示,請參閱取得事件中樞 連接字串。
複製範例專案
複製 Azure 事件中樞 存放庫,並流覽至 tutorials/connect 子資料夾:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
設定事件中樞的 Kafka 連線
將 Kafka 連線 輸送量從 Kafka 重新導向至事件中樞時,需要最少的重新設定。 下列connect-distributed.properties
範例說明如何設定 連線,以在事件中樞上驗證及與 Kafka 端點通訊:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
重要
將 {YOUR.EVENTHUBS.CONNECTION.STRING}
取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
執行 Kafka 連線
在此步驟中,Kafka 連線 背景工作角色會在本機以分散式模式啟動,並使用事件中樞來維護叢集狀態。
- 將上述
connect-distributed.properties
檔案儲存在本機。 請務必取代大括弧中的所有值。 - 流覽至您電腦上的 Kafka 版本位置。
- 執行
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
。 當您看到'INFO Finished starting connectors and tasks'
時,連線 背景工作角色 REST API 已準備好進行互動。
注意
Kafka 連線 使用 Kafka 管理員 Client API 來自動建立具有建議組態的主題,包括壓縮。 Azure 入口網站 中命名空間的快速檢查顯示已自動建立 連線 背景工作角色的內部主題。
Kafka 連線 內部主題必須使用壓縮。 如果內部 連線 主題設定不正確,事件中樞小組不負責修正不正確的設定。
建立連接器
本節將逐步引導您啟動 FileStreamSource 和 FileStreamSink 連接器。
建立輸入和輸出數據檔的目錄。
mkdir ~/connect-quickstart
建立兩個檔案:一個檔案,其中一個檔案具有 FileStreamSource 連接器讀取的種子數據,另一個檔案會寫入 FileStreamSink 連接器。
seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
建立 FileStreamSource 連接器。 請務必以主目錄路徑取代大括弧。
curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
執行上述命令之後,您應該會在事件中樞實例上看到事件中樞
connect-quickstart
。檢查來源連接器的狀態。
curl -s http://localhost:8083/connectors/file-source/status
您可以選擇性地使用 服務匯流排 Explorer 來確認事件已抵達
connect-quickstart
主題。建立 FileStreamSink 連線 or。 同樣地,請確定您將大括弧取代為主目錄路徑。
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
檢查接收連接器的狀態。
curl -s http://localhost:8083/connectors/file-sink/status
確認數據已在檔案之間復寫,而且數據在兩個檔案之間都相同。
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
清理
Kafka 連線 建立事件中樞主題,以儲存即使 連線 叢集關閉之後仍會保存的設定、位移和狀態。 除非需要此持續性,否則建議刪除這些主題。 您也可以刪除 connect-quickstart
本逐步解說期間建立的事件中樞。
下一步
若要深入了解適用於 Kafka 的事件中樞,請參閱下列文章:
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應