Apache Kafka kullanarak PostgreSQL'den Apache Cassandra için Azure Cosmos DB hesabına veri geçirme

ŞUNLAR IÇIN GEÇERLIDIR: Cassandra

Azure Cosmos DB'de Cassandra API'si, Apache Cassandra üzerinde çalışan kurumsal iş yükleri için aşağıdakiler gibi çeşitli nedenlerle harika bir seçim haline gelmiştir:

  • Önemli maliyet tasarrufu: VM'lerin, bant genişliğinin ve geçerli Oracle lisanslarının maliyetini içeren Azure Cosmos DB ile maliyet tasarrufu yapabilirsiniz. Ayrıca veri merkezlerini, sunucuları, SSD depolamayı, ağ ve elektrik maliyetlerini yönetmeniz gerekmez.

  • Daha iyi ölçeklenebilirlik ve kullanılabilirlik: Uygulamalarınız için tek hata noktalarını, daha iyi ölçeklenebilirliği ve kullanılabilirliği ortadan kaldırır.

  • Yönetme ve izleme yükü yoktur: Azure Cosmos DB, tam olarak yönetilen bir bulut hizmeti olarak çok sayıda ayarı yönetme ve izleme yükünü ortadan kaldırır.

Kafka Bağlan, Apache Kafka ile diğer sistemler arasında ölçeklenebilir ve güvenilir bir şekilde veri akışı yapılan bir platformdur. Birkaç raf bağlayıcısını destekler; bu da dış sistemleri Apache Kafka ile tümleştirmek için özel koda ihtiyacınız olmadığı anlamına gelir.

Bu makalede, PostgreSQLgibi bir ilişkisel veritabanındaki kayıtları Apache Cassandra için Azure Cosmos DB'ye sürekli olarak eşitlemek üzere bir veri işlem hattı ayarlamak için Kafka bağlayıcılarının bir bileşiminin nasıl kullanılacağı gösterilmektedir.

Genel bakış

Bu makalede sunulan uçtan uca akışa üst düzey genel bakış aşağıda verilmiştir.

PostgreSQL tablosundaki veriler, Kafka Bağlan kaynak bağlayıcısı olan Debezium PostgreSQL bağlayıcısı kullanılarak Apache Kafka'ya gönderilir. PostgreSQL tablosundaki kayıtlara ekleme, güncelleştirme veya silme işlemleri olay olarak change data yakalanır ve Kafka konularına gönderilir. DataStax Apache Kafka bağlayıcısı (Kafka Bağlan havuz bağlayıcısı), işlem hattının ikinci bölümünü oluşturur. Kafka konu başlığındaki değişiklik veri olaylarını Apache Cassandra için Azure Cosmos DB tablolarına eşitler.

Not

DataStax Apache Kafka bağlayıcısının belirli özelliklerini kullanmak, verileri birden çok tabloya göndermemizi sağlar. Bu örnekte bağlayıcı, değişiklik veri kayıtlarını farklı sorgu gereksinimlerini destekleyebilecek iki Cassandra tablosuna kalıcı hale getirmemize yardımcı olacaktır.

Önkoşullar

Temel kurulum

Henüz yapmadıysanız PostgreSQL veritabanını ayarlayın.

Bu mevcut bir şirket içi veritabanı olabilir veya yerel makinenize indirip yükleyebilirsiniz. Docker kapsayıcısı da kullanılabilir.

Not

Aşağıdaki örnek, Docker Hub'dan bir genel kapsayıcı görüntüsü çeker. Anonim çekme isteğinde bulunmak yerine önce Docker Hub hesabınızla (docker login) kimlik doğrulaması yapmanızı öneririz. Genel içerik kullanırken güvenilirliği artırmak için görüntüyü özel bir Azure kapsayıcı kayıt defterinde içeri aktarın ve yönetin. Genel görüntülerle çalışma hakkında daha fazla bilgi edinin.

Kapsayıcıyı başlatmak için:

docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres

İstemci kullanarak psql PostgreSQL örneğine Bağlan:

psql -h localhost -p 5432 -U postgres -W -d postgres

Örnek sipariş bilgilerini depolamak için bir tablo oluşturun:

CREATE SCHEMA retail;

CREATE TABLE retail.orders_info (
	orderid SERIAL NOT NULL PRIMARY KEY,
	custid INTEGER NOT NULL,
	amount INTEGER NOT NULL,
	city VARCHAR(255) NOT NULL,
	purchase_time VARCHAR(40) NOT NULL
);

Azure portalını kullanarak Cassandra Keyspace'i ve tanıtım uygulaması için gereken tabloları oluşturun.

Not

Aşağıdakiyle aynı Keyspace ve tablo adlarını kullanın

CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

Apache Kafka kurulumu

Bu makalede yerel küme kullanılır, ancak başka bir seçenek belirleyebilirsiniz. Kafka'yı indirin, sıkıştırmasını açın, Zookeeper ve Kafka kümesini başlatın.

cd <KAFKA_HOME>/bin

#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties

Bağlayıcıları ayarlama

Debezium PostgreSQL ve DataStax Apache Kafka bağlayıcısını yükleyin. Debezium PostgreSQL bağlayıcısı eklenti arşivini indirin. Örneğin, bağlayıcının 1.3.0 sürümünü indirmek için (yazma sırasında en son), bu bağlantıyı kullanın. Bu bağlantıdan DataStax Apache Kafka bağlayıcısını indirin.

Hem bağlayıcı arşivlerinin sıkıştırmasını açın hem de JAR dosyalarını Kafka Bağlan plugin.path adresine kopyalayın.

cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs

Ayrıntılar için lütfen Debezium ve DataStax belgelerine bakın.

Kafka Bağlan yapılandırma ve veri işlem hattını başlatma

Kafka Bağlan kümesini başlatma

cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties

PostgreSQL bağlayıcı örneğini başlatma

Bağlayıcı yapılandırmasını (JSON) bir dosya örneğine kaydetme pg-source-config.json

{
    "name": "pg-orders-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "postgres",
        "database.server.name": "myserver",
        "plugin.name": "wal2json",
        "table.include.list": "retail.orders_info",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

PostgreSQL bağlayıcı örneğini başlatmak için:

curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors

Not

Silmek için şunları kullanabilirsiniz: curl -X DELETE http://localhost:8083/connectors/pg-orders-source

Veri ekleme

Tabloda orders_info sipariş kimliği, müşteri kimliği, şehir gibi sipariş ayrıntıları yer alır. Aşağıdaki SQL'i kullanarak tabloyu rastgele verilerle doldurun.

insert into retail.orders_info (
    custid, amount, city, purchase_time
)
select
    random() * 10000 + 1,
    random() * 200,
    ('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
    NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);

Tabloya 10 kayıt eklemelidir. Aşağıdaki kayıt generate_series(1, 10) sayısını gereksinimlerinize göre güncelleştirin örneğin, kayıt eklemek 100 için generate_series(1, 100)

Onaylamak için:

select * from retail.orders_info;

Kafka konusunda veri yakalama olaylarını değiştirme konusunu denetleyin

cd <KAFKA_HOME>/bin

./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning

Veri değişiklik olaylarını JSON biçiminde görmeniz gerekir.

DataStax Apache Kafka bağlayıcı örneğini başlatma

Bağlayıcı yapılandırmasını (JSON) bir dosya örneğine cassandra-sink-config.json kaydedin ve ortamınıza göre özellikleri güncelleştirin.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "myserver.retail.orders_info",
        "contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
        "loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
        "ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
        "port": 10350,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "auth.username": "<Azure Cosmos DB user name (same as account name)>",
        "auth.password": "<Azure Cosmos DB password>",
        "topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "offset.flush.interval.ms": 10000
    }
}

Bağlayıcı örneğini başlatmak için:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

Bağlayıcı harekete geçmelidir ve PostgreSQL'den Azure Cosmos DB'ye kadar uçtan uca işlem hattı çalışır durumda olacaktır.

Azure Cosmos DB'yi sorgulama

Azure Cosmos DB'de Cassandra tablolarını denetleyin. Deneyebileceğiniz sorgulardan bazıları şunlardır:

select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;

select * from retail.orders_by_customer;
select * from retail.orders_by_city;

select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;

PostgreSQL'e daha fazla veri eklemeye devam edebilir ve kayıtların Azure Cosmos DB ile eşitlendiğini onaylayabilirsiniz.

Sonraki adımlar