Delen via


Gegevens migreren van PostgreSQL naar Een Azure Cosmos DB voor Apache Cassandra-account met behulp van Apache Kafka

VAN TOEPASSING OP: Cassandra

API voor Cassandra in Azure Cosmos DB is een uitstekende keuze geworden voor zakelijke workloads die worden uitgevoerd op Apache Cassandra om verschillende redenen, zoals:

  • Aanzienlijke kostenbesparingen: u kunt kosten besparen met Azure Cosmos DB, waaronder de kosten van VM's, bandbreedte en eventuele toepasselijke Oracle-licenties. Daarnaast hoeft u de datacenters, servers, SSD-opslag, netwerken en elektriciteitskosten niet te beheren.

  • Betere schaalbaarheid en beschikbaarheid: Hiermee worden single points of failure, betere schaalbaarheid en beschikbaarheid voor uw toepassingen geëlimineerd.

  • Geen overhead voor het beheren en bewaken: Als volledig beheerde cloudservice verwijdert Azure Cosmos DB de overhead van het beheren en bewaken van talloze instellingen.

Kafka Connect is een platform voor het streamen van gegevens tussen Apache Kafka en andere systemen op een schaalbare en betrouwbare manier. Het ondersteunt verschillende connectors buiten de plank, wat betekent dat u geen aangepaste code nodig hebt om externe systemen te integreren met Apache Kafka.

In dit artikel wordt uitgelegd hoe u een combinatie van Kafka-connectors gebruikt om een gegevenspijplijn in te stellen om records van een relationele database, zoals PostgreSQL , continu te synchroniseren met Azure Cosmos DB voor Apache Cassandra.

Overzicht

Hier volgt een algemeen overzicht van de end-to-end-stroom die in dit artikel wordt gepresenteerd.

Gegevens in de PostgreSQL-tabel worden naar Apache Kafka gepusht met behulp van de Debezium PostgreSQL-connector, een Kafka Connect-bronconnector. Invoegen, bijwerken of verwijderen van records in de PostgreSQL-tabel worden vastgelegd als change data gebeurtenissen en verzonden naar Kafka-onderwerp(en). De DataStax Apache Kafka-connector (Kafka Connect-sinkconnector) vormt het tweede deel van de pijplijn. Hiermee worden de wijzigingsgegevensgebeurtenissen van kafka-onderwerp gesynchroniseerd met Azure Cosmos DB voor Apache Cassandra-tabellen.

Notitie

Door specifieke functies van de DataStax Apache Kafka-connector te gebruiken, kunnen we gegevens naar meerdere tabellen pushen. In dit voorbeeld helpt de connector ons bij het behouden van wijzigingsgegevensrecords in twee Cassandra-tabellen die verschillende queryvereisten kunnen ondersteunen.

Vereisten

Basisinstallatie

Stel postgreSQL-database in als u dat nog niet hebt gedaan.

Dit kan een bestaande on-premises database zijn of u kunt er een downloaden en installeren op uw lokale computer. Het is ook mogelijk om een Docker-container te gebruiken.

Notitie

In het volgende voorbeeld wordt een openbare containerinstallatiekopie opgehaald uit Docker Hub. U wordt aangeraden eerst te verifiëren met uw Docker Hub-account (docker login) in plaats van een anonieme pull-aanvraag te maken. Als u de betrouwbaarheid wilt verbeteren bij het gebruik van openbare inhoud, importeert en beheert u de installatiekopieën in een privé-Azure-containerregister. Meer informatie over het werken met openbare afbeeldingen.

Een container starten:

docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres

Maak verbinding met uw PostgreSQL-exemplaar met behulp van psql de client:

psql -h localhost -p 5432 -U postgres -W -d postgres

Maak een tabel om voorbeeldordergegevens op te slaan:

CREATE SCHEMA retail;

CREATE TABLE retail.orders_info (
	orderid SERIAL NOT NULL PRIMARY KEY,
	custid INTEGER NOT NULL,
	amount INTEGER NOT NULL,
	city VARCHAR(255) NOT NULL,
	purchase_time VARCHAR(40) NOT NULL
);

Maak met behulp van Azure Portal de Cassandra Keyspace en de tabellen die vereist zijn voor de demotoepassing.

Notitie

Gebruik dezelfde Keyspace- en tabelnamen als hieronder

CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

Apache Kafka instellen

In dit artikel wordt een lokaal cluster gebruikt, maar u kunt een andere optie kiezen. Download Kafka, pak het uit, start het Zookeeper- en Kafka-cluster.

cd <KAFKA_HOME>/bin

#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties

Connectors instellen

Installeer de Connector Debezium PostgreSQL en DataStax Apache Kafka. Download het invoegtoepassingsarchief van de Debezium PostgreSQL-connector. Als u bijvoorbeeld versie 1.3.0 van de connector (uiterlijk op het moment van schrijven) wilt downloaden, gebruikt u deze koppeling. Download de DataStax Apache Kafka-connector via deze koppeling.

Pak zowel de connectorarchieven uit als kopieer de JAR-bestanden naar het Kafka Connect-invoegtoepassingspad.

cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs

Raadpleeg de documentatie van Debezium en DataStax voor meer informatie.

Kafka Connect configureren en gegevenspijplijn starten

Kafka Connect-cluster starten

cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties

PostgreSQL-connectorexemplaren starten

De connectorconfiguratie (JSON) opslaan in een bestandsvoorbeeld pg-source-config.json

{
    "name": "pg-orders-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "postgres",
        "database.server.name": "myserver",
        "plugin.name": "wal2json",
        "table.include.list": "retail.orders_info",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

Het PostgreSQL-connectorexemplaren starten:

curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors

Notitie

Als u wilt verwijderen, kunt u het volgende gebruiken: curl -X DELETE http://localhost:8083/connectors/pg-orders-source

Gegevens invoegen

De orders_info tabel bevat ordergegevens, zoals order-id, klant-id, plaats, enzovoort. Vul de tabel in met willekeurige gegevens met behulp van de onderstaande SQL.

insert into retail.orders_info (
    custid, amount, city, purchase_time
)
select
    random() * 10000 + 1,
    random() * 200,
    ('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
    NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);

Er moeten 10 records in de tabel worden ingevoegd. Zorg ervoor dat u het aantal records hieronder bijwerkt generate_series(1, 10) op basis van uw vereistenvoorbeeld om records in te voegen 100 , gebruikt u generate_series(1, 100)

Ga als volgt te werk om te bevestigen:

select * from retail.orders_info;

Controleer de gebeurtenissen voor het vastleggen van wijzigingengegevens in het Kafka-onderwerp

cd <KAFKA_HOME>/bin

./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning

Als het goed is, ziet u de wijzigingen in de JSON-indeling.

DataStax Apache Kafka-connectorexemplaren starten

Sla de connectorconfiguratie (JSON) op in een bestandsvoorbeeld cassandra-sink-config.json en werk de eigenschappen bij volgens uw omgeving.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "myserver.retail.orders_info",
        "contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
        "loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
        "ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
        "port": 10350,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "auth.username": "<Azure Cosmos DB user name (same as account name)>",
        "auth.password": "<Azure Cosmos DB password>",
        "topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "offset.flush.interval.ms": 10000
    }
}

Het connectorexemplaren starten:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

De connector moet in actie komen en de end-to-end-pijplijn van PostgreSQL naar Azure Cosmos DB zal operationeel zijn.

Query Azure Cosmos DB

Controleer de Cassandra-tabellen in Azure Cosmos DB. Hier volgen enkele van de query's die u kunt proberen:

select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;

select * from retail.orders_by_customer;
select * from retail.orders_by_city;

select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;

U kunt meer gegevens blijven invoegen in PostgreSQL en controleren of de records zijn gesynchroniseerd met Azure Cosmos DB.

Volgende stappen