Gegevens migreren van PostgreSQL naar Een Azure Cosmos DB voor Apache Cassandra-account met behulp van Apache Kafka
VAN TOEPASSING OP: Cassandra
API voor Cassandra in Azure Cosmos DB is een uitstekende keuze geworden voor zakelijke workloads die worden uitgevoerd op Apache Cassandra om verschillende redenen, zoals:
Aanzienlijke kostenbesparingen: u kunt kosten besparen met Azure Cosmos DB, waaronder de kosten van VM's, bandbreedte en eventuele toepasselijke Oracle-licenties. Daarnaast hoeft u de datacenters, servers, SSD-opslag, netwerken en elektriciteitskosten niet te beheren.
Betere schaalbaarheid en beschikbaarheid: Hiermee worden single points of failure, betere schaalbaarheid en beschikbaarheid voor uw toepassingen geëlimineerd.
Geen overhead voor het beheren en bewaken: Als volledig beheerde cloudservice verwijdert Azure Cosmos DB de overhead van het beheren en bewaken van talloze instellingen.
Kafka Connect is een platform voor het streamen van gegevens tussen Apache Kafka en andere systemen op een schaalbare en betrouwbare manier. Het ondersteunt verschillende connectors buiten de plank, wat betekent dat u geen aangepaste code nodig hebt om externe systemen te integreren met Apache Kafka.
In dit artikel wordt uitgelegd hoe u een combinatie van Kafka-connectors gebruikt om een gegevenspijplijn in te stellen om records van een relationele database, zoals PostgreSQL , continu te synchroniseren met Azure Cosmos DB voor Apache Cassandra.
Overzicht
Hier volgt een algemeen overzicht van de end-to-end-stroom die in dit artikel wordt gepresenteerd.
Gegevens in de PostgreSQL-tabel worden naar Apache Kafka gepusht met behulp van de Debezium PostgreSQL-connector, een Kafka Connect-bronconnector. Invoegen, bijwerken of verwijderen van records in de PostgreSQL-tabel worden vastgelegd als change data
gebeurtenissen en verzonden naar Kafka-onderwerp(en). De DataStax Apache Kafka-connector (Kafka Connect-sinkconnector) vormt het tweede deel van de pijplijn. Hiermee worden de wijzigingsgegevensgebeurtenissen van kafka-onderwerp gesynchroniseerd met Azure Cosmos DB voor Apache Cassandra-tabellen.
Notitie
Door specifieke functies van de DataStax Apache Kafka-connector te gebruiken, kunnen we gegevens naar meerdere tabellen pushen. In dit voorbeeld helpt de connector ons bij het behouden van wijzigingsgegevensrecords in twee Cassandra-tabellen die verschillende queryvereisten kunnen ondersteunen.
Vereisten
- Een Azure Cosmos DB inrichten voor een Apache Cassandra-account
- cqlsh gebruiken voor validatie
- JDK 8 of hoger
- Docker (optioneel)
Basisinstallatie
Stel postgreSQL-database in als u dat nog niet hebt gedaan.
Dit kan een bestaande on-premises database zijn of u kunt er een downloaden en installeren op uw lokale computer. Het is ook mogelijk om een Docker-container te gebruiken.
Notitie
In het volgende voorbeeld wordt een openbare containerinstallatiekopie opgehaald uit Docker Hub. U wordt aangeraden eerst te verifiëren met uw Docker Hub-account (docker login
) in plaats van een anonieme pull-aanvraag te maken. Als u de betrouwbaarheid wilt verbeteren bij het gebruik van openbare inhoud, importeert en beheert u de installatiekopieën in een privé-Azure-containerregister. Meer informatie over het werken met openbare afbeeldingen.
Een container starten:
docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres
Maak verbinding met uw PostgreSQL-exemplaar met behulp van psql
de client:
psql -h localhost -p 5432 -U postgres -W -d postgres
Maak een tabel om voorbeeldordergegevens op te slaan:
CREATE SCHEMA retail;
CREATE TABLE retail.orders_info (
orderid SERIAL NOT NULL PRIMARY KEY,
custid INTEGER NOT NULL,
amount INTEGER NOT NULL,
city VARCHAR(255) NOT NULL,
purchase_time VARCHAR(40) NOT NULL
);
Maak met behulp van Azure Portal de Cassandra Keyspace en de tabellen die vereist zijn voor de demotoepassing.
Notitie
Gebruik dezelfde Keyspace- en tabelnamen als hieronder
CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
Apache Kafka instellen
In dit artikel wordt een lokaal cluster gebruikt, maar u kunt een andere optie kiezen. Download Kafka, pak het uit, start het Zookeeper- en Kafka-cluster.
cd <KAFKA_HOME>/bin
#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties
Connectors instellen
Installeer de Connector Debezium PostgreSQL en DataStax Apache Kafka. Download het invoegtoepassingsarchief van de Debezium PostgreSQL-connector. Als u bijvoorbeeld versie 1.3.0 van de connector (uiterlijk op het moment van schrijven) wilt downloaden, gebruikt u deze koppeling. Download de DataStax Apache Kafka-connector via deze koppeling.
Pak zowel de connectorarchieven uit als kopieer de JAR-bestanden naar het Kafka Connect-invoegtoepassingspad.
cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs
Raadpleeg de documentatie van Debezium en DataStax voor meer informatie.
Kafka Connect configureren en gegevenspijplijn starten
Kafka Connect-cluster starten
cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties
PostgreSQL-connectorexemplaren starten
De connectorconfiguratie (JSON) opslaan in een bestandsvoorbeeld pg-source-config.json
{
"name": "pg-orders-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "postgres",
"database.server.name": "myserver",
"plugin.name": "wal2json",
"table.include.list": "retail.orders_info",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
Het PostgreSQL-connectorexemplaren starten:
curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors
Notitie
Als u wilt verwijderen, kunt u het volgende gebruiken: curl -X DELETE http://localhost:8083/connectors/pg-orders-source
Gegevens invoegen
De orders_info
tabel bevat ordergegevens, zoals order-id, klant-id, plaats, enzovoort. Vul de tabel in met willekeurige gegevens met behulp van de onderstaande SQL.
insert into retail.orders_info (
custid, amount, city, purchase_time
)
select
random() * 10000 + 1,
random() * 200,
('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);
Er moeten 10 records in de tabel worden ingevoegd. Zorg ervoor dat u het aantal records hieronder bijwerkt generate_series(1, 10)
op basis van uw vereistenvoorbeeld om records in te voegen 100
, gebruikt u generate_series(1, 100)
Ga als volgt te werk om te bevestigen:
select * from retail.orders_info;
Controleer de gebeurtenissen voor het vastleggen van wijzigingengegevens in het Kafka-onderwerp
cd <KAFKA_HOME>/bin
./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning
Als het goed is, ziet u de wijzigingen in de JSON-indeling.
DataStax Apache Kafka-connectorexemplaren starten
Sla de connectorconfiguratie (JSON) op in een bestandsvoorbeeld cassandra-sink-config.json
en werk de eigenschappen bij volgens uw omgeving.
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "myserver.retail.orders_info",
"contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
"loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
"ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
"port": 10350,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"auth.username": "<Azure Cosmos DB user name (same as account name)>",
"auth.password": "<Azure Cosmos DB password>",
"topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"offset.flush.interval.ms": 10000
}
}
Het connectorexemplaren starten:
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
De connector moet in actie komen en de end-to-end-pijplijn van PostgreSQL naar Azure Cosmos DB zal operationeel zijn.
Query Azure Cosmos DB
Controleer de Cassandra-tabellen in Azure Cosmos DB. Hier volgen enkele van de query's die u kunt proberen:
select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;
select * from retail.orders_by_customer;
select * from retail.orders_by_city;
select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;
U kunt meer gegevens blijven invoegen in PostgreSQL en controleren of de records zijn gesynchroniseerd met Azure Cosmos DB.
Volgende stappen
- Apache Kafka en Azure Cosmos DB integreren voor Apache Cassandra met behulp van Kafka Connect
- Apache Kafka Connect integreren in Azure Event Hubs (preview) met Debezium voor Change Data Capture
- Gegevens migreren van Oracle naar Azure Cosmos DB voor Apache Cassandra met behulp van Arcion
- Doorvoer voor containers en databases inrichten
- Best practices voor partitiesleutels
- RU/s schatten met behulp van de artikelen over capaciteitsplanners van Azure Cosmos DB