Bagikan melalui


Mengintegrasikan dukungan Apache Kafka Connect pada Azure Event Hubs dengan Debezium untuk Penangkapan Data Perubahan

Change Data Capture (CDC) adalah teknik yang digunakan untuk melacak perubahan tingkat baris dalam tabel database sebagai respons untuk membuat, memperbarui, dan menghapus operasi. Debezium adalah platform terdistribusi yang dibangun di atas fitur Change Data Capture yang tersedia dalam database yang berbeda (misalnya, dekode logika di PostgreSQL). Ini menyediakan satu set konektor Kafka Connect yang memanfaatkan perubahan tingkat baris dalam tabel database dan mengonversinya menjadi aliran peristiwa yang kemudian dikirim ke Apache Kafka.

Tutorial ini memandu Anda melalui cara menyiapkan sistem berbasis tangkapan data perubahan di Azure menggunakan Azure Event Hubs (untuk Kafka), Azure Database for PostgreSQL dan Debezium. Ini menggunakan konektor Debezium PostgreSQL untuk mengalirkan modifikasi database dari PostgreSQL ke topik Kafka di Azure Event Hubs.

Catatan

Artikel ini berisi referensi ke istilah yang tidak lagi digunakan Microsoft. Saat istilah dihapus dari perangkat lunak, kami akan menghapusnya dari artikel ini.

Dalam tutorial ini, Anda akan melakukan langkah-langkah berikut:

  • Membuat namespace layanan Pusat Aktivitas
  • Menyiapkan dan mengonfigurasi Azure Database for PostgreSQL
  • Mengonfigurasi dan menjalankan Kafka Connect dengan konektor Debezium PostgreSQL
  • Pengujian penangkapan perubahan data
  • (Opsional) Mengonsumsi peristiwa data perubahan dengan konektor FileStreamSink

Prasyarat

Untuk menyelesaikan panduan ini, Anda memerlukan:

Membuat namespace layanan Pusat Aktivitas

Ruang nama Azure Event Hubs diperlukan untuk mengirim dan menerima dari layanan Azure Event Hubs apa pun. Lihat Membuat pusat acara untuk petunjuk membuat ruang nama dan pusat acara. Dapatkan string koneksi Azure Event Hubs dan nama domain yang sepenuhnya memenuhi syarat (FQDN) untuk digunakan nanti. Untuk petunjuk, lihat Mendapatkan string koneksi Azure Event Hubs.

Menyiapkan dan mengonfigurasi Azure Database for PostgreSQL

Azure Database for PostgreSQL adalah layanan database relasional berdasarkan versi komunitas mesin database PostgreSQL sumber terbuka, dan tersedia dalam tiga opsi penyebaran: Server Tunggal, Server Fleksibel, dan Cosmos DB for PostgreSQL. Ikuti petunjuk ini untuk membuat server Azure Database for PostgreSQL menggunakan portal Microsoft Azure.

Mengatur dan menjalankan Kafka Connect

Bagian ini membahas topik-topik berikut:

  • Instalasi konektor Debezium
  • Mengonfigurasi Kafka Connect untuk Azure Event Hubs
  • Mulai kluster Kafka Connect dengan konektor Debezium

Mengunduh dan menyiapkan konektor Debezium

Ikuti petunjuk terbaru dalam dokumentasi Debezium untuk mengunduh dan menyiapkan konektor.

Konfigurasikan Kafka Connect untuk Azure Event Hubs

Konfigurasi ulang minimal diperlukan saat mengalihkan throughput Kafka Connect dari Kafka ke Azure Event Hubs. Contoh connect-distributed.properties berikut mengilustrasikan cara mengonfigurasi Connect untuk mengautentikasi dan berkomunikasi dengan titik akhir Kafka di Azure Event Hubs:

Penting

  • Debezium secara otomatis membuat topik per tabel dan banyak topik metadata. Topik Kafka sesuai dengan instans Event Hubs (event hub). Untuk pemetaan Apache Kafka ke Azure Event Hubs, lihat Pemetaan konsep Kafka dan Azure Event Hubs.
  • Ada batasan yang berbeda pada jumlah pusat aktivitas di namespace Azure Event Hubs tergantung pada tingkat (Dasar, Standar, Premium, atau Khusus). Untuk batasan tersebut, lihat Kuota.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Penting

Ganti {YOUR.EVENTHUBS.CONNECTION.STRING} dengan string koneksi ke ruang nama Azure Event Hubs. Untuk instruksi tentang mendapatkan string sambungan, lihat Mendapatkan string sambungan Event Hubs. Berikut adalah contoh konfigurasi: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Menjalankan Kafka Connect

Dalam langkah ini, pekerja Kafka Connect dimulai secara lokal dalam mode terdistribusi, menggunakan Event Hubs untuk mempertahankan status kluster.

  1. connect-distributed.properties Simpan file secara lokal. Pastikan untuk mengganti semua nilai yang terdapat dalam tanda kurung kurawal.
  2. Navigasikan ke lokasi rilis Kafka di komputer Anda.
  3. Jalankan ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties dan tunggu hingga kluster dimulai.

Catatan

Kafka Connect menggunakan API Kafka AdminClient untuk secara otomatis membuat topik dengan konfigurasi yang direkomendasikan, termasuk pemadatan. Pemeriksaan cepat ruang nama di portal Microsoft Azure mengungkapkan bahwa topik internal pekerja Connect telah dibuat secara otomatis.

Topik internal Kafka Connect harus menggunakan pemadatan. Tim Azure Event Hubs tidak bertanggung jawab untuk memperbaiki konfigurasi yang tidak tepat jika topik Connect internal salah dikonfigurasi.

Mengonfigurasi dan memulai konektor sumber Debezium PostgreSQL

Buat file konfigurasi (pg-source-connector.json) untuk konektor sumber PostgreSQL - ganti nilai sesuai instans Azure PostgreSQL Anda.

{
    "name": "todo-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
        "database.port": "5432",
        "database.user": "<replace with database user name>",
        "database.password": "<replace with database password>",
        "database.dbname": "postgres",
        "database.server.name": "my-server",
        "plugin.name": "wal2json",
        "table.whitelist": "public.todos"
    }
}

Tips

database.server.name atribut adalah nama logis yang mengidentifikasi dan menyediakan namespace untuk server/kluster database PostgreSQL tertentu yang sedang dipantau.

Untuk membuat instans konektor, gunakan titik akhir REST API Kafka Connect:

curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors

Untuk memeriksa status konektor:

curl -s http://localhost:8083/connectors/todo-connector/status

Menguji penangkapan perubahan data

Untuk melihat mengubah pengambilan data dalam tindakan, Anda perlu membuat/memperbarui/menghapus rekaman di database Azure PostgreSQL.

Mulailah dengan menyambungkan ke database Azure PostgreSQL Anda (contoh berikut menggunakan psql).

psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require

e.g. 

psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require

Membuat tabel dan menyisipkan rekaman

CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));

INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');

Konektor sekarang harus beraksi dan mengirimkan peristiwa perubahan data ke topik Azure Event Hubs dengan nama my-server.public.todos berikut, dengan asumsi Anda memiliki my-server sebagai nilai untuk database.server.name dan public.todos adalah tabel yang perubahannya Anda lacak (sesuai dengan konfigurasi table.whitelist).

Memeriksa topik Event Hubs

Mari kita tinjau isi topik untuk memastikan semuanya bekerja seperti yang diharapkan. Contoh berikut menggunakan kafkacat, tetapi Anda juga dapat membuat konsumen menggunakan salah satu opsi yang tercantum di sini.

Buat file bernama kafkacat.conf dengan konten berikut:

metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>

Catatan

Perbarui atribut metadata.broker.list dan sasl.password di kafkacat.conf sesuai informasi Azure Event Hubs.

Di terminal yang berbeda, mulai konsumen:

export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos

kafkacat -b $BROKER -t $TOPIC -o beginning

Anda akan melihat payload JSON yang mewakili kejadian data perubahan yang dihasilkan di PostgreSQL sebagai respons terhadap baris yang Anda tambahkan ke tabel todos. Berikut adalah cuplikan payload:

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "description": "setup postgresql on azure",
            "todo_status": "complete"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "fulfillment",
            "ts_ms": 1593018069944,
            "snapshot": "last",
            "db": "postgres",
            "schema": "public",
            "table": "todos",
            "txId": 602,
            "lsn": 184579736,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1593018069947,
        "transaction": null
    }

Kejadian ini terdiri dari payload bersama dengan schema (dihilangkan untuk keringkasan). Di bagian payload, perhatikan bagaimana operasi pembuatan ("op": "c") disajikan - "before": null berarti bahwa ini adalah baris yang baru dibuat dengan INSERT, after menyediakan nilai untuk kolom dalam baris, source menyediakan metadata instans PostgreSQL dari mana kejadian ini terdeteksi, dan seterusnya.

Anda dapat mencoba hal yang sama dengan operasi pembaruan atau penghapusan, dan memeriksa peristiwa perubahan data. Misalnya, untuk memperbarui status tugas configure and install connector (asumsikan id adalah 3):

UPDATE todos SET todo_status = 'complete' WHERE id = 3;

(Opsional) Menginstal konektor FileStreamSink

Sekarang setelah semua todos perubahan tabel diambil dalam topik Event Hubs, Anda menggunakan konektor FileStreamSink (yang tersedia secara default di Kafka Connect) untuk mengonsumsi peristiwa ini.

Buat file konfigurasi (file-sink-connector.json) untuk konektor - ganti file atribut sesuai sistem file Anda.

{
    "name": "cdc-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-server.public.todos",
        "file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
    }
}

Untuk membuat konektor dan memeriksa statusnya:

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

curl http://localhost:8083/connectors/cdc-file-sink/status

Menyisipkan/memperbarui/menghapus rekaman database dan memantau rekaman dalam file sink output yang dikonfigurasi:

tail -f /Users/foo/todos-cdc.txt

Pembersihan

Kafka Connect membuat topik Azure Event Hubs untuk menyimpan konfigurasi, offset, dan status yang bertahan bahkan setelah kluster Kafka Connect dihapus. Kecuali jika persistensi ini diinginkan, kami sarankan Anda menghapus topik-topik ini. Anda mungkin juga ingin menghapus my-server.public.todos hub peristiwa yang dibuat selama proses ini.

Langkah berikutnya

Untuk mempelajari selengkapnya tentang Azure Event Hubs untuk Kafka, lihat artikel berikut ini: