Dela via


Migrera data från PostgreSQL till Azure Cosmos DB för Apache Cassandra-konto med Apache Kafka

GÄLLER FÖR: Kassandra

API för Cassandra i Azure Cosmos DB har blivit ett bra val för företagsarbetsbelastningar som körs på Apache Cassandra av olika orsaker, till exempel:

  • Betydande kostnadsbesparingar: Du kan spara kostnader med Azure Cosmos DB, vilket inkluderar kostnaden för virtuella datorer, bandbredd och eventuella tillämpliga Oracle-licenser. Dessutom behöver du inte hantera datacenter, servrar, SSD-lagring, nätverk och elkostnader.

  • Bättre skalbarhet och tillgänglighet: Det eliminerar enskilda felpunkter, bättre skalbarhet och tillgänglighet för dina program.

  • Inga kostnader för hantering och övervakning: Som en fullständigt hanterad molntjänst tar Azure Cosmos DB bort kostnaderna för att hantera och övervaka en mängd olika inställningar.

Kafka Connect är en plattform för att strömma data mellan Apache Kafka och andra system på ett skalbart och tillförlitligt sätt. Den stöder flera anslutningsprogram utanför hyllan, vilket innebär att du inte behöver anpassad kod för att integrera externa system med Apache Kafka.

Den här artikeln visar hur du använder en kombination av Kafka-anslutningsappar för att konfigurera en datapipeline för att kontinuerligt synkronisera poster från en relationsdatabas, till exempel PostgreSQL till Azure Cosmos DB för Apache Cassandra.

Översikt

Här är en översikt på hög nivå över det slutpunkt till slutpunkt-flöde som visas i den här artikeln.

Data i PostgreSQL-tabellen skickas till Apache Kafka med hjälp av Debezium PostgreSQL-anslutningsappen, som är en Kafka Connect-källanslutning. Infogningar, uppdateringar eller borttagning till poster i PostgreSQL-tabellen registreras som change data händelser och skickas till Kafka-ämnen. DataStax Apache Kafka-anslutningsappen (Kafka Connect-mottagaranslutningen) utgör den andra delen av pipelinen. Den synkroniserar ändringsdatahändelserna från Kafka-ämnet till Azure Cosmos DB för Apache Cassandra-tabeller.

Kommentar

Med hjälp av specifika funktioner i DataStax Apache Kafka-anslutningsappen kan vi skicka data till flera tabeller. I det här exemplet hjälper anslutningsappen oss att spara dataposter till två Cassandra-tabeller som har stöd för olika frågekrav.

Förutsättningar

Grundläggande konfiguration

Konfigurera PostgreSQL-databasen om du inte redan har gjort det.

Det kan vara en befintlig lokal databas eller så kan du ladda ned och installera en på den lokala datorn. Det går också att använda en Docker-container.

Kommentar

I följande exempel hämtas en offentlig containeravbildning från Docker Hub. Vi rekommenderar att du autentiserar med ditt Docker Hub-konto (docker login) först i stället för att göra en anonym pull-begäran. För att förbättra tillförlitligheten när du använder offentligt innehåll importerar och hanterar du avbildningen i ett privat Azure-containerregister. Läs mer om hur du arbetar med offentliga avbildningar.

Så här startar du en container:

docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres

Anslut till din PostgreSQL-instans med hjälp av psql klienten:

psql -h localhost -p 5432 -U postgres -W -d postgres

Skapa en tabell för att lagra exempelorderinformation:

CREATE SCHEMA retail;

CREATE TABLE retail.orders_info (
	orderid SERIAL NOT NULL PRIMARY KEY,
	custid INTEGER NOT NULL,
	amount INTEGER NOT NULL,
	city VARCHAR(255) NOT NULL,
	purchase_time VARCHAR(40) NOT NULL
);

Med hjälp av Azure Portal skapar du Cassandra Keyspace och de tabeller som krävs för demoprogrammet.

Kommentar

Använd samma nyckelområdes- och tabellnamn som nedan

CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

Konfigurera Apache Kafka

Den här artikeln använder ett lokalt kluster, men du kan välja alla andra alternativ. Ladda ned Kafka, packa upp det, starta Zookeeper- och Kafka-klustret.

cd <KAFKA_HOME>/bin

#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties

Konfigurera anslutningsappar

Installera Anslutningsprogrammet Debezium PostgreSQL och DataStax Apache Kafka. Ladda ned plugin-arkivet för Debezium PostgreSQL-anslutningsprogrammet. Om du till exempel vill ladda ned version 1.3.0 av anslutningsappen (senast i skrivande stund) använder du den här länken. Ladda ned DataStax Apache Kafka-anslutningsappen från den här länken.

Packa upp båda anslutningsappens arkiv och kopiera JAR-filerna till Plugin.path för Kafka Connect.

cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs

Mer information finns i dokumentationen om Debezium och DataStax .

Konfigurera Kafka Connect och starta datapipeline

Starta Kafka Connect-kluster

cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties

Starta PostgreSQL-anslutningsinstans

Spara anslutningskonfigurationen (JSON) i ett filexempel pg-source-config.json

{
    "name": "pg-orders-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "postgres",
        "database.server.name": "myserver",
        "plugin.name": "wal2json",
        "table.include.list": "retail.orders_info",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

Så här startar du PostgreSQL-anslutningsinstansen:

curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors

Kommentar

Om du vill ta bort kan du använda: curl -X DELETE http://localhost:8083/connectors/pg-orders-source

Infoga data

Tabellen orders_info innehåller orderinformation som order-ID, kund-ID, ort osv. Fyll i tabellen med slumpmässiga data med hjälp av nedanstående SQL.

insert into retail.orders_info (
    custid, amount, city, purchase_time
)
select
    random() * 10000 + 1,
    random() * 200,
    ('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
    NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);

Den bör infoga 10 poster i tabellen. Se till att uppdatera antalet poster i generate_series(1, 10) nedan enligt dina krav exempel, för att infoga 100 poster, använda generate_series(1, 100)

Att bekräfta:

select * from retail.orders_info;

Kontrollera ändringsdatainsamlingshändelserna i Kafka-ämnet

cd <KAFKA_HOME>/bin

./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning

Du bör se ändringsdatahändelserna i JSON-format.

Starta DataStax Apache Kafka-anslutningsinstans

Spara anslutningskonfigurationen (JSON) i ett filexempel cassandra-sink-config.json och uppdatera egenskaperna enligt din miljö.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "myserver.retail.orders_info",
        "contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
        "loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
        "ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
        "port": 10350,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "auth.username": "<Azure Cosmos DB user name (same as account name)>",
        "auth.password": "<Azure Cosmos DB password>",
        "topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "offset.flush.interval.ms": 10000
    }
}

Så här startar du anslutningsinstansen:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

Anslutningsappen bör börja fungera och pipelinen från slutpunkt till slutpunkt från PostgreSQL till Azure Cosmos DB kommer att fungera.

Köra frågor mot Azure Cosmos DB

Kontrollera Cassandra-tabellerna i Azure Cosmos DB. Här är några av de frågor som du kan prova:

select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;

select * from retail.orders_by_customer;
select * from retail.orders_by_city;

select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;

Du kan fortsätta att infoga mer data i PostgreSQL och bekräfta att posterna synkroniseras till Azure Cosmos DB.

Nästa steg