Apache Kafka를 사용하여 PostgreSQL에서 Azure Cosmos DB for Apache Cassandra 계정으로 데이터 마이그레이션

적용 대상: Cassandra

Azure Cosmos DB에서 API for Cassandra는 다음과 같은 다양한 이유로 Apache Cassandra에서 실행되는 엔터프라이즈 워크로드에 적합한 선택이 되었습니다.

  • 대폭적인 비용 절감: Azure Cosmos DB를 사용하여 비용을 절감할 수 있습니다. 여기에는 VM, 대역폭, 적용 가능한 Oracle 라이선스 비용이 포함됩니다. 또한 데이터 센터, 서버, SSD 스토리지, 네트워킹, 전기 비용을 관리할 필요가 없습니다.

  • 스케일링 수준 및 가용성 향상: 단일 실패 지점을 제거하고 애플리케이션의 스케일링 수준과 가용성을 향상합니다.

  • 관리 및 모니터링의 오버헤드 없음: 완전히 관리되는 클라우드 서비스인 Azure Cosmos DB는 수많은 설정을 관리하고 모니터링하는 오버헤드를 제거합니다.

Kafka 커넥트 확장 가능하고 안정적인 방식으로 Apache Kafka와 다른 시스템 간에 데이터를 스트리밍하는 플랫폼입니다. 몇 가지 상용 커넥터를 지원하므로 외부 시스템을 Apache Kafka와 통합하기 위해 사용자 지정 코드가 필요하지 않습니다.

이 문서에서는 Kafka Connect 조합을 사용하여 PostgreSQL과 같은 관계형 데이터베이스의 레코드를 Azure Cosmos DB for Apache Cassandra로 지속적으로 동기화하도록 데이터 파이프라인을 설정하는 방법을 보여 줍니다.

개요

다음은 이 문서에 제시된 종단 간 흐름에 대한 개략적인 개요입니다.

PostgreSQL 테이블의 데이터는 Kafka Connect 원본 커넥터에 해당하는 Debezium PostgreSQL 커넥터를 사용하여 Apache Kafka에 푸시됩니다. PostgreSQL 테이블의 레코드에 대한 삽입, 업데이트 또는 삭제는 이벤트로 캡처되고 Kafka 토픽으로 change data 전송됩니다. DataStax Apache Kafka 커넥터(kafka Connect 싱크 커넥터)는 파이프라인의 두 번째 부분을 형성합니다. Kafka 항목의 변경 데이터 이벤트를 Azure Cosmos DB for Apache Cassandra 테이블과 동기화합니다.

참고 항목

DataStax Apache Kafka 커넥터의 특정 기능을 사용하면 데이터를 여러 테이블에 푸시할 수 있습니다. 이 예제에서 커넥터는 다른 쿼리 요구 사항을 지원할 수 있는 두 개의 Cassandra 테이블로 변경 데이터 레코드를 유지하는 데 도움이 됩니다.

필수 조건

기본 설정

아직 설치하지 않은 경우 PostgreSQL 데이터베이스를 설정합니다.

이것은 기존 온-프레미스 데이터베이스일 수도 있고 로컬 컴퓨터에 다운로드하여 설치할 수도 있습니다. Docker 컨테이너를 사용할 수도 있습니다.

참고 항목

다음 예제에서는 Docker Hub에서 공용 컨테이너 이미지를 가져옵니다. 익명 끌어오기 요청을 만드는 대신 먼저 Docker Hub 계정(docker login)으로 인증하는 것이 좋습니다. 퍼블릭 콘텐츠를 사용할 때 안정성을 향상하려면 프라이빗 Azure 컨테이너 레지스트리에서 이미지를 가져오고 관리합니다. 공용 이미지 사용에 대해 자세히 알아봅니다.

컨테이너를 시작하려면 다음을 수행합니다.

docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres

psql 클라이언트를 사용하여 PostgreSQL 인스턴스에 연결합니다.

psql -h localhost -p 5432 -U postgres -W -d postgres

샘플 주문 정보를 저장할 테이블을 만듭니다.

CREATE SCHEMA retail;

CREATE TABLE retail.orders_info (
	orderid SERIAL NOT NULL PRIMARY KEY,
	custid INTEGER NOT NULL,
	amount INTEGER NOT NULL,
	city VARCHAR(255) NOT NULL,
	purchase_time VARCHAR(40) NOT NULL
);

Azure Portal을 사용하여 데모 애플리케이션에 필요한 Cassandra Keyspace 및 테이블을 만듭니다.

참고 항목

아래와 동일한 키스페이스 및 테이블 이름 사용

CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

Apache Kafka 설정

이 문서에서는 로컬 클러스터를 사용하지만 다른 옵션을 선택할 수 있습니다. Kafka를 다운로드하고 압축을 풀고 Zookeeper 및 Kafka 클러스터를 시작합니다.

cd <KAFKA_HOME>/bin

#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties

커넥터 설정

Debezium PostgreSQL 및 DataStax Apache Kafka 커넥터를 설치합니다. Debezium PostgreSQL 커넥터 플러그 인 보관 파일을 다운로드합니다. 예를 들어, 커넥터 버전 1.3.0(작성 시점의 최신 버전)을 다운로드하려면 이 링크를 사용합니다. 이 링크에서 DataStax Apache Kafka 커넥터를 다운로드합니다.

커넥터 보관 파일의 압축을 풀고 JAR 파일을 Kafka 커넥트 plugin.path에 복사합니다.

cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs

자세한 내용은 DebeziumDataStax 설명서를 참조하세요.

Kafka Connect 구성 및 데이터 파이프라인 시작

Kafka 커넥트 클러스터 시작

cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties

PostgreSQL 커넥터 인스턴스 시작

파일 예제에 커넥터 구성(JSON)을 저장합니다. pg-source-config.json

{
    "name": "pg-orders-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "postgres",
        "database.server.name": "myserver",
        "plugin.name": "wal2json",
        "table.include.list": "retail.orders_info",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

PostgreSQL 커넥터 인스턴스를 시작하려면 다음을 수행합니다.

curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors

참고 항목

curl -X DELETE http://localhost:8083/connectors/pg-orders-source 명령을 사용하여 삭제할 수 있습니다.

데이터 삽입

테이블에는 orders_info 주문 ID, 고객 ID, 도시 등과 같은 주문 세부 정보가 포함되어 있습니다. 아래 SQL을 사용하여 테이블을 임의 데이터로 채웁니다.

insert into retail.orders_info (
    custid, amount, city, purchase_time
)
select
    random() * 10000 + 1,
    random() * 200,
    ('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
    NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);

테이블에 10 레코드를 삽입해야 합니다. 요구 사항 예제에 따라 아래의 generate_series(1, 10)에서 레코드 수를 업데이트하여 100개 레코드를 삽입하려면 generate_series(1, 100)을 사용합니다.

확인:

select * from retail.orders_info;

Kafka 토픽에서 변경 데이터 캡처 이벤트 확인

cd <KAFKA_HOME>/bin

./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning

변경 데이터 이벤트가 JSON 형식으로 표시됩니다.

DataStax Apache Kafka 커넥터 인스턴스 시작

JSON(커넥터 구성)을 파일 예제 cassandra-sink-config.json 에 저장하고 사용자 환경에 따라 속성을 업데이트합니다.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "myserver.retail.orders_info",
        "contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
        "loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
        "ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
        "port": 10350,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "auth.username": "<Azure Cosmos DB user name (same as account name)>",
        "auth.password": "<Azure Cosmos DB password>",
        "topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "offset.flush.interval.ms": 10000
    }
}

커넥터 인스턴스를 시작하려면 다음을 수행합니다.

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

커넥터가 작동하기 시작하고 PostgreSQL에서 Azure Cosmos DB로의 엔드투엔드 파이프라인이 작동하게 됩니다.

Azure Cosmos DB 쿼리

Azure Cosmos DB에서 Cassandra 테이블을 확인합니다. 다음은 시도할 수 있는 몇 가지 쿼리입니다.

select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;

select * from retail.orders_by_customer;
select * from retail.orders_by_city;

select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;

계속해서 PostgreSQL에 더 많은 데이터를 삽입하고 레코드가 Azure Cosmos DB에 동기화되는지 확인할 수 있습니다.

다음 단계