Migrowanie danych z bazy danych PostgreSQL do usługi Azure Cosmos DB dla konta apache Cassandra przy użyciu platformy Apache Kafka
DOTYCZY: Cassandra
Interfejs API dla bazy danych Cassandra w usłudze Azure Cosmos DB stał się doskonałym wyborem dla obciążeń przedsiębiorstwa działających na platformie Apache Cassandra z różnych powodów, takich jak:
Znaczne oszczędności kosztów: możesz zaoszczędzić koszty dzięki usłudze Azure Cosmos DB, która obejmuje koszty maszyn wirtualnych, przepustowości i wszelkich odpowiednich licencji Oracle. Ponadto nie trzeba zarządzać centrami danych, serwerami, magazynem SSD, siecią i kosztami energii elektrycznej.
Lepsza skalowalność i dostępność: eliminuje pojedyncze punkty awarii, lepszą skalowalność i dostępność aplikacji.
Brak obciążeń związanych z zarządzaniem i monitorowaniem: jako w pełni zarządzana usługa w chmurze usługa Azure Cosmos DB eliminuje koszty związane z zarządzaniem i monitorowaniem niezliczonych ustawień.
Platforma Kafka Połączenie to platforma do strumieniowego przesyłania danych między platformą Apache Kafka a innymi systemami w sposób skalowalny i niezawodny. Obsługuje kilka gotowych łączników, co oznacza, że nie potrzebujesz niestandardowego kodu w celu zintegrowania systemów zewnętrznych z platformą Apache Kafka.
W tym artykule pokazano, jak używać kombinacji łączników platformy Kafka do konfigurowania potoku danych w celu ciągłego synchronizowania rekordów z relacyjnej bazy danych, takiej jak PostgreSQL do usługi Azure Cosmos DB dla usługi Apache Cassandra.
Omówienie
Poniżej przedstawiono ogólne omówienie kompleksowego przepływu przedstawionego w tym artykule.
Dane w tabeli PostgreSQL zostaną wypchnięte do platformy Apache Kafka przy użyciu łącznika Debezium PostgreSQL, który jest łącznikiem źródłowym platformy Kafka Połączenie. Operacje wstawiania, aktualizacji lub usuwania rekordów w tabeli PostgreSQL zostaną przechwycone jako change data
zdarzenia i wysłane do tematów platformy Kafka. Łącznik platformy Apache Kafka DataStax (łącznik ujścia platformy Kafka Połączenie) stanowi drugą część potoku. Zsynchronizuj zdarzenia zmian danych z tematu platformy Kafka do usługi Azure Cosmos DB dla tabel apache Cassandra.
Uwaga
Korzystanie z określonych funkcji łącznika platformy Apache Kafka dataStax umożliwia wypychanie danych do wielu tabel. W tym przykładzie łącznik pomoże nam utrwalać rekordy danych zmian w dwóch tabelach Cassandra, które mogą obsługiwać różne wymagania dotyczące zapytań.
Wymagania wstępne
- Aprowizuj konto usługi Azure Cosmos DB dla bazy danych Apache Cassandra
- Sprawdzanie poprawności przy użyciu narzędzia cqlsh
- JDK 8 lub nowszy
- Docker (opcjonalnie)
Konfiguracja podstawowa
Skonfiguruj bazę danych PostgreSQL, jeśli jeszcze tego nie zrobiono.
Może to być istniejąca lokalna baza danych lub można ją pobrać i zainstalować na komputerze lokalnym. Można również użyć kontenera platformy Docker.
Uwaga
Poniższy przykład ściąga publiczny obraz kontenera z usługi Docker Hub. Zalecamy najpierw uwierzytelnienie przy użyciu konta usługi Docker Hub (docker login
) zamiast tworzenia anonimowego żądania ściągnięcia. Aby zwiększyć niezawodność podczas korzystania z zawartości publicznej, zaimportuj obraz i zarządzaj nim w prywatnym rejestrze kontenerów platformy Azure. Dowiedz się więcej o pracy z obrazami publicznymi.
Aby uruchomić kontener:
docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres
Połączenie do wystąpienia bazy danych PostgreSQL przy użyciu psql
klienta:
psql -h localhost -p 5432 -U postgres -W -d postgres
Utwórz tabelę do przechowywania przykładowych informacji o zamówieniu:
CREATE SCHEMA retail;
CREATE TABLE retail.orders_info (
orderid SERIAL NOT NULL PRIMARY KEY,
custid INTEGER NOT NULL,
amount INTEGER NOT NULL,
city VARCHAR(255) NOT NULL,
purchase_time VARCHAR(40) NOT NULL
);
Za pomocą witryny Azure Portal utwórz przestrzeń kluczy Cassandra i tabele wymagane dla aplikacji demonstracyjnej.
Uwaga
Użyj tej samej przestrzeni kluczy i nazw tabel, co poniżej
CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
Konfigurowanie platformy Apache Kafka
W tym artykule jest używany klaster lokalny, ale możesz wybrać dowolną inną opcję. Pobierz platformę Kafka, rozpakuj ją, uruchom klaster Zookeeper i Kafka.
cd <KAFKA_HOME>/bin
#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties
Konfigurowanie łączników
Zainstaluj łącznik Debezium PostgreSQL i DataStax Apache Kafka. Pobierz archiwum wtyczek łącznika Debezium PostgreSQL. Aby na przykład pobrać wersję 1.3.0 łącznika (najnowszą wersję w momencie pisania), użyj tego linku. Pobierz łącznik Platformy Apache Kafka DataStax z tego linku.
Rozpakuj zarówno archiwa łącznika, jak i skopiuj pliki JAR do pliku Kafka Połączenie plugin.path.
cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs
Aby uzyskać szczegółowe informacje, zapoznaj się z dokumentacją debezium i datastax .
Konfigurowanie Połączenie platformy Kafka i uruchamianie potoku danych
Uruchamianie klastra platformy Kafka Połączenie
cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties
Uruchamianie wystąpienia łącznika PostgreSQL
Zapisywanie konfiguracji łącznika (JSON) w przykładzie pliku pg-source-config.json
{
"name": "pg-orders-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "postgres",
"database.server.name": "myserver",
"plugin.name": "wal2json",
"table.include.list": "retail.orders_info",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
Aby uruchomić wystąpienie łącznika PostgreSQL:
curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors
Uwaga
Aby usunąć, możesz użyć: curl -X DELETE http://localhost:8083/connectors/pg-orders-source
Wstawianie danych
Tabela orders_info
zawiera szczegóły zamówienia, takie jak identyfikator zamówienia, identyfikator klienta, miasto itp. Wypełnij tabelę losowymi danymi przy użyciu poniższego kodu SQL.
insert into retail.orders_info (
custid, amount, city, purchase_time
)
select
random() * 10000 + 1,
random() * 200,
('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);
Powinna wstawić do tabeli 10 rekordów. Pamiętaj, aby zaktualizować liczbę rekordów poniżej generate_series(1, 10)
, zgodnie z przykładem wymagań, aby wstawić 100
rekordy, użyj polecenia generate_series(1, 100)
Aby potwierdzić:
select * from retail.orders_info;
Sprawdzanie zdarzeń przechwytywania zmian danych w temacie platformy Kafka
cd <KAFKA_HOME>/bin
./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning
Zdarzenia zmian danych powinny być widoczne w formacie JSON.
Uruchamianie wystąpienia łącznika Apache Kafka DataStax
Zapisz konfigurację łącznika (JSON) w przykładzie cassandra-sink-config.json
pliku i zaktualizuj właściwości zgodnie ze swoim środowiskiem.
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "myserver.retail.orders_info",
"contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
"loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
"ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
"port": 10350,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"auth.username": "<Azure Cosmos DB user name (same as account name)>",
"auth.password": "<Azure Cosmos DB password>",
"topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"offset.flush.interval.ms": 10000
}
}
Aby uruchomić wystąpienie łącznika:
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
Łącznik powinien działać, a potok końcowy z bazy danych PostgreSQL do usługi Azure Cosmos DB będzie działać.
Wykonywanie zapytań do bazy danych Azure Cosmos DB
Sprawdź tabele Cassandra w usłudze Azure Cosmos DB. Oto niektóre zapytania, które można wypróbować:
select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;
select * from retail.orders_by_customer;
select * from retail.orders_by_city;
select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;
Możesz nadal wstawić więcej danych do bazy danych PostgreSQL i potwierdzić, że rekordy są synchronizowane z usługą Azure Cosmos DB.
Następne kroki
- Integrowanie platformy Apache Kafka i usługi Azure Cosmos DB dla bazy danych Apache Cassandra przy użyciu platformy Kafka Połączenie
- Integrowanie platformy Apache Kafka Połączenie w usłudze Azure Event Hubs (wersja zapoznawcza) z rozwiązaniem Debezium na potrzeby przechwytywania zmian danych
- Migrowanie danych z bazy danych Oracle do usługi Azure Cosmos DB dla systemu Apache Cassandra przy użyciu usługi Arcion
- Aprowizacja przepływności kontenerów i baz danych
- Najlepsze rozwiązania dotyczące klucza partycji
- Szacowanie jednostek RU/s przy użyciu artykułów planisty pojemności usługi Azure Cosmos DB