Eseguire la migrazione dei dati da PostgreSQL all'account Azure Cosmos DB for Apache Cassandra usando Apache Kafka

SI APPLICA A: Cassandra

L'API for Cassandra in Azure Cosmos DB è diventata un'ottima scelta per i carichi di lavoro aziendali in esecuzione in Apache Cassandra per vari motivi:

  • Risparmio significativo sui costi: è possibile risparmiare sui costi con Azure Cosmos DB, che include il costo delle macchine virtuali, della larghezza di banda e delle licenze Oracle applicabili. Inoltre, non è necessario gestire i costi relativi a data center, server, archiviazione SSD, rete ed elettricità.

  • Maggiore scalabilità e disponibilità: elimina i singoli punti di guasto e offre maggiore scalabilità e disponibilità per le applicazioni.

  • Nessun sovraccarico di gestione e monitoraggio: in qualità di servizio cloud completamente gestito, Azure Cosmos DB elimina il sovraccarico di gestione e monitoraggio di numerosissime di impostazioni.

Kafka Connect è una piattaforma per lo streaming dei dati tra Apache Kafka e altri sistemi in modo scalabile e affidabile. Supporta numerosi connettori predefiniti e quindi non è necessario codice personalizzato per integrare sistemi esterni con Apache Kafka.

Questo articolo illustra come usare una combinazione di connettori Kafka per configurare una pipeline di dati per sincronizzare in modo continuo i record da un database relazionale, ad esempio PostgreSQL con Azure Cosmos DB for Apache Cassandra.

Panoramica

Ecco una panoramica generale del flusso end-to-end presentato in questo articolo.

Verrà eseguito il push dei dati nella tabella PostgreSQL in Apache Kafka usando il connettore Debezium PostgreSQL, che è un connettore di origine Kafka Connect. Gli inserimenti, gli aggiornamenti o le eliminazioni nei record della tabella PostgreSQL verranno acquisiti come eventi change data e inviati ad argomenti Kafka. Il connettore DataStax Apache Kafka (connettore sink Kafka Connect), costituisce la seconda parte della pipeline. Sincronizza gli eventi di modifica dei dati dall'argomento Kafka alle tabelle di Azure Cosmos DB for Apache Cassandra.

Nota

L'uso di funzionalità specifiche del connettore DataStax Apache Kafka consente di eseguire il push dei dati in più tabelle. In questo esempio, il connettore consentirà di salvare in modo permanente i record di modifica dei dati in due tabelle Cassandra in grado di supportare requisiti di query diversi.

Prerequisiti

Configurazione di base

Configurare il database PostgreSQL, se non è già stato fatto.

Può trattarsi di un database locale esistente oppure è possibile scaricarne e installarne uno nel computer locale. È anche possibile usare un contenitore Docker.

Nota

L'esempio seguente esegue il pull di un'immagine del contenitore pubblica da Docker Hub. È consigliabile eseguire prima l'autenticazione con l'account Docker Hub (docker login) anziché effettuare una richiesta pull anonima. Per migliorare l'affidabilità quando si usa contenuto pubblico, importare e gestire l'immagine in un registro Azure Container privato. Altre informazioni sull'uso delle immagini pubbliche.

Per avviare un contenitore:

docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres

Connettersi all'istanza di PostgreSQL usando il client psql:

psql -h localhost -p 5432 -U postgres -W -d postgres

Creare una tabella per archiviare le informazioni sugli ordini di esempio:

CREATE SCHEMA retail;

CREATE TABLE retail.orders_info (
	orderid SERIAL NOT NULL PRIMARY KEY,
	custid INTEGER NOT NULL,
	amount INTEGER NOT NULL,
	city VARCHAR(255) NOT NULL,
	purchase_time VARCHAR(40) NOT NULL
);

Usando il portale di Azure, creare il keyspace Cassandra e le tabelle necessari per l'applicazione demo.

Nota

Usare gli stessi nomi di keyspace e tabella indicati sotto

CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

Configurare Apache Kafka

Questo articolo usa un cluster locale, ma è possibile scegliere qualsiasi altra opzione. Scaricare Kafka, decomprimerlo e avviare il cluster Zookeeper e Kafka.

cd <KAFKA_HOME>/bin

#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties

Configurare i connettori

Installare i connettori Debezium PostgreSQL e DataStax Apache Kafka. Scaricare l'archivio del plug-in del connettore Debezium PostgreSQL. Ad esempio, per scaricare la versione 1.3.0 del connettore (più recente al momento della redazione del presente documento), usare questo collegamento. Scaricare il connettore Apache Kafka DataStax da questo collegamento.

Decomprimere entrambi gli archivi dei connettore e copiare i file JAR in Kafka Connect plugin.path.

cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs

Per informazioni dettagliate, vedere la documentazione di Debezium e DataStax.

Configurare Kafka Connect e avviare la pipeline di dati

Avviare il cluster Kafka Connect

cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties

Avviare l'istanza del connettore PostgreSQL

Salvare la configurazione del connettore (JSON) in un file, ad esempio pg-source-config.json

{
    "name": "pg-orders-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "postgres",
        "database.server.name": "myserver",
        "plugin.name": "wal2json",
        "table.include.list": "retail.orders_info",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

Per avviare l'istanza del connettore PostgreSQL:

curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors

Nota

Per eliminarla, è possibile usare: curl -X DELETE http://localhost:8083/connectors/pg-orders-source

Inserire i dati

La tabella orders_info contiene i dettagli degli ordini, ad esempio ID ordine, ID cliente, città e così via. Popolare la tabella con dati casuali usando il codice SQL seguente.

insert into retail.orders_info (
    custid, amount, city, purchase_time
)
select
    random() * 10000 + 1,
    random() * 200,
    ('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
    NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);

Nella tabella verranno inseriti 10 record. Assicurarsi di aggiornare il numero di record in generate_series(1, 10) sotto in base ai requisiti. Ad esempio, per inserire 100 record, usare generate_series(1, 100)

Per confermare:

select * from retail.orders_info;

Controllare gli eventi Change Data Capture nell'argomento Kafka

cd <KAFKA_HOME>/bin

./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning

Verranno visualizzati gli eventi di modifica dei dati in formato JSON.

Avviare l'istanza del connettore DataStax Apache Kafka

Salvare la configurazione del connettore (JSON) in un file, ad esempio cassandra-sink-config.json, e aggiornare le proprietà in base all'ambiente.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "myserver.retail.orders_info",
        "contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
        "loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
        "ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
        "port": 10350,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "auth.username": "<Azure Cosmos DB user name (same as account name)>",
        "auth.password": "<Azure Cosmos DB password>",
        "topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "offset.flush.interval.ms": 10000
    }
}

Per avviare l'istanza del connettore:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

Il connettore dovrebbe entrare in azione e la pipeline end-to-end da PostgreSQL ad Azure Cosmos DB sarà operativa.

Eseguire query in Azure Cosmos DB

Controllare le tabelle Cassandra in Azure Cosmos DB. Ecco alcune delle query che è possibile eseguire:

select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;

select * from retail.orders_by_customer;
select * from retail.orders_by_city;

select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;

È possibile continuare a inserire altri dati in PostgreSQL e verificare che i record siano sincronizzati con Azure Cosmos DB.

Passaggi successivi