Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Change Data Capture (CDC) adalah teknik yang digunakan untuk melacak perubahan tingkat baris dalam tabel database sebagai respons untuk membuat, memperbarui, dan menghapus operasi. Debezium adalah platform terdistribusi yang dibangun di atas fitur Change Data Capture yang tersedia dalam database yang berbeda (misalnya, dekode logika di PostgreSQL). Ini menyediakan satu set konektor Kafka Connect yang memanfaatkan perubahan tingkat baris dalam tabel database dan mengonversinya menjadi aliran peristiwa yang kemudian dikirim ke Apache Kafka.
Tutorial ini memandu Anda melalui cara menyiapkan sistem berbasis tangkapan data perubahan di Azure menggunakan Azure Event Hubs (untuk Kafka), Azure Database for PostgreSQL dan Debezium. Ini menggunakan konektor Debezium PostgreSQL untuk mengalirkan modifikasi database dari PostgreSQL ke topik Kafka di Azure Event Hubs.
Catatan
Artikel ini berisi referensi ke istilah yang tidak lagi digunakan Microsoft. Saat istilah dihapus dari perangkat lunak, kami akan menghapusnya dari artikel ini.
Dalam tutorial ini, Anda akan melakukan langkah-langkah berikut:
- Membuat namespace layanan Pusat Aktivitas
- Menyiapkan dan mengonfigurasi Azure Database for PostgreSQL
- Mengonfigurasi dan menjalankan Kafka Connect dengan konektor Debezium PostgreSQL
- Pengujian penangkapan perubahan data
- (Opsional) Mengonsumsi peristiwa data perubahan dengan konektor
FileStreamSink
Prasyarat
Untuk menyelesaikan panduan ini, Anda memerlukan:
- Langganan Azure. Jika Anda tidak memilikinya, buat akun gratis.
- Linux/macOS
- Rilis Kafka (versi 1.1.1, versi Scala 2.11), tersedia dari kafka.apache.org
- Baca artikel pengantar Event Hubs for Apache Kafka
Membuat namespace layanan Pusat Aktivitas
Ruang nama Azure Event Hubs diperlukan untuk mengirim dan menerima dari layanan Azure Event Hubs apa pun. Lihat Membuat pusat acara untuk petunjuk membuat ruang nama dan pusat acara. Dapatkan string koneksi Azure Event Hubs dan nama domain yang sepenuhnya memenuhi syarat (FQDN) untuk digunakan nanti. Untuk petunjuk, lihat Mendapatkan string koneksi Azure Event Hubs.
Menyiapkan dan mengonfigurasi Azure Database for PostgreSQL
Azure Database for PostgreSQL adalah layanan database relasional berdasarkan versi komunitas mesin database PostgreSQL sumber terbuka, dan tersedia dalam tiga opsi penyebaran: Server Tunggal, Server Fleksibel, dan Cosmos DB for PostgreSQL. Ikuti petunjuk ini untuk membuat server Azure Database for PostgreSQL menggunakan portal Microsoft Azure.
Mengatur dan menjalankan Kafka Connect
Bagian ini membahas topik-topik berikut:
- Instalasi konektor Debezium
- Mengonfigurasi Kafka Connect untuk Azure Event Hubs
- Mulai kluster Kafka Connect dengan konektor Debezium
Mengunduh dan menyiapkan konektor Debezium
Ikuti petunjuk terbaru dalam dokumentasi Debezium untuk mengunduh dan menyiapkan konektor.
- Unduh file arsip plug-in untuk konektor. Misalnya, untuk mengunduh versi
1.2.0
konektor, gunakan tautan ini - https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.2.0.Final/debezium-connector-postgres-1.2.0.Final-plugin.tar.gz - Ekstrak file JAR dan salin ke plugin.path Kafka Connect.
Konfigurasikan Kafka Connect untuk Azure Event Hubs
Konfigurasi ulang minimal diperlukan saat mengalihkan throughput Kafka Connect dari Kafka ke Azure Event Hubs. Contoh connect-distributed.properties
berikut mengilustrasikan cara mengonfigurasi Connect untuk mengautentikasi dan berkomunikasi dengan titik akhir Kafka di Azure Event Hubs:
Penting
- Debezium secara otomatis membuat topik per tabel dan banyak topik metadata. Topik Kafka sesuai dengan instans Event Hubs (event hub). Untuk pemetaan Apache Kafka ke Azure Event Hubs, lihat Pemetaan konsep Kafka dan Azure Event Hubs.
- Ada batasan yang berbeda pada jumlah pusat aktivitas di namespace Azure Event Hubs tergantung pada tingkat (Dasar, Standar, Premium, atau Khusus). Untuk batasan tersebut, lihat Kuota.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
Penting
Ganti {YOUR.EVENTHUBS.CONNECTION.STRING}
dengan string koneksi ke ruang nama Azure Event Hubs. Untuk instruksi tentang mendapatkan string sambungan, lihat Mendapatkan string sambungan Event Hubs. Berikut adalah contoh konfigurasi: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Menjalankan Kafka Connect
Dalam langkah ini, pekerja Kafka Connect dimulai secara lokal dalam mode terdistribusi, menggunakan Event Hubs untuk mempertahankan status kluster.
-
connect-distributed.properties
Simpan file secara lokal. Pastikan untuk mengganti semua nilai yang terdapat dalam tanda kurung kurawal. - Navigasikan ke lokasi rilis Kafka di komputer Anda.
- Jalankan
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
dan tunggu hingga kluster dimulai.
Catatan
Kafka Connect menggunakan API Kafka AdminClient untuk secara otomatis membuat topik dengan konfigurasi yang direkomendasikan, termasuk pemadatan. Pemeriksaan cepat ruang nama di portal Microsoft Azure mengungkapkan bahwa topik internal pekerja Connect telah dibuat secara otomatis.
Topik internal Kafka Connect harus menggunakan pemadatan. Tim Azure Event Hubs tidak bertanggung jawab untuk memperbaiki konfigurasi yang tidak tepat jika topik Connect internal salah dikonfigurasi.
Mengonfigurasi dan memulai konektor sumber Debezium PostgreSQL
Buat file konfigurasi (pg-source-connector.json
) untuk konektor sumber PostgreSQL - ganti nilai sesuai instans Azure PostgreSQL Anda.
{
"name": "todo-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
"database.port": "5432",
"database.user": "<replace with database user name>",
"database.password": "<replace with database password>",
"database.dbname": "postgres",
"database.server.name": "my-server",
"plugin.name": "wal2json",
"table.whitelist": "public.todos"
}
}
Tips
database.server.name
atribut adalah nama logis yang mengidentifikasi dan menyediakan namespace untuk server/kluster database PostgreSQL tertentu yang sedang dipantau.
Untuk membuat instans konektor, gunakan titik akhir REST API Kafka Connect:
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors
Untuk memeriksa status konektor:
curl -s http://localhost:8083/connectors/todo-connector/status
Menguji penangkapan perubahan data
Untuk melihat mengubah pengambilan data dalam tindakan, Anda perlu membuat/memperbarui/menghapus rekaman di database Azure PostgreSQL.
Mulailah dengan menyambungkan ke database Azure PostgreSQL Anda (contoh berikut menggunakan psql).
psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require
e.g.
psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require
Membuat tabel dan menyisipkan rekaman
CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));
INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');
Konektor sekarang harus beraksi dan mengirimkan peristiwa perubahan data ke topik Azure Event Hubs dengan nama my-server.public.todos
berikut, dengan asumsi Anda memiliki my-server
sebagai nilai untuk database.server.name
dan public.todos
adalah tabel yang perubahannya Anda lacak (sesuai dengan konfigurasi table.whitelist
).
Memeriksa topik Event Hubs
Mari kita tinjau isi topik untuk memastikan semuanya bekerja seperti yang diharapkan. Contoh berikut menggunakan kafkacat
, tetapi Anda juga dapat membuat konsumen menggunakan salah satu opsi yang tercantum di sini.
Buat file bernama kafkacat.conf
dengan konten berikut:
metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>
Catatan
Perbarui atribut metadata.broker.list
dan sasl.password
di kafkacat.conf
sesuai informasi Azure Event Hubs.
Di terminal yang berbeda, mulai konsumen:
export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos
kafkacat -b $BROKER -t $TOPIC -o beginning
Anda akan melihat payload JSON yang mewakili kejadian data perubahan yang dihasilkan di PostgreSQL sebagai respons terhadap baris yang Anda tambahkan ke tabel todos
. Berikut adalah cuplikan payload:
{
"schema": {...},
"payload": {
"before": null,
"after": {
"id": 1,
"description": "setup postgresql on azure",
"todo_status": "complete"
},
"source": {
"version": "1.2.0.Final",
"connector": "postgresql",
"name": "fulfillment",
"ts_ms": 1593018069944,
"snapshot": "last",
"db": "postgres",
"schema": "public",
"table": "todos",
"txId": 602,
"lsn": 184579736,
"xmin": null
},
"op": "c",
"ts_ms": 1593018069947,
"transaction": null
}
Kejadian ini terdiri dari payload
bersama dengan schema
(dihilangkan untuk keringkasan). Di bagian payload
, perhatikan bagaimana operasi pembuatan ("op": "c"
) disajikan - "before": null
berarti bahwa ini adalah baris yang baru dibuat dengan INSERT
, after
menyediakan nilai untuk kolom dalam baris, source
menyediakan metadata instans PostgreSQL dari mana kejadian ini terdeteksi, dan seterusnya.
Anda dapat mencoba hal yang sama dengan operasi pembaruan atau penghapusan, dan memeriksa peristiwa perubahan data. Misalnya, untuk memperbarui status tugas configure and install connector
(asumsikan id
adalah 3
):
UPDATE todos SET todo_status = 'complete' WHERE id = 3;
(Opsional) Menginstal konektor FileStreamSink
Sekarang setelah semua todos
perubahan tabel diambil dalam topik Event Hubs, Anda menggunakan konektor FileStreamSink (yang tersedia secara default di Kafka Connect) untuk mengonsumsi peristiwa ini.
Buat file konfigurasi (file-sink-connector.json
) untuk konektor - ganti file
atribut sesuai sistem file Anda.
{
"name": "cdc-file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "my-server.public.todos",
"file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
}
}
Untuk membuat konektor dan memeriksa statusnya:
curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/cdc-file-sink/status
Menyisipkan/memperbarui/menghapus rekaman database dan memantau rekaman dalam file sink output yang dikonfigurasi:
tail -f /Users/foo/todos-cdc.txt
Pembersihan
Kafka Connect membuat topik Azure Event Hubs untuk menyimpan konfigurasi, offset, dan status yang bertahan bahkan setelah kluster Kafka Connect dihapus. Kecuali jika persistensi ini diinginkan, kami sarankan Anda menghapus topik-topik ini. Anda mungkin juga ingin menghapus my-server.public.todos
hub peristiwa yang dibuat selama proses ini.
Langkah berikutnya
Untuk mempelajari selengkapnya tentang Azure Event Hubs untuk Kafka, lihat artikel berikut ini: