Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
A Captura de Dados de Alteração (CDC) é uma técnica usada para rastrear alterações no nível da linha em tabelas de banco de dados em resposta a operações de criação, atualização e exclusão. Debezium é uma plataforma distribuída que se baseia em recursos de Change Data Capture disponíveis em diferentes bancos de dados (por exemplo, decodificação lógica em PostgreSQL). Ele fornece um conjunto de conectores Kafka Connect que exploram alterações ao nível das linhas nas tabelas de bases de dados e as convertem em fluxos de eventos que são então enviados para o Apache Kafka.
Este tutorial orienta-o sobre como configurar um sistema baseado em captura de dados de alteração no Azure usando Hubs de Eventos (para o Kafka), Banco de Dados do Azure para PostgreSQL e Debezium. Ele usa o Debezium PostgreSQL connector para transmitir modificações de base de dados do PostgreSQL para tópicos Kafka no Event Hubs.
Nota
Este artigo contém referências a um termo que a Microsoft já não utiliza. Quando o termo for removido do software, iremos removê-lo deste artigo.
Neste tutorial, siga os seguintes passos:
- Criar um namespace de Hubs de Eventos
- Configurar o Serviço de Base de Dados do Azure para PostgreSQL
- Configurar e executar o Kafka Connect com o conector Debezium PostgreSQL
- Teste de captura de alterações de dados
- (Opcional) Consumir eventos de alteração de dados com o conector
FileStreamSink
Pré-requisitos
Para concluir este passo a passo, você precisa:
- Subscrição do Azure. Se não tiver uma subscrição, crie uma conta gratuita.
- Linux/macOS
- Versão do Kafka (versão 1.1.1, Scala versão 2.11), disponível em kafka.apache.org.
- Ler o artigo de introdução dos Event Hubs for Apache Kafka (Hubs de Eventos para Apache Kafka).
Criar um namespace do Hubs de Eventos
É necessário um espaço de nomes dos Hubs de Eventos para enviar e receber em qualquer serviço dos Hubs de Eventos. Consulte Criando um hub de eventos para obter instruções sobre como criar um namespace e um hub de eventos. Obtenha a cadeia de ligação dos Hubs de Eventos e o nome de domínio completamente qualificado (FQDN), para utilizar mais tarde. Para obter instruções, veja Get an Event Hubs connection string (Obter uma cadeia de ligação dos Hubs de Eventos).
Instalar e configurar o Banco de Dados do Azure para PostgreSQL
O Banco de Dados do Azure para PostgreSQL é um serviço de banco de dados relacional baseado na versão da comunidade do mecanismo de banco de dados PostgreSQL de código aberto e está disponível em três opções de implantação: Servidor Único, Servidor Flexível e Cosmos DB para PostgreSQL. Siga estas instruções para criar um Banco de Dados do Azure para o servidor PostgreSQL usando o portal do Azure.
Configurar e executar o Kafka Connect
Esta secção abrange os seguintes tópicos:
- Instalação do conector Debezium
- Configurando o Kafka Connect para Hubs de Eventos
- Inicie o cluster Kafka Connect com o conector Debezium
Download e configuração do conector Debezium
Siga as instruções mais recentes na documentação do Debezium para baixar e configurar o conector.
- Faça o download do ficheiro de plug-in do conector. Por exemplo, para baixar a versão
1.2.0
do conector, use este link - https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.2.0.Final/debezium-connector-postgres-1.2.0.Final-plugin.tar.gz - Extraia os arquivos JAR e copie-os para o plugin Kafka Connect.path.
Configurar o Kafka Connect para os Hubs de Eventos
A reconfiguração mínima é necessária ao redirecionar a largura de banda do Kafka Connect do Kafka para os Hubs de Eventos. O exemplo seguinte, connect-distributed.properties
, ilustra como configurar o Connect para se autenticar e comunicar com o ponto final do Kafka nos Hubs de Eventos:
Importante
- Debezium cria automaticamente um tópico por tabela e um monte de tópicos de metadados. O tópico Kafka corresponde a uma instância de Hubs de Eventos (hub de eventos). Para mapeamentos do Apache Kafka para Hubs de Eventos do Azure, consulte Mapeamento conceitual de Kafka e Hubs de Eventos.
- Há diferentes limites para o número de hubs de eventos em um namespace de Hubs de Eventos, dependendo da camada (Básico, Padrão, Premium ou Dedicado). Para estes limites, consulte Cotas.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
Importante
Substitua {YOUR.EVENTHUBS.CONNECTION.STRING}
pela cadeia de conexão do namespace do Event Hubs. Para obter instruções sobre como obter a cadeia de conexão, consulte Obter uma cadeia de conexão de Hubs de Eventos. Aqui está um exemplo de configuração: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Executar o Kafka Connect
Neste passo, é iniciada uma função de trabalho do Kafka Connect localmente no modo distribuído e são utilizados os Hubs de Eventos para manter o estado do cluster.
- Salve o
connect-distributed.properties
arquivo localmente. Certifique-se de substituir todos os valores dentro das chavetas. - Navegue para a localização da versão do Kafka no computador.
- Execute
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
e aguarde até que o cluster seja iniciado.
Nota
O Kafka Connect usa a API Kafka AdminClient para criar automaticamente tópicos com configurações recomendadas, incluindo compactação. Uma verificação rápida do espaço de nomes no portal do Azure revela que os tópicos internos do trabalhador do Connect foram criados automaticamente.
Os tópicos internos do Kafka Connect devem usar compactação. A equipe de Hubs de Eventos não é responsável por corrigir configurações inadequadas se os tópicos internos do Connect estiverem configurados incorretamente.
Configurar e iniciar o conector de origem Debezium PostgreSQL
Crie um arquivo de configuração (pg-source-connector.json
) para o conector de origem do PostgreSQL - substitua os valores de acordo com sua instância do Azure PostgreSQL.
{
"name": "todo-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
"database.port": "5432",
"database.user": "<replace with database user name>",
"database.password": "<replace with database password>",
"database.dbname": "postgres",
"database.server.name": "my-server",
"plugin.name": "wal2json",
"table.whitelist": "public.todos"
}
}
Gorjeta
database.server.name
atributo é um nome lógico que identifica e fornece um namespace para o servidor/cluster de banco de dados PostgreSQL específico que está sendo monitorado.
Para criar uma instância do conector, use o ponto de extremidade da API REST do Kafka Connect:
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors
Para verificar o status do conector:
curl -s http://localhost:8083/connectors/todo-connector/status
Captura de alterações de dados de teste
Para ver a captura de dados de alteração em ação, você precisa criar/atualizar/excluir registros no banco de dados PostgreSQL do Azure.
Comece conectando-se ao seu banco de dados PostgreSQL do Azure (o exemplo a seguir usa psql).
psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require
e.g.
psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require
Criar uma tabela e inserir registos
CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));
INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');
O conector deve agora entrar em ação e enviar eventos de dados de alteração para um tópico do Hubs de Eventos com o seguinte nome my-server.public.todos
, assumindo que tenha my-server
como valor para database.server.name
e que public.todos
seja a tabela cujas alterações está a acompanhar, conforme a configuração de table.whitelist
.
Verifique o tópico Hubs de Eventos
Vamos introspeccionar o conteúdo do tópico para garantir que tudo esteja funcionando conforme o esperado. O exemplo a seguir usa kafkacat
, mas você também pode criar um consumidor usando qualquer uma das opções listadas aqui.
Crie um arquivo nomeado kafkacat.conf
com o seguinte conteúdo:
metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>
Nota
Atualize os atributos metadata.broker.list
e sasl.password
em kafkacat.conf
de acordo com as informações dos Hubs de Eventos.
Num terminal diferente, inicie um consumidor:
export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos
kafkacat -b $BROKER -t $TOPIC -o beginning
Você deve ver as cargas JSON que representam os eventos de dados de alteração gerados no PostgreSQL em resposta às linhas adicionadas à todos
tabela. Aqui está um trecho da carga útil:
{
"schema": {...},
"payload": {
"before": null,
"after": {
"id": 1,
"description": "setup postgresql on azure",
"todo_status": "complete"
},
"source": {
"version": "1.2.0.Final",
"connector": "postgresql",
"name": "fulfillment",
"ts_ms": 1593018069944,
"snapshot": "last",
"db": "postgres",
"schema": "public",
"table": "todos",
"txId": 602,
"lsn": 184579736,
"xmin": null
},
"op": "c",
"ts_ms": 1593018069947,
"transaction": null
}
O evento consiste em payload
juntamente com o seu schema
(omitido por brevidade). Na payload
seção, observe como a operação de criação ("op": "c"
) é representada - "before": null
significa que era uma linha recém-criada, after
fornece valores para as colunas na linha, source
fornece os metadados da instância do PostgreSQL de onde esse evento foi coletado e assim por diante.
Você também pode tentar o mesmo com operações de atualização ou exclusão e introspeccionar os eventos de alteração de dados. Por exemplo, para atualizar o estado da tarefa para configure and install connector
(supondo que seja id
3
):
UPDATE todos SET todo_status = 'complete' WHERE id = 3;
(Opcional) Instalar o conector FileStreamSink
Agora que todas as alterações da todos
tabela estão a ser capturadas no tópico do Event Hubs, utilize o conector FileStreamSink (que está disponível por padrão no Kafka Connect) para consumir esses eventos.
Crie um arquivo de configuração (file-sink-connector.json
) para o conector - substitua o atributo de acordo com seu file
sistema de arquivos.
{
"name": "cdc-file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "my-server.public.todos",
"file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
}
}
Para criar o conector e verificar seu status:
curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/cdc-file-sink/status
Inserir/atualizar/excluir registros de banco de dados e monitorar os registros no arquivo de coletor de saída configurado:
tail -f /Users/foo/todos-cdc.txt
Limpeza
O Kafka Connect cria tópicos de Hubs de Eventos para armazenar configurações, deslocamentos e status que persistem mesmo após o cluster do Kafka Connect ter sido removido. A menos que essa persistência seja desejada, recomendamos que você exclua esses tópicos. Você também pode querer excluir o hub de my-server.public.todos
eventos que foram criados durante este passo a passo.
Próximos passos
Para saber mais sobre os Hubs de Eventos para Kafka, consulte os seguintes artigos:
- Espelhar um servidor Kafka em um hub de eventos
- Conectar o Apache Spark a um hub de eventos
- Conectar o Apache Flink a um hub de eventos
- Explore samples on our GitHub (Explorar exemplos no nosso GitHub)
- Conecte o Akka Streams a um hub de eventos
- Guia do desenvolvedor do Apache Kafka para Hubs de Eventos do Azure