Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
GILT FÜR: Cassandra
Die API für Cassandra in Azure Cosmos DB hat sich aus einer Reihe unterschiedlicher Gründe zu einer guten Wahl für Unternehmensworkloads entwickelt, die unter Apache Cassandra ausgeführt werden:
Erhebliche Kosteneinsparungen: Sie können Kosten sparen mit Azure Cosmos DB, einschließlich der Kosten für virtuelle Computer, Bandbreite und alle anwendbaren Oracle-Lizenzen. Außerdem entfällt die Verwaltung der Kosten für Rechenzentren, Server, SSD-Speicher, Netzwerk und Strom.
Bessere Skalierbarkeit und Verfügbarkeit: Beseitigt einzelne Fehlerquellen, verbessert die Skalierbarkeit und Verfügbarkeit für Ihre Anwendungen.
Kein Mehraufwand für Verwaltung und Überwachung: Als vollständig verwalteter Clouddienst entfernt Azure Cosmos DB den Mehraufwand der Verwaltung und Überwachung von unzähligen Einstellungen.
Kafka Connect ist eine Plattform zum Streamen von Daten zwischen Apache Kafka und anderen Systemen auf skalierbare und zuverlässige Weise. Sie unterstützt mehrere einsatzbereite Connectors, sodass Sie keinen benutzerdefinierten Code benötigen, um externe Systeme in Apache Kafka zu integrieren.
In diesem Artikel wird veranschaulicht, wie Sie eine Kombination aus Kafka-Connectors verwenden, um eine Datenpipeline zum fortlaufenden Synchronisieren von Datensätzen aus einer relationalen Datenbank wie PostgreSQL mit Azure Cosmos DB for Apache Cassandra zu verwenden.
Übersicht
Hier finden Sie eine allgemeine Übersicht über den gesamten Flow, der in diesem Artikel vorgestellt wird.
Die Daten in der PostgreSQL-Tabelle werden mithilfe des Debezium PostgreSQL-Connectors an Apache Kafka gepusht. Hierbei handelt es sich um einen Quellconnector für Kafka Connect. Einfüge-, Aktualisierungs- oder Löschvorgänge mit Datensätzen in der PostgreSQL-Tabelle werden als change data
-Ereignisse erfasst und an Kafka-Themen gesendet. Der DataStax Apache Kafka-Connector (in Kafka Connect ein Senkenconnector) bildet den zweiten Teil der Pipeline. Er synchronisiert die Änderungsereignisse von Daten aus Kafka-Themen mit Azure Cosmos DB for Apache Cassandra-Tabellen.
Hinweis
Durch die Verwendung spezifischer Features des DataStax Apache Kafka-Connectors können Sie Daten in mehrere Tabellen pushen. In diesem Beispiel hilft der Connector dabei, Datensätze zu Datenänderungen in zwei Cassandra-Tabellen zu speichern, die unterschiedliche Anforderungen an Abfragen unterstützen.
Voraussetzungen
- Stellen Sie ein Azure Cosmos DB for Apache Cassandra-Konto bereit.
- Verwenden Sie cqlsh für die Überprufung
- JDK 8 oder höher
- Docker (optional)
Basiseinrichtung
Richten Sie die PostgreSQL-Datenbank ein, falls dies noch nicht geschehen ist.
Sie können eine lokale Datenbank verwenden, oder Sie können eine auf Ihren lokalen Computer herunterladen und installieren. Es ist auch möglich, einen Docker-Container zu verwenden.
Hinweis
Im folgenden Beispiel wird ein öffentliches Container-Image von Docker Hub abgerufen. Es wird empfohlen, dass Sie sich zuerst mit Ihrem Docker Hub Konto (docker login
) authentifizieren, anstatt eine anonyme Abrufanforderung zu erstellen. Um die Zuverlässigkeit bei der Verwendung öffentlicher Inhalte zu verbessern, sollten Sie das Image in einer privaten Azure-Containerregistrierung importieren und verwalten. Weitere Informationen zur Verwendung öffentlicher Images finden Sie hier.
So starten Sie einen Container
docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres
Stellen Sie mithilfe des psql
-Clients eine Verbindung mit Ihrer PostgreSQL-Instanz her:
psql -h localhost -p 5432 -U postgres -W -d postgres
Erstellen Sie eine Tabelle zum Speichern von Daten zu Beispielbestellungen:
CREATE SCHEMA retail;
CREATE TABLE retail.orders_info (
orderid SERIAL NOT NULL PRIMARY KEY,
custid INTEGER NOT NULL,
amount INTEGER NOT NULL,
city VARCHAR(255) NOT NULL,
purchase_time VARCHAR(40) NOT NULL
);
Erstellen Sie mithilfe des Azure-Portals den Cassandra-Keyspace und die Tabellen, die für die Demoanwendung erforderlich sind.
Hinweis
Verwenden Sie die gleichen Keyspace- und Tabellennamen wie nachfolgend angegeben.
CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
Einrichten von Apache Kafka
In diesem Artikel wird ein lokaler Cluster verwendet, Sie können aber auch eine beliebige andere Option verwenden. Laden Sie Kafka herunter, entzippen Sie das Paket, und starten Sie ZooKeeper und den Kafka-Cluster.
cd <KAFKA_HOME>/bin
#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties
Einrichten der Connectors
Installieren Sie den Debezium PostgreSQL- und den DataStax Apache Kafka-Connector. Laden Sie das Plug-In-Archiv für den Debezium PostgreSQL-Connector herunter. Wenn Sie z. B. Version 1.3.0 des Connectors herunterladen möchten (zum Zeitpunkt des Verfassens die neueste Version), verwenden Sie diesen Link. Laden Sie den DataStax Apache Kafka-Connector unter diesem Link herunter.
Entzippen Sie beide Connectorarchive, und kopieren Sie die JAR-Dateien in den Plug-In-Pfad von Kafka Connect.
cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs
Weitere Informationen finden Sie in der Dokumentation zu Debezium und DataStax.
Konfigurieren von Kafka Connect und Starten der Datenpipeline
Starten des Kafka Connect-Clusters
cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties
Starten der PostgreSQL-Connectorinstanz
Speichern der Connectorkonfiguration (JSON) in einer Datei, z. B. pg-source-config.json
{
"name": "pg-orders-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "postgres",
"database.server.name": "myserver",
"plugin.name": "wal2json",
"table.include.list": "retail.orders_info",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
So starten Sie die PostgreSQL-Connectorinstanz
curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors
Hinweis
Für das Löschen verwenden Sie: curl -X DELETE http://localhost:8083/connectors/pg-orders-source
Einfügen von Daten
Die Tabelle orders_info
enthält Details zur Bestellung, z. B. Bestell-ID, Kunden-ID, Ort usw. Füllen Sie die Tabelle mit zufälligen Daten auf, indem Sie folgenden SQL-Code verwenden.
insert into retail.orders_info (
custid, amount, city, purchase_time
)
select
random() * 10000 + 1,
random() * 200,
('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);
Damit sollten 10 Datensätze in die Tabelle eingefügt werden. Achten Sie darauf, dass die Anzahl der Datensätze in generate_series(1, 10)
unten Ihrem Anforderungsbeispiel entspricht. Verwenden Sie zum Einfügen von 100
Datensätzen generate_series(1, 100)
So bestätigen Sie den Vorgang
select * from retail.orders_info;
Überprüfen der Change Data Capture-Ereignisse im Kafka-Thema
cd <KAFKA_HOME>/bin
./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning
Die Änderungsereignisse zu den Daten sollten im JSON-Format angezeigt werden.
Starten der DataStax Apache Kafka-Connectorinstanz
Speichern Sie die Connectorkonfiguration (JSON) in einer Datei (z. B. cassandra-sink-config.json
), und passen Sie die Eigenschaften an Ihre Umgebung an.
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "myserver.retail.orders_info",
"contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
"loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
"ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
"port": 10350,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"auth.username": "<Azure Cosmos DB user name (same as account name)>",
"auth.password": "<Azure Cosmos DB password>",
"topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"offset.flush.interval.ms": 10000
}
}
So starten Sie die Connectorinstanz
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
Der Connector sollte aktiv werden. Damit sollte auch die gesamte Pipeline von PostgreSQL zu Azure Cosmos DB betriebsbereit sein.
Abfragen einer Azure Cosmos DB
Überprüfen Sie die Cassandra-Tabellen in Azure Cosmos DB. Hier sind einige Abfragen, die Sie ausprobieren können:
select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;
select * from retail.orders_by_customer;
select * from retail.orders_by_city;
select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;
Sie können noch weitere Daten in PostgreSQL einfügen und überprüfen, ob die Datensätze mit Azure Cosmos DB synchronisiert werden.
Nächste Schritte
- Integrieren von Apache Kafka und Azure Cosmos DB for Apache Cassandra mithilfe von Kafka Connect
- Integrieren von Apache Kafka Connect in Azure Event Hubs (Vorschau) mit Debezium für Change Data Capture
- Migrieren von Daten aus Oracle in Azure Cosmos DB for Apache Cassandra mithilfe von Arcion
- Bereitstellen des Durchsatzes für Container und Datenbanken
- Artikel zum Schätzen des Durchsatzes (RU/s) mit dem Azure Cosmos DB Capacity Planner