Esdeveniment
Crear aplicacions i agents d'IA
17 de març, 21 - 21 de març, 10
Uneix-te a la sèrie de trobades per crear solucions d'IA escalables basades en casos d'ús del món real amb altres desenvolupadors i experts.
Registreu-vos-hi araAquest navegador ja no s’admet.
Feu l’actualització al Microsoft Edge per aprofitar les característiques més recents, les actualitzacions de seguretat i l’assistència tècnica.
SE APLICA A: Cassandra
API para Cassandra en Azure Cosmos DB se ha convertido en una excelente opción para cargas de trabajo empresariales que se ejecutan en Apache Cassandra por varios motivos como, por ejemplo:
Importante ahorro de costos: puede ahorrar costos con Azure Cosmos DB, como en el costo de las máquinas virtuales, del ancho de banda y de las licencias de Oracle aplicables. Además, no tiene que administrar los centros de datos, los servidores, el almacenamiento en SSD, las redes y los costos de electricidad.
Mejor escalabilidad y disponibilidad: Evita los puntos únicos de error, lo que supone una mejor escalabilidad y disponibilidad para las aplicaciones.
No hay sobrecarga de administración y supervisión: como servicio en la nube totalmente administrado, Azure Cosmos DB evita la sobrecarga de administrar y supervisar innumerables configuraciones.
Kafka Connect es una plataforma para transmitir datos entre Apache Kafka y otros sistemas de forma escalable y confiable. Admite varios conectores comerciales, lo que significa que no necesita código personalizado para integrar sistemas externos con Apache Kafka.
En este artículo se muestra cómo usar una combinación de conectores de Kafka para configurar una canalización de datos para sincronizar continuamente registros desde una base de datos relacional como PostgreSQL en Azure Cosmos DB for Apache Cassandra.
Aquí encontrará información general de alto nivel sobre el flujo completo que se presenta en este artículo.
Los datos de la tabla de PostgreSQL se insertarán en Apache Kafka mediante el conector de Debezium PostgreSQL, que es un conector de origen de Kafka Connect. Las inserciones, actualizaciones o eliminaciones en los registros de la tabla de PostgreSQL se capturarán como eventos change data
y se enviarán a los temas de Kafka. El conector de Apache Kafka de DataStax (conector del receptor de Kafka Connect) forma la segunda parte de la canalización. Sincronizará los eventos de datos modificados del tema Kafka con las tablas de Azure Cosmos DB for Apache Cassandra.
Nota
El uso de características específicas del conector de Apache Kafka de DataStax permite insertar datos en varias tablas. En este ejemplo, el conector nos ayudará a conservar los registros de datos modificados en dos tablas de Cassandra que pueden admitir diferentes requisitos de consulta.
Podría tratarse de una base de datos local existente o podría descargar e instalar una en la máquina local. También es posible usar un contenedor de Docker.
Nota
En el ejemplo siguiente se extrae una imagen de contenedor público de Docker Hub. Se recomienda autenticarse primero con la cuenta de Docker Hub (docker login
) en lugar de realizar una solicitud de extracción anónima. Para mejorar la confiabilidad al usar contenido público, importe y administre la imagen en un registro de contenedor privado de Azure. Más información sobre cómo trabajar con imágenes públicas.
Para iniciar un contenedor:
docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres
Conéctese a la instancia de PostgreSQL con el cliente psql
:
psql -h localhost -p 5432 -U postgres -W -d postgres
Cree una tabla para almacenar información de pedidos de ejemplo:
CREATE SCHEMA retail;
CREATE TABLE retail.orders_info (
orderid SERIAL NOT NULL PRIMARY KEY,
custid INTEGER NOT NULL,
amount INTEGER NOT NULL,
city VARCHAR(255) NOT NULL,
purchase_time VARCHAR(40) NOT NULL
);
Nota
Use los mismos nombres de espacio de claves y tabla que se indican a continuación.
CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
En este artículo se usa un clúster local, pero puede elegir cualquier otra opción. Descargue Kafka, descomprímalo e inicie el clúster de Zookeeper y Kafka.
cd <KAFKA_HOME>/bin
#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties
Instale el conector de Apache Kafka de DataStax y de PostgreSQL de Debezium. Descargue el archivo del complemento del conector de PostgreSQL de Debezium. Por ejemplo, para descargar la versión 1.3.0 del conector (la más reciente en el momento de redactar este artículo), use este vínculo. Descargue el conector de Apache Kafka de DataStax desde este vínculo.
Descomprima los archivos del conector y copie los archivos JAR en plugin.path de Kafka Connect.
cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs
Para obtener más información, consulte la documentación de Debezium y DataStax.
cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties
Guarde la configuración del conector (JSON) en un ejemplo de archivo pg-source-config.json
.
{
"name": "pg-orders-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "postgres",
"database.server.name": "myserver",
"plugin.name": "wal2json",
"table.include.list": "retail.orders_info",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
Para iniciar la instancia del conector de PostgreSQL:
curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors
Nota
Para eliminarla, puede usar: curl -X DELETE http://localhost:8083/connectors/pg-orders-source
La tabla orders_info
contiene detalles de pedidos como el identificador de pedido, el identificador de cliente, la ciudad, etc. Rellene la tabla con datos aleatorios mediante los siguientes datos SQL.
insert into retail.orders_info (
custid, amount, city, purchase_time
)
select
random() * 10000 + 1,
random() * 200,
('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);
Debe insertar 10 registros en la tabla. Asegúrese de actualizar el número de registros en generate_series(1, 10)
a continuación, según el ejemplo de requisitos; para insertar 100
registros, use generate_series(1, 100)
.
Para confirmar:
select * from retail.orders_info;
Compruebe los eventos de la captura de datos modificados en el tema de Kafka.
cd <KAFKA_HOME>/bin
./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning
Debería ver los eventos de datos modificados en formato JSON.
Guarde la configuración del conector (JSON) en un ejemplo de archivo, cassandra-sink-config.json
, y actualice las propiedades según su entorno.
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "myserver.retail.orders_info",
"contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
"loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
"ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
"port": 10350,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"auth.username": "<Azure Cosmos DB user name (same as account name)>",
"auth.password": "<Azure Cosmos DB password>",
"topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"offset.flush.interval.ms": 10000
}
}
Para iniciar la instancia del conector:
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
El conector debe entrar en acción, y la canalización completa de PostgreSQL a Azure Cosmos DB estará operativa.
Compruebe las tablas de Cassandra en Azure Cosmos DB. A continuación, se indican algunas de las consultas que puede probar:
select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;
select * from retail.orders_by_customer;
select * from retail.orders_by_city;
select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;
Puede continuar insertando más datos en PostgreSQL y confirmar que los registros están sincronizados con Azure Cosmos DB.
Esdeveniment
Crear aplicacions i agents d'IA
17 de març, 21 - 21 de març, 10
Uneix-te a la sèrie de trobades per crear solucions d'IA escalables basades en casos d'ús del món real amb altres desenvolupadors i experts.
Registreu-vos-hi araFormació
Mòdul
Traslado de datos a Azure Cosmos DB for NoSQL y desde este - Training
Migre datos dentro y fuera de Azure Cosmos DB for NoSQL mediante servicios de Azure y soluciones de código abierto.
Certificació
Microsoft Certified: Azure Cosmos DB Developer Specialty - Certifications
Escribe consultas eficaces, crea directivas de indexación, administra y aprovisiona recursos en la API de SQL y el SDK con Microsoft Azure Cosmos DB.
Documentació
Introducción - Azure Cosmos DB for PostgreSQL
Use Azure Cosmos DB for PostgreSQL para ejecutar las cargas de trabajo de datos relacionales de PostgreSQL a cualquier escala mediante sus aptitudes existentes.
Documentación de Azure Cosmos DB for PostgreSQL
Azure Cosmos DB for PostgreSQL es un servicio administrado para PostgreSQL ampliado con la superpotencia del código abierto de Citus de *tablas distribuidas*.
Inicio rápido: Creación de un clúster de Azure Cosmos DB for PostgreSQL
Inicio rápido para crear y consultar tablas distribuidas en Azure Cosmos DB for PostgreSQL.