Compartilhar via


Migrar dados do PostgreSQL para a conta do Azure Cosmos DB for Apache Cassandra usando o Apache Kafka

APLICA-SE AO: Cassandra

A API do Cassandra no Azure Cosmos DB se tornou uma ótima opção para cargas de trabalho corporativas em execução no Apache Cassandra por vários motivos, tais como:

  • Economia de custos significativa: você pode economizar custos com o Azure Cosmos DB, incluindo o custo de VMs, largura de banda e todas as licenças Oracle aplicáveis. Além disso, você não precisa gerenciar os custos de data centers, servidores, armazenamento de SSD, rede e eletricidade.

  • Melhor escalabilidade e disponibilidade: ele elimina pontos únicos de falha, melhora a escalabilidade e a disponibilidade dos aplicativos.

  • Sem sobrecarga de gerenciamento e monitoramento: como um serviço de nuvem totalmente gerenciado, o Azure Cosmos DB elimina a sobrecarga de gerenciamento e monitoramento de uma infinidade de configurações.

O Kafka Connect é uma plataforma para transmitir dados entre o Apache Kafka e outros sistemas de maneira escalonável e confiável. Ele oferece suporte a vários conectores prontos para uso, o que significa que você não precisa de código personalizado para integrar sistemas externos com o Apache Kafka.

Este artigo demonstrará como usar uma combinação de conectores Kafka para configurar um pipeline de dados para sincronizar continuamente os registros de um banco de dados relacional, como PostgreSQL, para o Azure Cosmos DB for Apache Cassandra.

Visão geral

Uma visão geral do fluxo completo é apresentada neste artigo.

Os dados na tabela PostgreSQL serão enviados por push para o Apache Kafka usando o conector Debezium PostgreSQL, que é um conector de origem do Kafka Connect. Inserções, atualizações ou exclusões em registros na tabela PostgreSQL serão capturados como change data eventos e enviados a tópicos do Kafka. O conector do DataStax Apache Kafka (conector do coletor do Kafka Connect) forma a segunda parte do pipeline. Ele sincronizará os eventos de alteração de dados do tópico do Kafka para as tabelas do Azure Cosmos DB for Apache Cassandra.

Observação

O uso de recursos específicos do conector Apache Kafka DataStax nos permite enviar dados por push para várias tabelas. Neste exemplo, o conector nos ajudará a manter registros de dados de alteração em duas tabelas do Cassandra que podem oferecer suporte a diferentes requisitos de consulta.

Pré-requisitos

Configuração básica

Configure o banco de dados do PostgreSQL se ainda não tiver configurado.

Pode ser um banco de dados local existente ou você pode baixar e instalar um no computador local. Também é possível usar um contêiner do Docker.

Observação

O exemplo a seguir efetua pull de uma imagem de contêiner público do Docker Hub. É recomendável autenticar-se com a conta do Docker Hub (docker login) primeiro, em vez de fazer uma solicitação de pull anônima. Para melhorar a confiabilidade ao usar o conteúdo público, importe e gerencie a imagem em um registro de contêiner privado do Azure. Saiba mais sobre como trabalhar com imagens públicas.

Para iniciar um contêiner:

docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres

Conecte-se à sua instância do PostgreSQL usando o psql cliente:

psql -h localhost -p 5432 -U postgres -W -d postgres

Crie uma tabela para armazenar informações de pedidos de amostra:

CREATE SCHEMA retail;

CREATE TABLE retail.orders_info (
	orderid SERIAL NOT NULL PRIMARY KEY,
	custid INTEGER NOT NULL,
	amount INTEGER NOT NULL,
	city VARCHAR(255) NOT NULL,
	purchase_time VARCHAR(40) NOT NULL
);

Usando o portal do Azure, crie o keyspace Cassandra e as tabelas necessárias para o aplicativo de demonstração.

Observação

Use os mesmos nomes de keyspace e de tabela, conforme abaixo

CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

Instalar o Apache Kafka

Este artigo usa um cluster local, mas você pode escolher qualquer outra opção. Baixe o Kafka, descompacte-o, inicie o cluster Zookeeper e Kafka.

cd <KAFKA_HOME>/bin

#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties

Instalar conectores

Instale o conector do Debezium PostgreSQL e do DataStax Apache Kafka. Baixe o arquivo de plug-in do conector do Debezium PostgreSQL. Por exemplo, para baixar a versão 1.3.0 do conector (mais recente no momento da gravação), use este link. Baixe o conector do DataStax Apache Kafka deste link.

Descompacte ambos os arquivos do conector e copie os arquivos JAR para o caminho de plugin do Kafka Connect.

cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs

Para obter detalhes, consulte a documentação do Debezium e do DataStax.

Configurar o Kafka Connect e iniciar o pipeline de dados

Iniciar o cluster do Kafka Connect

cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties

Iniciar instância do conector do PostgreSQL

Salvar a configuração do conector (JSON) em um exemplo de arquivo pg-source-config.json

{
    "name": "pg-orders-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "postgres",
        "database.server.name": "myserver",
        "plugin.name": "wal2json",
        "table.include.list": "retail.orders_info",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

Para iniciar a instância do conector do PostgreSQL:

curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors

Observação

Para excluir, você pode usar: curl -X DELETE http://localhost:8083/connectors/pg-orders-source

Inserir dados

A orders_info tabela contém detalhes do pedido, como ID do pedido, ID do cliente, cidade etc. Preencha a tabela com dados aleatórios usando o SQL abaixo.

insert into retail.orders_info (
    custid, amount, city, purchase_time
)
select
    random() * 10000 + 1,
    random() * 200,
    ('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
    NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);

Ele deve inserir 10 registros na tabela. Atualize o número de registros em generate_series(1, 10) abaixo de acordo com seu exemplo de requisitos, para inserir 100 registros, use generate_series(1, 100)

Para confirmar:

select * from retail.orders_info;

Verifique os eventos de captura de dados de alterações no tópico do Kafka

cd <KAFKA_HOME>/bin

./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning

Você deve ver os eventos de dados de alterações no formato JSON.

Iniciar instância do conector DataStax Apache Kafka

Salve a configuração do conector (JSON) em um exemplo de arquivo cassandra-sink-config.json e atualize as propriedades de acordo com o seu ambiente.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "myserver.retail.orders_info",
        "contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
        "loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
        "ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
        "port": 10350,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "auth.username": "<Azure Cosmos DB user name (same as account name)>",
        "auth.password": "<Azure Cosmos DB password>",
        "topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "offset.flush.interval.ms": 10000
    }
}

Para iniciar a instância do conector:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

O conector deve entrar em ação e o pipeline de ponta a ponta do PostgreSQL para o Azure Cosmos DB ficará operacional.

Consultar o Azure Cosmos DB

Verifique as tabelas do Cassandra no Azure Cosmos DB. Estas são algumas das consultas que você pode tentar:

select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;

select * from retail.orders_by_customer;
select * from retail.orders_by_city;

select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;

Você pode continuar inserindo mais dados no PostgreSQL e confirmar se os registros estão sincronizados com o Azure Cosmos DB.

Próximas etapas