Apache Kafka kullanarak PostgreSQL'den Apache Cassandra için Azure Cosmos DB hesabına veri geçirme
ŞUNLAR IÇIN GEÇERLIDIR: Cassandra
Azure Cosmos DB'de Cassandra API'si, Apache Cassandra üzerinde çalışan kurumsal iş yükleri için aşağıdakiler gibi çeşitli nedenlerle harika bir seçim haline gelmiştir:
Önemli maliyet tasarrufu: VM'lerin, bant genişliğinin ve geçerli Oracle lisanslarının maliyetini içeren Azure Cosmos DB ile maliyet tasarrufu yapabilirsiniz. Ayrıca veri merkezlerini, sunucuları, SSD depolamayı, ağ ve elektrik maliyetlerini yönetmeniz gerekmez.
Daha iyi ölçeklenebilirlik ve kullanılabilirlik: Uygulamalarınız için tek hata noktalarını, daha iyi ölçeklenebilirliği ve kullanılabilirliği ortadan kaldırır.
Yönetme ve izleme yükü yoktur: Azure Cosmos DB, tam olarak yönetilen bir bulut hizmeti olarak çok sayıda ayarı yönetme ve izleme yükünü ortadan kaldırır.
Kafka Bağlan, Apache Kafka ile diğer sistemler arasında ölçeklenebilir ve güvenilir bir şekilde veri akışı yapılan bir platformdur. Birkaç raf bağlayıcısını destekler; bu da dış sistemleri Apache Kafka ile tümleştirmek için özel koda ihtiyacınız olmadığı anlamına gelir.
Bu makalede, PostgreSQLgibi bir ilişkisel veritabanındaki kayıtları Apache Cassandra için Azure Cosmos DB'ye sürekli olarak eşitlemek üzere bir veri işlem hattı ayarlamak için Kafka bağlayıcılarının bir bileşiminin nasıl kullanılacağı gösterilmektedir.
Genel bakış
Bu makalede sunulan uçtan uca akışa üst düzey genel bakış aşağıda verilmiştir.
PostgreSQL tablosundaki veriler, Kafka Bağlan kaynak bağlayıcısı olan Debezium PostgreSQL bağlayıcısı kullanılarak Apache Kafka'ya gönderilir. PostgreSQL tablosundaki kayıtlara ekleme, güncelleştirme veya silme işlemleri olay olarak change data
yakalanır ve Kafka konularına gönderilir. DataStax Apache Kafka bağlayıcısı (Kafka Bağlan havuz bağlayıcısı), işlem hattının ikinci bölümünü oluşturur. Kafka konu başlığındaki değişiklik veri olaylarını Apache Cassandra için Azure Cosmos DB tablolarına eşitler.
Not
DataStax Apache Kafka bağlayıcısının belirli özelliklerini kullanmak, verileri birden çok tabloya göndermemizi sağlar. Bu örnekte bağlayıcı, değişiklik veri kayıtlarını farklı sorgu gereksinimlerini destekleyebilecek iki Cassandra tablosuna kalıcı hale getirmemize yardımcı olacaktır.
Önkoşullar
- Apache Cassandra hesabı için Azure Cosmos DB sağlama
- Doğrulama için cqlsh kullanma
- JDK 8 veya üzeri
- Docker (isteğe bağlı)
Temel kurulum
Henüz yapmadıysanız PostgreSQL veritabanını ayarlayın.
Bu mevcut bir şirket içi veritabanı olabilir veya yerel makinenize indirip yükleyebilirsiniz. Docker kapsayıcısı da kullanılabilir.
Not
Aşağıdaki örnek, Docker Hub'dan bir genel kapsayıcı görüntüsü çeker. Anonim çekme isteğinde bulunmak yerine önce Docker Hub hesabınızla (docker login
) kimlik doğrulaması yapmanızı öneririz. Genel içerik kullanırken güvenilirliği artırmak için görüntüyü özel bir Azure kapsayıcı kayıt defterinde içeri aktarın ve yönetin. Genel görüntülerle çalışma hakkında daha fazla bilgi edinin.
Kapsayıcıyı başlatmak için:
docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres
İstemci kullanarak psql
PostgreSQL örneğine Bağlan:
psql -h localhost -p 5432 -U postgres -W -d postgres
Örnek sipariş bilgilerini depolamak için bir tablo oluşturun:
CREATE SCHEMA retail;
CREATE TABLE retail.orders_info (
orderid SERIAL NOT NULL PRIMARY KEY,
custid INTEGER NOT NULL,
amount INTEGER NOT NULL,
city VARCHAR(255) NOT NULL,
purchase_time VARCHAR(40) NOT NULL
);
Azure portalını kullanarak Cassandra Keyspace'i ve tanıtım uygulaması için gereken tabloları oluşturun.
Not
Aşağıdakiyle aynı Keyspace ve tablo adlarını kullanın
CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
Apache Kafka kurulumu
Bu makalede yerel küme kullanılır, ancak başka bir seçenek belirleyebilirsiniz. Kafka'yı indirin, sıkıştırmasını açın, Zookeeper ve Kafka kümesini başlatın.
cd <KAFKA_HOME>/bin
#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties
Bağlayıcıları ayarlama
Debezium PostgreSQL ve DataStax Apache Kafka bağlayıcısını yükleyin. Debezium PostgreSQL bağlayıcısı eklenti arşivini indirin. Örneğin, bağlayıcının 1.3.0 sürümünü indirmek için (yazma sırasında en son), bu bağlantıyı kullanın. Bu bağlantıdan DataStax Apache Kafka bağlayıcısını indirin.
Hem bağlayıcı arşivlerinin sıkıştırmasını açın hem de JAR dosyalarını Kafka Bağlan plugin.path adresine kopyalayın.
cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs
Ayrıntılar için lütfen Debezium ve DataStax belgelerine bakın.
Kafka Bağlan yapılandırma ve veri işlem hattını başlatma
Kafka Bağlan kümesini başlatma
cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties
PostgreSQL bağlayıcı örneğini başlatma
Bağlayıcı yapılandırmasını (JSON) bir dosya örneğine kaydetme pg-source-config.json
{
"name": "pg-orders-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "postgres",
"database.server.name": "myserver",
"plugin.name": "wal2json",
"table.include.list": "retail.orders_info",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
PostgreSQL bağlayıcı örneğini başlatmak için:
curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors
Not
Silmek için şunları kullanabilirsiniz: curl -X DELETE http://localhost:8083/connectors/pg-orders-source
Veri ekleme
Tabloda orders_info
sipariş kimliği, müşteri kimliği, şehir gibi sipariş ayrıntıları yer alır. Aşağıdaki SQL'i kullanarak tabloyu rastgele verilerle doldurun.
insert into retail.orders_info (
custid, amount, city, purchase_time
)
select
random() * 10000 + 1,
random() * 200,
('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);
Tabloya 10 kayıt eklemelidir. Aşağıdaki kayıt generate_series(1, 10)
sayısını gereksinimlerinize göre güncelleştirin örneğin, kayıt eklemek 100
için generate_series(1, 100)
Onaylamak için:
select * from retail.orders_info;
Kafka konusunda veri yakalama olaylarını değiştirme konusunu denetleyin
cd <KAFKA_HOME>/bin
./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning
Veri değişiklik olaylarını JSON biçiminde görmeniz gerekir.
DataStax Apache Kafka bağlayıcı örneğini başlatma
Bağlayıcı yapılandırmasını (JSON) bir dosya örneğine cassandra-sink-config.json
kaydedin ve ortamınıza göre özellikleri güncelleştirin.
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "myserver.retail.orders_info",
"contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
"loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
"ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
"port": 10350,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"auth.username": "<Azure Cosmos DB user name (same as account name)>",
"auth.password": "<Azure Cosmos DB password>",
"topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"offset.flush.interval.ms": 10000
}
}
Bağlayıcı örneğini başlatmak için:
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
Bağlayıcı harekete geçmelidir ve PostgreSQL'den Azure Cosmos DB'ye kadar uçtan uca işlem hattı çalışır durumda olacaktır.
Azure Cosmos DB'yi sorgulama
Azure Cosmos DB'de Cassandra tablolarını denetleyin. Deneyebileceğiniz sorgulardan bazıları şunlardır:
select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;
select * from retail.orders_by_customer;
select * from retail.orders_by_city;
select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;
PostgreSQL'e daha fazla veri eklemeye devam edebilir ve kayıtların Azure Cosmos DB ile eşitlendiğini onaylayabilirsiniz.
Sonraki adımlar
- Kafka Bağlan kullanarak Apache Cassandra için Apache Kafka ve Azure Cosmos DB'yi tümleştirme
- Azure Event Hubs(Önizleme) üzerinde Apache Kafka Bağlan Değişiklik Verileri Yakalama için Debezium ile tümleştirme
- Arcion kullanarak Oracle'dan Apache Cassandra için Azure Cosmos DB'ye veri geçirme
- Kapsayıcı ve veritabanlarına aktarım hızı sağlama
- Bölüm anahtarı en iyi yöntemleri
- Azure Cosmos DB kapasite planlayıcısı makalelerini kullanarak RU/sn tahmini