Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Przechwytywanie zmian danych (CDC) to technika służąca do śledzenia zmian na poziomie wiersza w tabelach bazy danych w odpowiedzi na operacje tworzenia, aktualizowania i usuwania. Debezium to rozproszona platforma, która bazuje na funkcjach przechwytywania zmian danych dostępnych w różnych bazach danych (na przykład dekodowanie logiczne w usłudze PostgreSQL). Udostępnia zestaw łączników Kafka Connect, które odczytują zmiany na poziomie wiersza w tabelach bazy danych i konwertują je na strumienie zdarzeń, które są następnie wysyłane do Apache Kafka.
W tym samouczku przedstawiono sposób konfigurowania systemu przechwytywania danych zmian na platformie Azure przy użyciu usługi Event Hubs (dla platformy Kafka), usługi Azure Database for PostgreSQL i debezium. Używa łącznika Debezium PostgreSQL do streamowania modyfikacji bazy danych z PostgreSQL do tematów Kafki w Event Hubs.
Uwaga
Ten artykuł zawiera odwołania do terminu, którego firma Microsoft już nie używa. Po usunięciu terminu z oprogramowania usuniemy go z tego artykułu.
W tym przewodniku zrobisz następujące kroki:
- Tworzenie przestrzeni nazw usługi Event Hubs
- Ustaw i skonfiguruj Azure Database for PostgreSQL
- Konfigurowanie i uruchamianie programu Kafka Connect za pomocą łącznika Debezium PostgreSQL
- Testowanie przechwytywania danych zmian
- (Opcjonalnie) Korzystanie ze zdarzeń zmiany danych za pomocą łącznika
FileStreamSink
Wymagania wstępne
Aby ukończyć ten przewodnik, wymagane są następujące elementy:
- Subskrypcja platformy Azure. Jeśli jej nie masz, utwórz bezpłatne konto.
- Linux/macOS
- Platforma Kafka (wersja 1.1.1, Scala w wersji 2.11), dostępna na stronie kafka.apache.org
- Przeczytaj artykuł z wprowadzeniem Usługa Event Hubs dla platformy Apache Kafka
Tworzenie przestrzeni nazw usługi Event Hubs
Przestrzeń nazw Event Hubs jest wymagana do wysyłania i odbierania z każdej usługi Event Hubs. Aby uzyskać instrukcje dotyczące tworzenia przestrzeni nazw i centrum zdarzeń, zobacz Tworzenie centrum zdarzeń. Pobierz parametry połączenia usługi Event Hubs i w pełni kwalifikowaną nazwę domeny (FQDN) w celu późniejszego użycia. Aby uzyskać instrukcje, zobacz Pobieranie parametrów połączenia usługi Event Hubs.
Tworzenie i konfigurowanie usługi Azure Database for PostgreSQL
Azure Database for PostgreSQL to usługa relacyjnej bazy danych oparta na społecznościowej wersji open-source aparatu bazy danych PostgreSQL i jest dostępna w trzech opcjach wdrożeniowych: pojedynczy serwer, serwer elastyczny oraz Cosmos DB for PostgreSQL. Postępuj zgodnie z tymi instrukcjami , aby utworzyć serwer usługi Azure Database for PostgreSQL przy użyciu witryny Azure Portal.
Konfigurowanie i uruchamianie programu Kafka Connect
W tej sekcji omówiono następujące tematy:
- Instalacja łącznika Debezium
- Konfigurowanie programu Kafka Connect dla usługi Event Hubs
- Uruchamianie klastra Kafka Connect za pomocą łącznika Debezium
Pobieranie i konfigurowanie łącznika debezium
Postępuj zgodnie z najnowszymi instrukcjami w dokumentacji narzędzia Debezium, aby pobrać i skonfigurować łącznik.
- Pobierz archiwum wtyczek łącznika. Aby na przykład pobrać wersję
1.2.0
łącznika, użyj tego linku — https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.2.0.Final/debezium-connector-postgres-1.2.0.Final-plugin.tar.gz - Wyodrębnij pliki JAR i skopiuj je do ścieżki plugin Kafka Connect.
Konfigurowanie narzędzia Kafka Connect dla usługi Event Hubs
Przekierowywanie przepływności narzędzia Kafka Connect z platformy Kafka do usługi Event Hubs wymaga minimalnej rekonfiguracji. W poniższym przykładowym pliku connect-distributed.properties
pokazano, jak skonfigurować narzędzie Connect do uwierzytelnienia i komunikowania się z punktem końcowym platformy Kafka w usłudze Event Hubs:
Ważne
- Debezium automatycznie tworzy wątek dla każdej tabeli oraz kilka wątków metadanych. Temat platformy Kafka odpowiada wystąpieniu usługi Event Hubs (centrum zdarzeń). Aby zapoznać się z mapowaniami platformy Apache Kafka do usługi Azure Event Hubs, zobacz Mapowanie koncepcyjne platformy Kafka i usługi Event Hubs.
- Istnieją różne limity liczby centrów zdarzeń w przestrzeni nazw usługi Event Hubs w zależności od warstwy (Podstawowa, Standardowa, Premium lub Dedykowana). Aby dowiedzieć się więcej o tych limitach, zobacz Przydziały.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
Ważne
Zastąp {YOUR.EVENTHUBS.CONNECTION.STRING}
ciągiem połączenia dla przestrzeni nazw usługi Event Hubs. Aby uzyskać instrukcje dotyczące uzyskiwania parametru połączenia, zobacz Uzyskiwanie parametru połączenia Event Hubs. Oto przykładowa konfiguracja: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Uruchamianie narzędzia Kafka Connect
W tym kroku proces roboczy narzędzia Kafka Connect został uruchomiony lokalnie w trybie rozproszonym przy użyciu usługi Event Hubs w celu zachowania stanu klastra.
-
connect-distributed.properties
Zapisz plik lokalnie. Zamień wszystkie wartości w nawiasach klamrowych. - Przejdź do lokalizacji wydania Kafka na swoim komputerze.
- Uruchom
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
i poczekaj na uruchomienie klastra.
Uwaga
Platforma Kafka Connect używa interfejsu API AdminClient platformy Kafka do automatycznego tworzenia tematów z zalecanymi konfiguracjami, w tym kompaktowaniem. Po szybkim sprawdzeniu przestrzeni nazw w Portalu Azure okazuje się, że wewnętrzne tematy robocze Connect zostały utworzone automatycznie.
Tematy wewnętrzne w Kafka Connect muszą mieć włączone kompaktowanie. Zespół usługi Event Hubs nie jest odpowiedzialny za naprawianie nieprawidłowych konfiguracji, jeśli tematy programu Internal Connect są niepoprawnie skonfigurowane.
Konfigurowanie i uruchamianie łącznika Źródłowego Debezium PostgreSQL
Utwórz plik konfiguracji (pg-source-connector.json
) dla łącznika źródłowego postgreSQL — zastąp wartości zgodnie z wystąpieniem usługi Azure PostgreSQL.
{
"name": "todo-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
"database.port": "5432",
"database.user": "<replace with database user name>",
"database.password": "<replace with database password>",
"database.dbname": "postgres",
"database.server.name": "my-server",
"plugin.name": "wal2json",
"table.whitelist": "public.todos"
}
}
Napiwek
database.server.name
atrybut jest nazwą logiczną, która identyfikuje i udostępnia przestrzeń nazw dla monitorowanego serwera/klastra bazy danych PostgreSQL.
Aby utworzyć instancję łącznika, użyj punktu końcowego REST API dla Kafka Connect.
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors
Aby sprawdzić stan łącznika:
curl -s http://localhost:8083/connectors/todo-connector/status
Testowanie przechwytywania danych o zmianach
Aby zobaczyć przechwytywanie zmian danych w działaniu, musisz utworzyć/zaktualizować/usunąć rekordy w bazie danych Azure PostgreSQL.
Zacznij od nawiązania połączenia z bazą danych Azure PostgreSQL (w poniższym przykładzie użyto narzędzia psql).
psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require
e.g.
psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require
Tworzenie tabeli i wstawianie rekordów
CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));
INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');
Łącznik powinien teraz rozpocząć działanie i wysyłać zdarzenia zmian danych do tematu usługi Event Hubs o następującej nazwie my-server.public.todos
, przy założeniu, że masz my-server
jako wartość database.server.name
i public.todos
jest tabelą, której zmiany są śledzone zgodnie z konfiguracją table.whitelist
.
Sprawdź wątek usługi Event Hubs
Przyjrzyjmy się zawartości tematu, aby upewnić się, że wszystko działa zgodnie z oczekiwaniami. W poniższym przykładzie użyto metody kafkacat
, ale można również utworzyć użytkownika przy użyciu dowolnej z opcji wymienionych tutaj.
Utwórz plik o nazwie kafkacat.conf
z następującą zawartością:
metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>
Uwaga
Aktualizuj atrybuty metadata.broker.list
oraz sasl.password
w kafkacat.conf
zgodnie z informacjami z Event Hubs.
W innym terminalu uruchom konsumenta:
export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos
kafkacat -b $BROKER -t $TOPIC -o beginning
Powinny zostać wyświetlone ładunki danych JSON reprezentujące zdarzenia zmian danych wygenerowane w usłudze PostgreSQL w odpowiedzi na wiersze, które dodałeś do tabeli todos
. Oto fragment kodu ładunku:
{
"schema": {...},
"payload": {
"before": null,
"after": {
"id": 1,
"description": "setup postgresql on azure",
"todo_status": "complete"
},
"source": {
"version": "1.2.0.Final",
"connector": "postgresql",
"name": "fulfillment",
"ts_ms": 1593018069944,
"snapshot": "last",
"db": "postgres",
"schema": "public",
"table": "todos",
"txId": 602,
"lsn": 184579736,
"xmin": null
},
"op": "c",
"ts_ms": 1593018069947,
"transaction": null
}
Zdarzenie składa się z elementu payload
wraz z jego schema
(pominięte w celu zwięzłości). W payload
sekcji zwróć uwagę, jak jest reprezentowana operacja tworzenia ("op": "c"
) — "before": null
oznacza, że był to nowo INSERT
utworzony wiersz, after
zawiera wartości dla kolumn w wierszu, source
dostarcza metadane wystąpienia postgreSQL z miejsca, w którym to zdarzenie zostało odebrane itd.
Możesz również wypróbować te same operacje aktualizacji lub usuwania oraz introspektować zdarzenia zmian danych. Na przykład, aby zaktualizować stan zadania dla configure and install connector
(zakładając, że jego id
to 3
):
UPDATE todos SET todo_status = 'complete' WHERE id = 3;
(Opcjonalnie) Instalowanie łącznika FileStreamSink
Teraz, gdy wszystkie zmiany tabeli todos
są przechwytywane w temacie w usłudze Event Hubs, możesz użyć łącznika FileStreamSink (dostępnego domyślnie w programie Kafka Connect) do przetwarzania tych zdarzeń.
Utwórz plik konfiguracji (file-sink-connector.json
) dla łącznika — zastąp atrybut file
zgodnie z systemem plików.
{
"name": "cdc-file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "my-server.public.todos",
"file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
}
}
Aby utworzyć łącznik i sprawdzić jego stan:
curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/cdc-file-sink/status
Wstaw/zaktualizuj/usuń rekordy bazy danych i monitoruj rekordy w skonfigurowanym pliku ujścia danych wyjściowych:
tail -f /Users/foo/todos-cdc.txt
Czyszczenie
Kafka Connect tworzy tematy Event Hubs w celu przechowywania konfiguracji, przesunięć i stanu, które są zachowywane nawet po wyłączeniu klastra Kafka Connect. Jeśli ta trwałość nie jest wymagana, zalecamy usunięcie tych tematów. Możesz również usunąć my-server.public.todos
centrum zdarzeń, które zostało utworzone podczas tego przewodnika.
Następne kroki
Aby dowiedzieć się więcej o usłudze Event Hubs dla platformy Kafka, zobacz następujące artykuły:
- Replikacja brokera Kafka w Centrum wydarzeń
- Łączenie platformy Apache Spark z centrum zdarzeń
- Łączenie narzędzia Apache Flink z centrum zdarzeń
- Eksploruj przykłady na naszym GitHubie
- Łączenie usługi Akka Streams z centrum zdarzeń
- Przewodnik dla deweloperów platformy Apache Kafka dotyczący usługi Azure Event Hubs