Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
GÄLLER FÖR: Kassandra
API för Cassandra i Azure Cosmos DB har blivit ett bra val för företagsarbetsbelastningar som körs på Apache Cassandra av olika orsaker, till exempel:
Betydande kostnadsbesparingar: Du kan spara kostnader med Azure Cosmos DB, vilket inkluderar kostnaden för virtuella datorer, bandbredd och eventuella tillämpliga Oracle-licenser. Dessutom behöver du inte hantera datacenter, servrar, SSD-lagring, nätverk och elkostnader.
Bättre skalbarhet och tillgänglighet: Det eliminerar enskilda felpunkter, bättre skalbarhet och tillgänglighet för dina program.
Inga kostnader för hantering och övervakning: Som en fullständigt hanterad molntjänst tar Azure Cosmos DB bort kostnaderna för att hantera och övervaka en mängd olika inställningar.
Kafka Connect är en plattform för att strömma data mellan Apache Kafka och andra system på ett skalbart och tillförlitligt sätt. Den stöder flera anslutningsprogram utanför hyllan, vilket innebär att du inte behöver anpassad kod för att integrera externa system med Apache Kafka.
Den här artikeln visar hur du använder en kombination av Kafka-anslutningsappar för att konfigurera en datapipeline för att kontinuerligt synkronisera poster från en relationsdatabas, till exempel PostgreSQL till Azure Cosmos DB för Apache Cassandra.
Översikt
Här är en översikt på hög nivå över det slutpunkt till slutpunkt-flöde som visas i den här artikeln.
Data i PostgreSQL-tabellen skickas till Apache Kafka med hjälp av Debezium PostgreSQL-anslutningsappen, som är en Kafka Connect-källanslutning. Infogningar, uppdateringar eller borttagning till poster i PostgreSQL-tabellen registreras som change data
händelser och skickas till Kafka-ämnen. DataStax Apache Kafka-anslutningsappen (Kafka Connect-mottagaranslutningen) utgör den andra delen av pipelinen. Den synkroniserar ändringsdatahändelserna från Kafka-ämnet till Azure Cosmos DB för Apache Cassandra-tabeller.
Kommentar
Med hjälp av specifika funktioner i DataStax Apache Kafka-anslutningsappen kan vi skicka data till flera tabeller. I det här exemplet hjälper anslutningsappen oss att spara dataposter till två Cassandra-tabeller som har stöd för olika frågekrav.
Förutsättningar
- Etablera ett Azure Cosmos DB för Apache Cassandra-konto
- Använda cqlsh för validering
- JDK 8 eller senare
- Docker (valfritt)
Grundläggande konfiguration
Konfigurera PostgreSQL-databasen om du inte redan har gjort det.
Det kan vara en befintlig lokal databas eller så kan du ladda ned och installera en på den lokala datorn. Det går också att använda en Docker-container.
Kommentar
I följande exempel hämtas en offentlig containeravbildning från Docker Hub. Vi rekommenderar att du autentiserar med ditt Docker Hub-konto (docker login
) först i stället för att göra en anonym pull-begäran. För att förbättra tillförlitligheten när du använder offentligt innehåll importerar och hanterar du avbildningen i ett privat Azure-containerregister. Läs mer om hur du arbetar med offentliga avbildningar.
Så här startar du en container:
docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres
Anslut till din PostgreSQL-instans med hjälp av psql
klienten:
psql -h localhost -p 5432 -U postgres -W -d postgres
Skapa en tabell för att lagra exempelorderinformation:
CREATE SCHEMA retail;
CREATE TABLE retail.orders_info (
orderid SERIAL NOT NULL PRIMARY KEY,
custid INTEGER NOT NULL,
amount INTEGER NOT NULL,
city VARCHAR(255) NOT NULL,
purchase_time VARCHAR(40) NOT NULL
);
Med hjälp av Azure Portal skapar du Cassandra Keyspace och de tabeller som krävs för demoprogrammet.
Kommentar
Använd samma nyckelområdes- och tabellnamn som nedan
CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
Konfigurera Apache Kafka
Den här artikeln använder ett lokalt kluster, men du kan välja alla andra alternativ. Ladda ned Kafka, packa upp det, starta Zookeeper- och Kafka-klustret.
cd <KAFKA_HOME>/bin
#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties
Konfigurera anslutningsappar
Installera Anslutningsprogrammet Debezium PostgreSQL och DataStax Apache Kafka. Ladda ned plugin-arkivet för Debezium PostgreSQL-anslutningsprogrammet. Om du till exempel vill ladda ned version 1.3.0 av anslutningsappen (senast i skrivande stund) använder du den här länken. Ladda ned DataStax Apache Kafka-anslutningsappen från den här länken.
Packa upp båda anslutningsappens arkiv och kopiera JAR-filerna till Plugin.path för Kafka Connect.
cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs
Mer information finns i dokumentationen om Debezium och DataStax .
Konfigurera Kafka Connect och starta datapipeline
Starta Kafka Connect-kluster
cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties
Starta PostgreSQL-anslutningsinstans
Spara anslutningskonfigurationen (JSON) i ett filexempel pg-source-config.json
{
"name": "pg-orders-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "postgres",
"database.server.name": "myserver",
"plugin.name": "wal2json",
"table.include.list": "retail.orders_info",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
Så här startar du PostgreSQL-anslutningsinstansen:
curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors
Kommentar
Om du vill ta bort kan du använda: curl -X DELETE http://localhost:8083/connectors/pg-orders-source
Infoga data
Tabellen orders_info
innehåller orderinformation som order-ID, kund-ID, ort osv. Fyll i tabellen med slumpmässiga data med hjälp av nedanstående SQL.
insert into retail.orders_info (
custid, amount, city, purchase_time
)
select
random() * 10000 + 1,
random() * 200,
('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);
Den bör infoga 10 poster i tabellen. Se till att uppdatera antalet poster i generate_series(1, 10)
nedan enligt dina krav exempel, för att infoga 100
poster, använda generate_series(1, 100)
Att bekräfta:
select * from retail.orders_info;
Kontrollera ändringsdatainsamlingshändelserna i Kafka-ämnet
cd <KAFKA_HOME>/bin
./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning
Du bör se ändringsdatahändelserna i JSON-format.
Starta DataStax Apache Kafka-anslutningsinstans
Spara anslutningskonfigurationen (JSON) i ett filexempel cassandra-sink-config.json
och uppdatera egenskaperna enligt din miljö.
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "myserver.retail.orders_info",
"contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
"loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
"ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
"port": 10350,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"auth.username": "<Azure Cosmos DB user name (same as account name)>",
"auth.password": "<Azure Cosmos DB password>",
"topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"offset.flush.interval.ms": 10000
}
}
Så här startar du anslutningsinstansen:
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
Anslutningsappen bör börja fungera och pipelinen från slutpunkt till slutpunkt från PostgreSQL till Azure Cosmos DB kommer att fungera.
Köra frågor mot Azure Cosmos DB
Kontrollera Cassandra-tabellerna i Azure Cosmos DB. Här är några av de frågor som du kan prova:
select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;
select * from retail.orders_by_customer;
select * from retail.orders_by_city;
select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;
Du kan fortsätta att infoga mer data i PostgreSQL och bekräfta att posterna synkroniseras till Azure Cosmos DB.
Nästa steg
- Integrera Apache Kafka och Azure Cosmos DB för Apache Cassandra med Kafka Connect
- Integrera Apache Kafka Connect på Azure Event Hubs (förhandsversion) med Debezium for Change Data Capture
- Migrera data från Oracle till Azure Cosmos DB för Apache Cassandra med Arcion
- Etablera dataflöde på containrar och databaser
- Beräkna RU/s med hjälp av artiklarna om Kapacitetsplanerare för Azure Cosmos DB