Migrowanie danych z bazy danych PostgreSQL do usługi Azure Cosmos DB dla konta apache Cassandra przy użyciu platformy Apache Kafka

DOTYCZY: Cassandra

Interfejs API dla bazy danych Cassandra w usłudze Azure Cosmos DB stał się doskonałym wyborem dla obciążeń przedsiębiorstwa działających na platformie Apache Cassandra z różnych powodów, takich jak:

  • Znaczne oszczędności kosztów: możesz zaoszczędzić koszty dzięki usłudze Azure Cosmos DB, która obejmuje koszty maszyn wirtualnych, przepustowości i wszelkich odpowiednich licencji Oracle. Ponadto nie trzeba zarządzać centrami danych, serwerami, magazynem SSD, siecią i kosztami energii elektrycznej.

  • Lepsza skalowalność i dostępność: eliminuje pojedyncze punkty awarii, lepszą skalowalność i dostępność aplikacji.

  • Brak obciążeń związanych z zarządzaniem i monitorowaniem: jako w pełni zarządzana usługa w chmurze usługa Azure Cosmos DB eliminuje koszty związane z zarządzaniem i monitorowaniem niezliczonych ustawień.

Platforma Kafka Połączenie to platforma do strumieniowego przesyłania danych między platformą Apache Kafka a innymi systemami w sposób skalowalny i niezawodny. Obsługuje kilka gotowych łączników, co oznacza, że nie potrzebujesz niestandardowego kodu w celu zintegrowania systemów zewnętrznych z platformą Apache Kafka.

W tym artykule pokazano, jak używać kombinacji łączników platformy Kafka do konfigurowania potoku danych w celu ciągłego synchronizowania rekordów z relacyjnej bazy danych, takiej jak PostgreSQL do usługi Azure Cosmos DB dla usługi Apache Cassandra.

Omówienie

Poniżej przedstawiono ogólne omówienie kompleksowego przepływu przedstawionego w tym artykule.

Dane w tabeli PostgreSQL zostaną wypchnięte do platformy Apache Kafka przy użyciu łącznika Debezium PostgreSQL, który jest łącznikiem źródłowym platformy Kafka Połączenie. Operacje wstawiania, aktualizacji lub usuwania rekordów w tabeli PostgreSQL zostaną przechwycone jako change data zdarzenia i wysłane do tematów platformy Kafka. Łącznik platformy Apache Kafka DataStax (łącznik ujścia platformy Kafka Połączenie) stanowi drugą część potoku. Zsynchronizuj zdarzenia zmian danych z tematu platformy Kafka do usługi Azure Cosmos DB dla tabel apache Cassandra.

Uwaga

Korzystanie z określonych funkcji łącznika platformy Apache Kafka dataStax umożliwia wypychanie danych do wielu tabel. W tym przykładzie łącznik pomoże nam utrwalać rekordy danych zmian w dwóch tabelach Cassandra, które mogą obsługiwać różne wymagania dotyczące zapytań.

Wymagania wstępne

Konfiguracja podstawowa

Skonfiguruj bazę danych PostgreSQL, jeśli jeszcze tego nie zrobiono.

Może to być istniejąca lokalna baza danych lub można ją pobrać i zainstalować na komputerze lokalnym. Można również użyć kontenera platformy Docker.

Uwaga

Poniższy przykład ściąga publiczny obraz kontenera z usługi Docker Hub. Zalecamy najpierw uwierzytelnienie przy użyciu konta usługi Docker Hub (docker login) zamiast tworzenia anonimowego żądania ściągnięcia. Aby zwiększyć niezawodność podczas korzystania z zawartości publicznej, zaimportuj obraz i zarządzaj nim w prywatnym rejestrze kontenerów platformy Azure. Dowiedz się więcej o pracy z obrazami publicznymi.

Aby uruchomić kontener:

docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres

Połączenie do wystąpienia bazy danych PostgreSQL przy użyciu psql klienta:

psql -h localhost -p 5432 -U postgres -W -d postgres

Utwórz tabelę do przechowywania przykładowych informacji o zamówieniu:

CREATE SCHEMA retail;

CREATE TABLE retail.orders_info (
	orderid SERIAL NOT NULL PRIMARY KEY,
	custid INTEGER NOT NULL,
	amount INTEGER NOT NULL,
	city VARCHAR(255) NOT NULL,
	purchase_time VARCHAR(40) NOT NULL
);

Za pomocą witryny Azure Portal utwórz przestrzeń kluczy Cassandra i tabele wymagane dla aplikacji demonstracyjnej.

Uwaga

Użyj tej samej przestrzeni kluczy i nazw tabel, co poniżej

CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

Konfigurowanie platformy Apache Kafka

W tym artykule jest używany klaster lokalny, ale możesz wybrać dowolną inną opcję. Pobierz platformę Kafka, rozpakuj ją, uruchom klaster Zookeeper i Kafka.

cd <KAFKA_HOME>/bin

#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties

Konfigurowanie łączników

Zainstaluj łącznik Debezium PostgreSQL i DataStax Apache Kafka. Pobierz archiwum wtyczek łącznika Debezium PostgreSQL. Aby na przykład pobrać wersję 1.3.0 łącznika (najnowszą wersję w momencie pisania), użyj tego linku. Pobierz łącznik Platformy Apache Kafka DataStax z tego linku.

Rozpakuj zarówno archiwa łącznika, jak i skopiuj pliki JAR do pliku Kafka Połączenie plugin.path.

cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs

Aby uzyskać szczegółowe informacje, zapoznaj się z dokumentacją debezium i datastax .

Konfigurowanie Połączenie platformy Kafka i uruchamianie potoku danych

Uruchamianie klastra platformy Kafka Połączenie

cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties

Uruchamianie wystąpienia łącznika PostgreSQL

Zapisywanie konfiguracji łącznika (JSON) w przykładzie pliku pg-source-config.json

{
    "name": "pg-orders-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "postgres",
        "database.server.name": "myserver",
        "plugin.name": "wal2json",
        "table.include.list": "retail.orders_info",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

Aby uruchomić wystąpienie łącznika PostgreSQL:

curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors

Uwaga

Aby usunąć, możesz użyć: curl -X DELETE http://localhost:8083/connectors/pg-orders-source

Wstawianie danych

Tabela orders_info zawiera szczegóły zamówienia, takie jak identyfikator zamówienia, identyfikator klienta, miasto itp. Wypełnij tabelę losowymi danymi przy użyciu poniższego kodu SQL.

insert into retail.orders_info (
    custid, amount, city, purchase_time
)
select
    random() * 10000 + 1,
    random() * 200,
    ('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
    NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);

Powinna wstawić do tabeli 10 rekordów. Pamiętaj, aby zaktualizować liczbę rekordów poniżej generate_series(1, 10) , zgodnie z przykładem wymagań, aby wstawić 100 rekordy, użyj polecenia generate_series(1, 100)

Aby potwierdzić:

select * from retail.orders_info;

Sprawdzanie zdarzeń przechwytywania zmian danych w temacie platformy Kafka

cd <KAFKA_HOME>/bin

./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning

Zdarzenia zmian danych powinny być widoczne w formacie JSON.

Uruchamianie wystąpienia łącznika Apache Kafka DataStax

Zapisz konfigurację łącznika (JSON) w przykładzie cassandra-sink-config.json pliku i zaktualizuj właściwości zgodnie ze swoim środowiskiem.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "myserver.retail.orders_info",
        "contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
        "loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
        "ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
        "port": 10350,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "auth.username": "<Azure Cosmos DB user name (same as account name)>",
        "auth.password": "<Azure Cosmos DB password>",
        "topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "offset.flush.interval.ms": 10000
    }
}

Aby uruchomić wystąpienie łącznika:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

Łącznik powinien działać, a potok końcowy z bazy danych PostgreSQL do usługi Azure Cosmos DB będzie działać.

Wykonywanie zapytań do bazy danych Azure Cosmos DB

Sprawdź tabele Cassandra w usłudze Azure Cosmos DB. Oto niektóre zapytania, które można wypróbować:

select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;

select * from retail.orders_by_customer;
select * from retail.orders_by_city;

select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;

Możesz nadal wstawić więcej danych do bazy danych PostgreSQL i potwierdzić, że rekordy są synchronizowane z usługą Azure Cosmos DB.

Następne kroki