Поделиться через


Интеграция Apache Kafka Connect в Центрах событий Azure с Debezium для отслеживания измененных данных

Система отслеживания измененных данных (CDC) — это метод, используемый для отслеживания изменений на уровне строк в таблицах базы данных в ответ на операции создания, обновления и удаления. Debezium — это распределенная платформа, построенная на основе функций системы отслеживания измененных данных, доступных в разных СУБД (например, логическое декодирование в PostgreSQL). Он предоставляет набор соединителей Kafka Connect, которые касаются изменений на уровне строк в таблицах баз данных и преобразуют их в потоки событий, которые затем отправляются в Apache Kafka.

В этом руководстве описано, как настроить систему отслеживания измененных данных в Azure с помощью Центров событий (для Kafka), База данных Azure для PostgreSQL и Debezium. Он использует коннектор Debezium PostgreSQL для потоковой передачи изменений базы данных из PostgreSQL в топики Kafka в Центрах событий.

Примечание.

В этой статье содержатся ссылки на термин, который корпорация Майкрософт больше не использует. Когда этот термин будет удален из программного обеспечения, мы удалим его из статьи.

При работе с этим руководством вы выполните следующие задачи:

  • Создайте пространство имен в Центрах обработки событий Azure
  • Установка и настройка Базы данных Azure для PostgreSQL
  • Настройка и запуск Kafka Connect с соединителем Debezium PostgreSQL
  • Тестирование захвата изменений данных
  • Необязательно: потребление событий изменения данных с помощью соединителя FileStreamSink

Предварительные требования

Чтобы выполнить это пошаговое руководство, вам потребуется:

Создание пространства имен в Центрах событий Azure

Для отправки и получения данных из любой службы Центров событий требуется пространство имен Центров событий. См. раздел Создание концентратора событий для получения инструкций по созданию пространства имен и концентратора событий. Получите строку подключения Event Hubs и полностью квалифицированное доменное имя (FQDN) для последующего использования. Инструкции см. в статье Get an Event Hubs connection string (Получение строки подключения для Центров событий).

Установка и настройка Базы данных Azure для PostgreSQL

База данных Azure для PostgreSQL — это служба реляционной базы данных на основе версии ядра СУБД PostgreSQL с открытым исходным кодом и доступна в трех вариантах развертывания: отдельный сервер, гибкий сервер и Cosmos DB для PostgreSQL. Воспользуйтесь этими инструкциями, чтобы создать сервер службы "База данных Azure для PostgreSQL" с помощью портала Azure.

Настройка и запуск Kafka Connect

В этом разделе описываются следующие темы:

  • Установка соединителя Debezium
  • Настройка Kafka Connect для Центров событий
  • Запуск кластера Kafka Connect с помощью соединителя Debezium

Скачайте и настройте соединитель Debezium.

Чтобы скачать и настроить соединитель, следуйте актуальным инструкциям из документации по Debezium.

Настройка Kafka Connect для Центров событий

Минимальная перенастройка необходима при перенаправлении пропускной способности Kafka Connect из Kafka в Центры событий. Следующий пример connect-distributed.properties показывает, как настроить Connect для проверки подлинности и обмена данных с конечной точкой Kafka в Центрах событий:

Внимание

  • Debezium автоматически создает раздел для каждой таблицы и кучу разделов метаданных. Раздел Kafka соответствует экземпляру Центров событий (концентратор событий). Для сопоставления Apache Kafka с Центрами событий Azure см. Концептуальное сопоставление Kafka и Центров событий.
  • Количество концентраторов событий в пространстве имен "Центры событий" зависит от уровня ("Базовый", "Стандартный", "Премиум" или "Выделенный"). Эти ограничения см. в разделе Квоты.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Внимание

Замените {YOUR.EVENTHUBS.CONNECTION.STRING} строкой подключения для вашего пространства имен Центров событий. Инструкции по получению строки подключения см. в статье Получение строки подключения Центров событий. Пример конфигурации см. здесь: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Запуск Kafka Connect

На этом этапе исполнитель Kafka Connect запускается локально в распределённом режиме с помощью Event Hubs для поддержания состояния кластера.

  1. Сохраните connect-distributed.properties файл локально. Не забудьте заменить все значения в фигурных скобках.
  2. Перейдите к расположению релиза Kafka на своем компьютере.
  3. Запустите ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties и дождитесь запуска кластера.

Примечание.

Kafka Connect использует API Kafka AdminClient для автоматического создания разделов с рекомендуемыми конфигурациями, включая сжатие. Быстрая проверка пространства имен на портале Azure показывает, что внутренние разделы рабочей роли Connect были созданы автоматически.

Внутренние темы Kafka Connect должны использовать сжатие. Команда Центров событий не несет ответственности за исправление неправильных конфигураций, если внутренние разделы Connect настроены неправильно.

Настройка и запуск соединителя источника Debezium PostgreSQL

Создайте файл конфигурации (pg-source-connector.json) для соединителя источника PostgreSQL. Замените значения в нем в соответствии с параметрами своего экземпляра Azure PostgreSQL.

{
    "name": "todo-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
        "database.port": "5432",
        "database.user": "<replace with database user name>",
        "database.password": "<replace with database password>",
        "database.dbname": "postgres",
        "database.server.name": "my-server",
        "plugin.name": "wal2json",
        "table.whitelist": "public.todos"
    }
}

Совет

database.server.name атрибут — это логическое имя, определяющее и предоставляющее пространство имен для конкретного отслеживаемого сервера базы данных PostgreSQL или кластера.

Чтобы создать экземпляр соединителя, используйте конечную точку REST API Kafka Connect.

curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors

Чтобы проверить состояние соединителя:

curl -s http://localhost:8083/connectors/todo-connector/status

Тестирование фиксации изменений данных

Чтобы просмотреть запись измененных данных в действии, необходимо создать или обновить или удалить записи в базе данных Azure PostgreSQL.

Начните с подключения к базе данных Azure PostgreSQL (в следующем примере используется psql).

psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require

e.g. 

psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require

Создание таблицы и вставка записей

CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));

INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');

Теперь соединитель должен начать работать и отправлять события изменений в топик Event Hubs со следующим именем my-server.public.todos, при условии, что у вас есть my-server в качестве значения для database.server.name, а public.todos является таблицей, изменения которой вы отслеживаете (в соответствии с конфигурацией table.whitelist).

Проверка темы Event Hubs

Проверим содержимое раздела, чтобы убедиться, что все работает как надо. В следующем примере используется kafkacat, но можно также создать потребителя с помощью любого из перечисленных здесь параметров.

Создайте файл kafkacat.conf со следующим содержимым:

metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>

Примечание.

Отредактируйте атрибуты metadata.broker.list и sasl.password в kafkacat.conf согласно параметрам Центров событий.

В другом терминале запустите потребитель:

export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos

kafkacat -b $BROKER -t $TOPIC -o beginning

Вы должны увидеть полезные данные JSON, представляющие события изменений данных, которые создаются в PostgreSQL в ответ на строки, которые вы добавили в таблицу todos. Вот фрагмент полезных данных:

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "description": "setup postgresql on azure",
            "todo_status": "complete"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "fulfillment",
            "ts_ms": 1593018069944,
            "snapshot": "last",
            "db": "postgres",
            "schema": "public",
            "table": "todos",
            "txId": 602,
            "lsn": 184579736,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1593018069947,
        "transaction": null
    }

Событие состоит из payload вместе с соответствующей schema (для краткости опущено). В разделе payload обратите внимание на то, как представлена операция создания ("op": "c"): "before": null означает, что была вставлена (INSERT) новая строка, after предоставляет значения столбцов в строке, source — метаданные экземпляра PostgreSQL, откуда было получено это событие, и т. д.

Вы можете проделать то же самое с операциями обновления и удаления и проанализировать события изменения данных. Например, чтобы обновить состояние задачи для configure and install connector (при условии, что id имеет значение 3):

UPDATE todos SET todo_status = 'complete' WHERE id = 3;

Необязательно: установка соединителя FileStreamSink

Теперь, когда все todos изменения таблицы записываются в тему в Event Hubs, вы используете соединитель FileStreamSink (который по умолчанию доступен в Kafka Connect) для обработки этих событий.

Создайте файл конфигурации (file-sink-connector.json) для соединителя — замените атрибут file в соответствии с вашей файловой системой.

{
    "name": "cdc-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-server.public.todos",
        "file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
    }
}

Чтобы создать соединитель и проверить его состояние:

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

curl http://localhost:8083/connectors/cdc-file-sink/status

Вставьте, обновите или удалите записи в базе данных и проверьте содержимое заданного вами файла приемника вывода:

tail -f /Users/foo/todos-cdc.txt

Очистка

Kafka Connect создает топики Event Hubs для хранения конфигураций, сдвигов и состояния, которые сохраняются даже после остановки кластера Kafka Connect. Если эта сохраняемость не требуется, рекомендуется удалить эти разделы. Кроме того, может потребоваться удалить my-server.public.todos концентратор событий, созданный во время этого пошагового действия.

Следующие шаги

Дополнительные сведения о концентраторах событий для Kafka см. в следующих статьях: