Migrar dados do PostgreSQL para a conta do Azure Cosmos DB para Apache Cassandra usando o Apache Kafka

APLICA-SE A: Cassandra

A API para Cassandra no Azure Cosmos DB tornou-se uma ótima opção para cargas de trabalho corporativas em execução no Apache Cassandra por vários motivos, como:

  • Economia de custos significativa: você pode economizar custos com o Azure Cosmos DB, que inclui o custo de VMs, largura de banda e quaisquer licenças Oracle aplicáveis. Além disso, você não precisa gerenciar os data centers, servidores, armazenamento SSD, rede e custos de eletricidade.

  • Melhor escalabilidade e disponibilidade: elimina pontos únicos de falha, melhor escalabilidade e disponibilidade para seus aplicativos.

  • Sem sobrecarga de gerenciamento e monitoramento: como um serviço de nuvem totalmente gerenciado, o Azure Cosmos DB remove a sobrecarga de gerenciar e monitorar uma infinidade de configurações.

Kafka Connect é uma plataforma para transmitir dados entre o Apache Kafka e outros sistemas de forma escalável e confiável. Ele suporta vários conectores prontos para uso, o que significa que você não precisa de código personalizado para integrar sistemas externos com o Apache Kafka.

Este artigo demonstrará como usar uma combinação de conectores Kafka para configurar um pipeline de dados para sincronizar continuamente registros de um banco de dados relacional, como PostgreSQL , para o Azure Cosmos DB para Apache Cassandra.

Descrição geral

Aqui está uma visão geral de alto nível do fluxo de ponta a ponta apresentado neste artigo.

Os dados na tabela PostgreSQL serão enviados para o Apache Kafka usando o conector Debezium PostgreSQL, que é um conector de origem Kafka Connect. Inserções, atualizações ou exclusões de registros na tabela PostgreSQL serão capturadas como change data eventos e enviadas para o(s) tópico(s) Kafka. O conector DataStax Apache Kafka (conector de coletor Kafka Connect), forma a segunda parte do pipeline. Ele sincronizará os eventos de dados de alteração do tópico Kafka para as tabelas do Azure Cosmos DB para Apache Cassandra.

Nota

Usando recursos específicos do conector DataStax Apache Kafka nos permite enviar dados para várias tabelas. Neste exemplo, o conector nos ajudará a persistir a alteração de registros de dados para duas tabelas Cassandra que podem suportar requisitos de consulta diferentes.

Pré-requisitos

Configuração básica

Configure o banco de dados PostgreSQL, se ainda não o fez.

Pode ser um banco de dados local existente ou você pode baixar e instalar um em sua máquina local. Também é possível usar um contêiner do Docker.

Nota

O exemplo a seguir extrai uma imagem de contêiner público do Docker Hub. Recomendamos que você se autentique com sua conta do Docker Hub (docker login) primeiro, em vez de fazer uma solicitação pull anônima. Para melhorar a confiabilidade ao usar conteúdo público, importe e gerencie a imagem em um registro de contêiner privado do Azure. Saiba mais sobre como trabalhar com imagens públicas.

Para iniciar um contêiner:

docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres

Conecte-se à sua instância do PostgreSQL usando psql o cliente:

psql -h localhost -p 5432 -U postgres -W -d postgres

Crie uma tabela para armazenar informações de ordem de exemplo:

CREATE SCHEMA retail;

CREATE TABLE retail.orders_info (
	orderid SERIAL NOT NULL PRIMARY KEY,
	custid INTEGER NOT NULL,
	amount INTEGER NOT NULL,
	city VARCHAR(255) NOT NULL,
	purchase_time VARCHAR(40) NOT NULL
);

Usando o portal do Azure, crie o Cassandra Keyspace e as tabelas necessárias para o aplicativo de demonstração.

Nota

Use o mesmo espaço de teclas e nomes de tabela como abaixo

CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

Configuração do Apache Kafka

Este artigo usa um cluster local, mas você pode escolher qualquer outra opção. Baixe Kafka, descompacte-o, inicie o cluster Zookeeper e Kafka.

cd <KAFKA_HOME>/bin

#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties

Conectores de configuração

Instale o conector Debezium PostgreSQL e DataStax Apache Kafka. Baixe o arquivo de plug-in do conector Debezium PostgreSQL. Por exemplo, para baixar a versão 1.3.0 do conector (mais recente no momento da escrita), use este link. Faça o download do conector DataStax Apache Kafka neste link.

Descompacte os arquivos do conector e copie os arquivos JAR para o plugin.path do Kafka Connect.

cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs

Para obter detalhes, consulte a documentação Debezium e DataStax.

Configurar o Kafka Connect e iniciar o pipeline de dados

Iniciar cluster Kafka Connect

cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties

Iniciar instância do conector PostgreSQL

Salve a configuração do conector (JSON) em um exemplo de arquivo pg-source-config.json

{
    "name": "pg-orders-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "postgres",
        "database.server.name": "myserver",
        "plugin.name": "wal2json",
        "table.include.list": "retail.orders_info",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

Para iniciar a instância do conector PostgreSQL:

curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors

Nota

Para excluir, você pode usar: curl -X DELETE http://localhost:8083/connectors/pg-orders-source

Inserir dados

A orders_info tabela contém detalhes do pedido, como ID do pedido, ID do cliente, cidade, etc. Preencha a tabela com dados aleatórios usando o SQL abaixo.

insert into retail.orders_info (
    custid, amount, city, purchase_time
)
select
    random() * 10000 + 1,
    random() * 200,
    ('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
    NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);

Deve inserir 10 registos na tabela. Certifique-se de atualizar o número de registros abaixo generate_series(1, 10) de acordo com seu exemplo de requisitos, para inserir 100 registros, use generate_series(1, 100)

Para confirmar:

select * from retail.orders_info;

Verifique os eventos de captura de dados de alteração no tópico Kafka

cd <KAFKA_HOME>/bin

./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning

Você deve ver os eventos de alteração de dados no formato JSON.

Iniciar instância do conector DataStax Apache Kafka

Salve a configuração do conector (JSON) em um exemplo cassandra-sink-config.json de arquivo e atualize as propriedades de acordo com seu ambiente.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "myserver.retail.orders_info",
        "contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
        "loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
        "ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
        "port": 10350,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "auth.username": "<Azure Cosmos DB user name (same as account name)>",
        "auth.password": "<Azure Cosmos DB password>",
        "topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "offset.flush.interval.ms": 10000
    }
}

Para iniciar a instância do conector:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

O conector deve entrar em ação e o pipeline de ponta a ponta do PostgreSQL para o Azure Cosmos DB estará operacional.

Consultar o Azure Cosmos DB

Verifique as tabelas Cassandra no Azure Cosmos DB. Aqui estão algumas das consultas que você pode tentar:

select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;

select * from retail.orders_by_customer;
select * from retail.orders_by_city;

select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;

Você pode continuar a inserir mais dados no PostgreSQL e confirmar se os registros estão sincronizados com o Azure Cosmos DB.

Próximos passos