Udostępnij za pośrednictwem


Integracja obsługi programu Apache Kafka Connect w usłudze Azure Event Hubs z rozwiązaniem Debezium na potrzeby przechwytywania zmian danych

Przechwytywanie zmian danych (CDC) to technika służąca do śledzenia zmian na poziomie wiersza w tabelach bazy danych w odpowiedzi na operacje tworzenia, aktualizowania i usuwania. Debezium to rozproszona platforma, która bazuje na funkcjach przechwytywania zmian danych dostępnych w różnych bazach danych (na przykład dekodowanie logiczne w usłudze PostgreSQL). Udostępnia zestaw łączników Kafka Connect, które odczytują zmiany na poziomie wiersza w tabelach bazy danych i konwertują je na strumienie zdarzeń, które są następnie wysyłane do Apache Kafka.

W tym samouczku przedstawiono sposób konfigurowania systemu przechwytywania danych zmian na platformie Azure przy użyciu usługi Event Hubs (dla platformy Kafka), usługi Azure Database for PostgreSQL i debezium. Używa łącznika Debezium PostgreSQL do streamowania modyfikacji bazy danych z PostgreSQL do tematów Kafki w Event Hubs.

Uwaga

Ten artykuł zawiera odwołania do terminu, którego firma Microsoft już nie używa. Po usunięciu terminu z oprogramowania usuniemy go z tego artykułu.

W tym przewodniku zrobisz następujące kroki:

  • Tworzenie przestrzeni nazw usługi Event Hubs
  • Ustaw i skonfiguruj Azure Database for PostgreSQL
  • Konfigurowanie i uruchamianie programu Kafka Connect za pomocą łącznika Debezium PostgreSQL
  • Testowanie przechwytywania danych zmian
  • (Opcjonalnie) Korzystanie ze zdarzeń zmiany danych za pomocą łącznika FileStreamSink

Wymagania wstępne

Aby ukończyć ten przewodnik, wymagane są następujące elementy:

Tworzenie przestrzeni nazw usługi Event Hubs

Przestrzeń nazw Event Hubs jest wymagana do wysyłania i odbierania z każdej usługi Event Hubs. Aby uzyskać instrukcje dotyczące tworzenia przestrzeni nazw i centrum zdarzeń, zobacz Tworzenie centrum zdarzeń. Pobierz parametry połączenia usługi Event Hubs i w pełni kwalifikowaną nazwę domeny (FQDN) w celu późniejszego użycia. Aby uzyskać instrukcje, zobacz Pobieranie parametrów połączenia usługi Event Hubs.

Tworzenie i konfigurowanie usługi Azure Database for PostgreSQL

Azure Database for PostgreSQL to usługa relacyjnej bazy danych oparta na społecznościowej wersji open-source aparatu bazy danych PostgreSQL i jest dostępna w trzech opcjach wdrożeniowych: pojedynczy serwer, serwer elastyczny oraz Cosmos DB for PostgreSQL. Postępuj zgodnie z tymi instrukcjami , aby utworzyć serwer usługi Azure Database for PostgreSQL przy użyciu witryny Azure Portal.

Konfigurowanie i uruchamianie programu Kafka Connect

W tej sekcji omówiono następujące tematy:

  • Instalacja łącznika Debezium
  • Konfigurowanie programu Kafka Connect dla usługi Event Hubs
  • Uruchamianie klastra Kafka Connect za pomocą łącznika Debezium

Pobieranie i konfigurowanie łącznika debezium

Postępuj zgodnie z najnowszymi instrukcjami w dokumentacji narzędzia Debezium, aby pobrać i skonfigurować łącznik.

Konfigurowanie narzędzia Kafka Connect dla usługi Event Hubs

Przekierowywanie przepływności narzędzia Kafka Connect z platformy Kafka do usługi Event Hubs wymaga minimalnej rekonfiguracji. W poniższym przykładowym pliku connect-distributed.properties pokazano, jak skonfigurować narzędzie Connect do uwierzytelnienia i komunikowania się z punktem końcowym platformy Kafka w usłudze Event Hubs:

Ważne

  • Debezium automatycznie tworzy wątek dla każdej tabeli oraz kilka wątków metadanych. Temat platformy Kafka odpowiada wystąpieniu usługi Event Hubs (centrum zdarzeń). Aby zapoznać się z mapowaniami platformy Apache Kafka do usługi Azure Event Hubs, zobacz Mapowanie koncepcyjne platformy Kafka i usługi Event Hubs.
  • Istnieją różne limity liczby centrów zdarzeń w przestrzeni nazw usługi Event Hubs w zależności od warstwy (Podstawowa, Standardowa, Premium lub Dedykowana). Aby dowiedzieć się więcej o tych limitach, zobacz Przydziały.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Ważne

Zastąp {YOUR.EVENTHUBS.CONNECTION.STRING} ciągiem połączenia dla przestrzeni nazw usługi Event Hubs. Aby uzyskać instrukcje dotyczące uzyskiwania parametru połączenia, zobacz Uzyskiwanie parametru połączenia Event Hubs. Oto przykładowa konfiguracja: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Uruchamianie narzędzia Kafka Connect

W tym kroku proces roboczy narzędzia Kafka Connect został uruchomiony lokalnie w trybie rozproszonym przy użyciu usługi Event Hubs w celu zachowania stanu klastra.

  1. connect-distributed.properties Zapisz plik lokalnie. Zamień wszystkie wartości w nawiasach klamrowych.
  2. Przejdź do lokalizacji wydania Kafka na swoim komputerze.
  3. Uruchom ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties i poczekaj na uruchomienie klastra.

Uwaga

Platforma Kafka Connect używa interfejsu API AdminClient platformy Kafka do automatycznego tworzenia tematów z zalecanymi konfiguracjami, w tym kompaktowaniem. Po szybkim sprawdzeniu przestrzeni nazw w Portalu Azure okazuje się, że wewnętrzne tematy robocze Connect zostały utworzone automatycznie.

Tematy wewnętrzne w Kafka Connect muszą mieć włączone kompaktowanie. Zespół usługi Event Hubs nie jest odpowiedzialny za naprawianie nieprawidłowych konfiguracji, jeśli tematy programu Internal Connect są niepoprawnie skonfigurowane.

Konfigurowanie i uruchamianie łącznika Źródłowego Debezium PostgreSQL

Utwórz plik konfiguracji (pg-source-connector.json) dla łącznika źródłowego postgreSQL — zastąp wartości zgodnie z wystąpieniem usługi Azure PostgreSQL.

{
    "name": "todo-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
        "database.port": "5432",
        "database.user": "<replace with database user name>",
        "database.password": "<replace with database password>",
        "database.dbname": "postgres",
        "database.server.name": "my-server",
        "plugin.name": "wal2json",
        "table.whitelist": "public.todos"
    }
}

Napiwek

database.server.name atrybut jest nazwą logiczną, która identyfikuje i udostępnia przestrzeń nazw dla monitorowanego serwera/klastra bazy danych PostgreSQL.

Aby utworzyć instancję łącznika, użyj punktu końcowego REST API dla Kafka Connect.

curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors

Aby sprawdzić stan łącznika:

curl -s http://localhost:8083/connectors/todo-connector/status

Testowanie przechwytywania danych o zmianach

Aby zobaczyć przechwytywanie zmian danych w działaniu, musisz utworzyć/zaktualizować/usunąć rekordy w bazie danych Azure PostgreSQL.

Zacznij od nawiązania połączenia z bazą danych Azure PostgreSQL (w poniższym przykładzie użyto narzędzia psql).

psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require

e.g. 

psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require

Tworzenie tabeli i wstawianie rekordów

CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));

INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');

Łącznik powinien teraz rozpocząć działanie i wysyłać zdarzenia zmian danych do tematu usługi Event Hubs o następującej nazwie my-server.public.todos, przy założeniu, że masz my-server jako wartość database.server.name i public.todos jest tabelą, której zmiany są śledzone zgodnie z konfiguracją table.whitelist.

Sprawdź wątek usługi Event Hubs

Przyjrzyjmy się zawartości tematu, aby upewnić się, że wszystko działa zgodnie z oczekiwaniami. W poniższym przykładzie użyto metody kafkacat, ale można również utworzyć użytkownika przy użyciu dowolnej z opcji wymienionych tutaj.

Utwórz plik o nazwie kafkacat.conf z następującą zawartością:

metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>

Uwaga

Aktualizuj atrybuty metadata.broker.list oraz sasl.password w kafkacat.conf zgodnie z informacjami z Event Hubs.

W innym terminalu uruchom konsumenta:

export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos

kafkacat -b $BROKER -t $TOPIC -o beginning

Powinny zostać wyświetlone ładunki danych JSON reprezentujące zdarzenia zmian danych wygenerowane w usłudze PostgreSQL w odpowiedzi na wiersze, które dodałeś do tabeli todos. Oto fragment kodu ładunku:

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "description": "setup postgresql on azure",
            "todo_status": "complete"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "fulfillment",
            "ts_ms": 1593018069944,
            "snapshot": "last",
            "db": "postgres",
            "schema": "public",
            "table": "todos",
            "txId": 602,
            "lsn": 184579736,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1593018069947,
        "transaction": null
    }

Zdarzenie składa się z elementu payload wraz z jego schema (pominięte w celu zwięzłości). W payload sekcji zwróć uwagę, jak jest reprezentowana operacja tworzenia ("op": "c") — "before": null oznacza, że był to nowo INSERTutworzony wiersz, after zawiera wartości dla kolumn w wierszu, source dostarcza metadane wystąpienia postgreSQL z miejsca, w którym to zdarzenie zostało odebrane itd.

Możesz również wypróbować te same operacje aktualizacji lub usuwania oraz introspektować zdarzenia zmian danych. Na przykład, aby zaktualizować stan zadania dla configure and install connector (zakładając, że jego id to 3):

UPDATE todos SET todo_status = 'complete' WHERE id = 3;

(Opcjonalnie) Instalowanie łącznika FileStreamSink

Teraz, gdy wszystkie zmiany tabeli todos są przechwytywane w temacie w usłudze Event Hubs, możesz użyć łącznika FileStreamSink (dostępnego domyślnie w programie Kafka Connect) do przetwarzania tych zdarzeń.

Utwórz plik konfiguracji (file-sink-connector.json) dla łącznika — zastąp atrybut file zgodnie z systemem plików.

{
    "name": "cdc-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-server.public.todos",
        "file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
    }
}

Aby utworzyć łącznik i sprawdzić jego stan:

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

curl http://localhost:8083/connectors/cdc-file-sink/status

Wstaw/zaktualizuj/usuń rekordy bazy danych i monitoruj rekordy w skonfigurowanym pliku ujścia danych wyjściowych:

tail -f /Users/foo/todos-cdc.txt

Czyszczenie

Kafka Connect tworzy tematy Event Hubs w celu przechowywania konfiguracji, przesunięć i stanu, które są zachowywane nawet po wyłączeniu klastra Kafka Connect. Jeśli ta trwałość nie jest wymagana, zalecamy usunięcie tych tematów. Możesz również usunąć my-server.public.todos centrum zdarzeń, które zostało utworzone podczas tego przewodnika.

Następne kroki

Aby dowiedzieć się więcej o usłudze Event Hubs dla platformy Kafka, zobacz następujące artykuły: