Compartilhar via


Integrar o suporte do Apache Kafka Connect nos Hubs de Eventos do Azure com o Debezium para Captura de dados de alterações

O CDC (Captura de Dados de Alterações) é uma técnica usada para acompanhar alterações no nível da linha em tabelas de banco de dados em resposta às operações de criação, atualização e exclusão. O Debezium é uma plataforma distribuída que se baseia nos recursos do Captura de Dados de Alterações disponíveis em bancos de dados diferentes (por exemplo, decodificação lógica no PostgreSQL). Ele fornece um conjunto de conectores do Kafka Connect que toca em alterações no nível de linha em tabelas de banco de dados e as converte em fluxos de eventos que são enviados para o Apache Kafka.

Este tutorial é um passo a passo de como configurar um sistema baseado em captura de dados de alterações no Azure usando os Hubs de Eventos (para Kafka), o Banco de Dados do Azure para PostgreSQL e a Debezium. Ele usa o conector PostgreSQL da Debezium para transmitir modificações do banco de dados a partir do PostgreSQL para os tópicos do Kafka nos Hubs de Eventos.

Observação

Este artigo contém referências a um termo que a Microsoft não usa mais. Quando o termo for removido do software, também o removeremos deste artigo.

Neste tutorial, você deve executar as seguintes etapas:

  • Criar um namespace dos Hubs de Eventos
  • Definir e configurar o Banco de Dados do Azure para PostgreSQL
  • Configurar e executar o Kafka Connect com o conector Debezium do PostgreSQL
  • Testar a captura de dados de alterações
  • (Opcional) Consumir eventos de dados de alteração com um conector FileStreamSink

Pré-requisitos

Para concluir este passo a passo, você exige:

Criar um namespace dos Hubs de Eventos

É necessário um namespace do Hubs de Eventos para enviar e receber de qualquer serviço de Hub de Eventos. Para obter instruções sobre como criar um namespace e um hub de eventos, confira Criar um hub de eventos. Obtenha a cadeia de conexão dos Hubs de Eventos e o FQDN (nome de domínio totalmente qualificado) para uso posterior. Para obter instruções, confira Obter uma cadeia de conexão dos Hubs de Eventos.

Configurar e configurar Banco de Dados do Azure para PostgreSQL

O Banco de Dados do Azure para PostgreSQL é um serviço de banco de dados relacional baseado na versão comunitária do mecanismo de banco de dados do PostgreSQL de código aberto e está disponível em três opções de implantação: Servidor Único, Servidor Flexível e Cosmos DB para PostgreSQL. Siga estas instruções para criar um servidor do Banco de Dados do Azure para PostgreSQL usando o portal do Azure.

Definir e executar o Kafka Connect

Esta seção contém os seguintes tópicos:

  • Instalação do conector Debezium
  • Configuração do Kafka Connect para os Hubs de Eventos
  • Iniciar o cluster do Kafka Connect com o conector do Debezium

Baixar e configurar o conector do Debezium

Siga as instruções mais recentes na documentação do Debezium para baixar e configurar o conector.

Configurar o Kafka Connect para os Hubs de Eventos

É necessário um mínimo de reconfiguração para redirecionar a taxa de transferência do Kafka Connect para os Hubs de Eventos. O exemplo connect-distributed.properties abaixo ilustra como configurar o Connect para fazer a autenticação e se comunicar com o ponto de extremidade do Kafka nos Hubs de Eventos:

Importante

  • Debezium criará automaticamente um tópico por tabela e vários tópicos de metadados. O tópico kafka corresponde a uma instância dos Hubs de Eventos (hub de eventos). Para mapeamentos do Apache Kafka para Hubs de Eventos do Azure, confira Mapeamento conceitual de Kafka e dos Hubs de Eventos.
  • Há diferentes limites no número de hubs de eventos em um namespace dos Hubs de Eventos, dependendo da camada (Básico, Standard, Premium ou Dedicado). Para esses limites, consulte Cotas.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Importante

Substitua {YOUR.EVENTHUBS.CONNECTION.STRING} pela cadeia de conexão do seu namespace dos Hubs de Eventos. Para ver as instruções sobre como obter uma cadeia de conexão, confira Obter cadeia de conexão para Hubs de Eventos. Aqui está um exemplo de configuração: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Executar o Kafka Connect

Nesta etapa, um trabalho do Kafka Connect é iniciado localmente em modo distribuído, usando os Hubs de Eventos para manter o estado do cluster.

  1. Salve o arquivo acima connect-distributed.properties localmente. Substitua todos os valores entre chaves.
  2. Navegue até o local da versão do Kafka em seu computador.
  3. Execute ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties e aguarde o cluster iniciar.

Observação

O Kafka Connect usa a API AdminClient do Kafka para criar automaticamente tópicos com configurações recomendadas, incluindo a compactação. Uma verificação rápida do namespace no portal do Azure revela que os tópicos internos do trabalho do Connect foram criados automaticamente.

Os tópicos internos do Kafka Connect precisam usar a compactação. A equipe dos Hubs de Eventos não será responsável por corrigir configurações inadequadas se os tópicos internos do Connect estiverem configurados incorretamente.

Configurar e iniciar o conector de origem Debezium do PostgreSQL

Criar um arquivo de configuração (pg-source-connector.json) para o conector de origem do PostgreSQL – substitua os valores de acordo com a instância do PostgreSQL do Azure.

{
    "name": "todo-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
        "database.port": "5432",
        "database.user": "<replace with database user name>",
        "database.password": "<replace with database password>",
        "database.dbname": "postgres",
        "database.server.name": "my-server",
        "plugin.name": "wal2json",
        "table.whitelist": "public.todos"
    }
}

Dica

O atributo database.server.name é um nome lógico que identifica e fornece um namespace para o servidor/cluster de banco de dados do PostgreSQL que está sendo monitorado.

Para criar uma instância do conector, use o ponto de extremidade da API REST do Kafka Connect:

curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors

Para verificar o status do conector:

curl -s http://localhost:8083/connectors/todo-connector/status

Testar a captura de dados de alterações

Para ver a captura de dados de alterações em ação, você precisa criar/atualizar/excluir registros no banco de dados PostgreSQL do Azure.

Inicie conectando-se ao banco de dados do PostgreSQL para Azure (o exemplo a seguir usa psql).

psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require

e.g. 

psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require

Criar uma tabela e inserir registros

CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));

INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');

Agora o conector deve entrar em ação e enviar eventos de alteração de dados para um tópico de Hubs de Eventos com o seguinte nome, my-server.public.todos, considerando que você tenha my-server como o valor para database.server.name e public.todos é a tabela cujas alterações você está acompanhando (de acordo com a configuração table.whitelist).

Tópico Verificar Hubs de Eventos

Vamos analisar o conteúdo do tópico para garantir que tudo está funcionando conforme o esperado. O exemplo abaixo usa kafkacat, mas você também pode criar um consumidor usando qualquer uma das opções listadas aqui.

Crie um arquivo chamado kafkacat.conf com os conteúdos a seguir:

metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>

Observação

Atualizar os atributos metadata.broker.list e sasl.password em kafkacat.conf de acordo com as informações dos Hubs de Eventos.

Em um terminal diferente, inicie um consumidor:

export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos

kafkacat -b $BROKER -t $TOPIC -o beginning

Você deve ver os payloads JSON que representam os eventos de dados de alteração gerados no PostgreSQL em resposta às linhas que você adicionou à tabela todos. Aqui está um snippet do payload:

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "description": "setup postgresql on azure",
            "todo_status": "complete"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "fulfillment",
            "ts_ms": 1593018069944,
            "snapshot": "last",
            "db": "postgres",
            "schema": "public",
            "table": "todos",
            "txId": 602,
            "lsn": 184579736,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1593018069947,
        "transaction": null
    }

O evento consiste no payload junto ao seu schema (omitido para brevidade). Na seção payload, observe como a operação de criação ("op": "c") é representada - "before": null significa que ela era uma INSERTlinha recém-criada, after fornece valores para as colunas na linha, source fornece os metadados da instância do PostgreSQL de onde esse evento foi retirado etc.

Você também pode tentar o mesmo com operações de atualização ou exclusão e analisar os eventos de dados de alteração. Por exemplo, para atualizar o status da tarefa para configure and install connector (supondo que id seja 3):

UPDATE todos SET todo_status = 'complete' WHERE id = 3;

(Opcional) Instalar o conector FileStreamSink

Agora que todas as alterações da tabela todos estão sendo capturadas no tópico Hubs de Eventos, vamos usar o conector FileStreamSink (que está disponível por padrão no Kafka Connect) para consumir esses eventos.

Criar um arquivo de configuração (file-sink-connector.json) para o conector - substitua o atributo file de acordo com o sistema de arquivos.

{
    "name": "cdc-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-server.public.todos",
        "file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
    }
}

Para criar o conector e verificar seu status:

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

curl http://localhost:8083/connectors/cdc-file-sink/status

Inserir/atualizar/excluir registros de banco de dados e monitorar os registros no arquivo de coletor de saída configurado:

tail -f /Users/foo/todos-cdc.txt

Limpeza

O Kafka Connect cria tópicos de Hubs de Eventos para armazenar status, deslocamentos e configurações que persistem mesmo depois que o cluster do Kafka Connect é desativado. A menos que essa persistência seja desejada, recomendamos que você exclua esses tópicos. Talvez seja ideal também excluir o hub de eventos my-server.public.todos que foi criado no decorrer deste passo a passo.

Próximas etapas

Para saber mais sobre os Hubs de Eventos para o Kafka, confira os artigos a seguir: