Mengintegrasikan dukungan Apache Kafka Connect di Azure Event Hubs

Apache Kafka Connect adalah kerangka kerja untuk menghubungkan dan mengimpor/mengekspor data dari/ke sistem eksternal seperti MySQL, HDFS, dan sistem file melalui kluster Kafka. Tutorial ini memandu Anda menggunakan kerangka kerja Kafka Connect dengan Azure Event Hubs.

Tutorial ini memandu Anda mengintegrasikan Kafka Connect dengan pusat aktivitas serta menyebarkan konektor FileStreamSource dan FileStreamSink dasar. Meskipun konektor ini tidak dimaksudkan untuk penggunaan produksi, konektor ini menunjukkan skenario Kafka Koneksi end-to-end di mana Azure Event Hubs bertindak sebagai broker Kafka.

Catatan

Sampel ini tersedia di GitHub.

Dalam tutorial ini, Anda akan melakukan langkah-langkah berikut:

  • Membuat namespace layanan Pusat Aktivitas
  • Mengkloning proyek contoh
  • Mengonfigurasi Kafka Connect for Azure Event Hubs
  • Menjalankan Kafka Connect
  • Membuat konektor

Prasyarat

Untuk menyelesaikan panduan ini, pastikan prasyarat berikut dipenuhi:

Membuat namespace layanan Pusat Aktivitas

Ruang nama Azure Event Hubs diperlukan untuk mengirim dan menerima dari layanan Azure Event Hubs apa pun. Lihat Membuat pusat aktivitas untuk petunjuk membuat namespace layanan dan pusat aktivitas. Dapatkan string koneksi Azure Event Hubs dan nama domain yang sepenuhnya memenuhi syarat (FQDN) untuk digunakan nanti. Untuk petunjuk, lihat Mendapatkan string koneksi Azure Event Hubs.

Mengkloning proyek contoh

Kloning repositori Azure Event Hubs dan navigasikan ke subfolder tutorial/connect:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Mengonfigurasi Kafka Connect for Azure Event Hubs

Konfigurasi ulang minimal diperlukan saat mengalihkan throughput Kafka Connect dari Kafka ke Azure Event Hubs. Contoh connect-distributed.properties berikut mengilustrasikan cara mengonfigurasi Connect untuk mengautentikasi dan berkomunikasi dengan titik akhir Kafka di Azure Event Hubs:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Penting

Ganti {YOUR.EVENTHUBS.CONNECTION.STRING} dengan string koneksi ke ruang nama Azure Event Hubs. Untuk instruksi tentang mendapatkan string sambungan, lihat Mendapatkan string sambungan Azure Event Hubs. Berikut adalah contoh konfigurasi: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Menjalankan Kafka Connect

Dalam langkah ini, pekerja Kafka Connect dimulai secara lokal dalam mode terdistribusi, menggunakan Azure Event Hubs untuk mempertahankan status kluster.

  1. Simpan file connect-distributed.properties di atas secara lokal. Pastikan untuk mengganti semua nilai dalam kurung kurawal.
  2. Navigasikan ke lokasi rilis Kafka di komputer Anda.
  3. Jalankan ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. API REST pekerja Connect siap untuk interaksi saat Anda melihat 'INFO Finished starting connectors and tasks'.

Catatan

Kafka Connect menggunakan API Kafka AdminClient untuk secara otomatis membuat topik dengan konfigurasi yang direkomendasikan, termasuk pemadatan. Pemeriksaan cepat ruang nama di portal Microsoft Azure mengungkapkan bahwa topik internal pekerja Connect telah dibuat secara otomatis.

Topik internal Kafka Connect harus menggunakan pemadatan. Tim Azure Event Hubs tidak bertanggung jawab untuk memperbaiki konfigurasi yang tidak tepat jika topik Connect internal tidak dikonfigurasi dengan benar.

Membuat konektor

Bagian ini memandu Anda menelusuri konektor FileStreamSource dan FileStreamSink.

  1. Buat direktori untuk file data input dan output.

    mkdir ~/connect-quickstart
    
  2. Buat dua file: satu file dengan data turunan dari yang dibaca konektor FileStreamSource, dan satu lagi yang ditulis konektor FileStreamSink kita.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Buat konektor FileStreamSource. Pastikan untuk mengganti kurung keriting dengan jalur direktori beranda Anda.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    Anda akan melihat pusat connect-quickstart aktivitas pada instans Azure Event Hubs setelah menjalankan perintah di atas.

  4. Periksa status konektor sumber.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Secara opsional, Anda dapat menggunakan Azure Service Bus Explorer untuk memverifikasi bahwa kejadian telah sampai dalam topik connect-quickstart tersebut.

  5. Buat Konektor FileStreamSink. Sekali lagi, pastikan untuk mengganti kurung keriting dengan jalur direktori beranda Anda.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Periksa status konektor sink.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Pastikan data telah direplikasi di antara file dan bahwa data identik di kedua file.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Pembersihan

Kafka Koneksi membuat topik Azure Event Hubs untuk menyimpan konfigurasi, offset, dan status yang bertahan bahkan setelah kluster Koneksi dihapus. Kecuali persistensi ini diinginkan, sebaiknya hapus topik ini. Anda mungkin juga ingin menghapus connect-quickstart Azure Event Hubs yang dibuat selama panduan ini.

Langkah berikutnya

Untuk mempelajari selengkapnya tentang Azure Event Hubs untuk Kafka, lihat artikel berikut ini: