Migre en directo datos de Apache Cassandra a Azure Cosmos DB for Apache Cassandra mediante el proxy de doble escritura y Apache Spark
La API de Cassandra en Azure Cosmos DB se ha convertido en una excelente opción para cargas de trabajo empresariales que se ejecutan en Apache Cassandra por diversos motivos como, por ejemplo:
No hay sobrecarga de administración y supervisión: evita la sobrecarga que supone administrar y supervisar innumerables configuraciones en el sistema operativo, JVM, los archivos yaml y sus interacciones.
Importante ahorro de costos: puede ahorrar costos con Azure Cosmos DB, como en el costo de las máquinas virtuales, del ancho de banda y de las licencias aplicables. Además, no tiene que administrar los centros de datos, los servidores, el almacenamiento en SSD, las redes y los costos de electricidad.
Posibilidad de usar código y herramientas existentes: Azure Cosmos DB proporciona compatibilidad de nivel de protocolo de conexión con SDK y herramientas existentes de Cassandra. Esta compatibilidad garantiza que pueda usar el código base existente con Azure Cosmos DB for Apache Cassandra con cambios triviales.
Azure Cosmos DB no es compatible con el protocolo de cotilleo nativo de Apache Cassandra para la replicación. Por lo tanto, cuando el tiempo de inactividad cero es un requisito para la migración, es necesario un enfoque diferente. En este tutorial se describe cómo migrar en directo datos a Azure Cosmos DB for Apache Cassandra desde un clúster nativo de Apache Cassandra mediante un proxy de doble escritura y Apache Spark.
En la siguiente imagen, se ilustra este patrón. El proxy de doble escritura se usa para capturar cambios en directo, mientras que los datos históricos se copian en masa con Apache Spark. El proxy puede aceptar conexiones desde el código de las aplicaciones con pocos o ningún cambio de configuración. Enrutará todas las solicitudes a la base de datos de origen y dirigirá de forma asincrónica las escrituras a la API de Cassandra mientras se produce una copia masiva.
Prerrequisitos
Aprovisionamiento de una cuenta de Azure Cosmos DB para Apache Cassandra.
Consulte las características compatibles en Apache Cassandra de Azure Cosmos DB para garantizar la compatibilidad.
Asegúrese de que tiene conectividad de red entre el clúster de origen y el punto de conexión de la API de Cassandra de destino.
Asegúrese de haber migrado ya el espacio de claves o el esquema de tabla de la base de datos Cassandra de origen a la cuenta de la API de Cassandra de destino.
Importante
Si tiene un requisito para conservar Apache Cassandra
writetime
durante la migración, se deben establecer las siguientes marcas al crear tablas:with cosmosdb_cell_level_timestamp=true and cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true
Por ejemplo:
CREATE KEYSPACE IF NOT EXISTS migrationkeyspace WITH REPLICATION= {'class': 'org.apache.> cassandra.locator.SimpleStrategy', 'replication_factor' : '1'};
CREATE TABLE IF NOT EXISTS migrationkeyspace.users ( name text, userID int, address text, phone int, PRIMARY KEY ((name), userID)) with cosmosdb_cell_level_timestamp=true and > cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true;
Aprovisionamiento de un clúster de Spark
Se recomienda Azure Databricks. Use un runtime compatible con Spark 3.0 o posterior.
Importante
Debe asegurarse de que su cuenta de Azure Databricks tiene conectividad de red con el clúster Apache Cassandra de origen. Esto puede requerir una inyección de red virtual. Para obtener más información, consulte el siguiente artículo.
Adición de dependencias de Spark
Tiene que agregar la biblioteca de conectores de Cassandra de Apache Spark a su clúster para conectarse a los puntos de conexión nativos y de Cassandra de Azure Cosmos DB. En el clúster, seleccione Libraries>Install New>Maven (Bibliotecas > Instalar nueva > Maven) y, después, agregue com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0
en las coordenadas de Maven.
Importante
Si tiene el requisito de conservar Apache Cassandra writetime
para cada fila durante la migración, le recomendamos que utilice esta muestra. El jar de dependencia de este ejemplo también contiene el conector Spark, por lo que debe instalarlo en lugar del ensamblado del conector anterior. Este ejemplo también es útil si desea realizar una validación de comparación de filas entre el origen y el destino después de completar la carga de datos histórica. Vea las secciones "ejecutar la carga de datos histórica" y "validar el origen y el destino" a continuación para obtener más detalles.
Seleccione Install (Instalar) y asegúrese de reiniciar el clúster cuando se complete la instalación.
Nota
Asegúrese de reiniciar el clúster de Azure Databricks después de instalar la biblioteca del conector de Cassandra.
Instalación del proxy de doble escritura
Para obtener un rendimiento óptimo durante las operaciones de doble escritura, se recomienda instalar el proxy en todos los nodos del clúster de Cassandra de origen.
#assuming you do not have git already installed
sudo apt-get install git
#assuming you do not have maven already installed
sudo apt install maven
#clone repo for dual-write proxy
git clone https://github.com/Azure-Samples/cassandra-proxy.git
#change directory
cd cassandra-proxy
#compile the proxy
mvn package
Inicio del proxy de doble escritura
Se recomienda instalar el proxy en todos los nodos del clúster de Cassandra de origen. Como mínimo, ejecute el siguiente comando para iniciar el proxy en cada nodo. Reemplace <target-server>
por una dirección IP o de servidor de uno de los nodos del clúster de destino. Reemplace <path to JKS file>
por la ruta de acceso a un archivo .jks local, y <keystore password>
, por la contraseña correspondiente.
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password>
Al iniciar el proxy de esta manera, se da por hecho que se cumple lo siguiente:
- Los puntos de conexión de origen y destino tienen el mismo nombre de usuario y la misma contraseña.
- Los puntos de conexión de origen y destino implementan el protocolo SSL (Capa de sockets seguros).
Si los puntos de conexión de origen y destino no pueden cumplir estos criterios, siga leyendo para conocer más opciones de configuración.
Configuración de SSL
Para el protocolo SSL, puede implementar un almacén de claves que ya utilice (por ejemplo, el que usa el clúster de origen) o crear un certificado autofirmado mediante keytool
:
keytool -genkey -keyalg RSA -alias selfsigned -keystore keystore.jks -storepass password -validity 360 -keysize 2048
También puede deshabilitar SSL para los puntos de conexión de origen o destino si no implementan este protocolo. Use las marcas --disable-source-tls
o --disable-target-tls
:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password> --disable-source-tls true --disable-target-tls true
Nota
Asegúrese de que la aplicación cliente usa el mismo almacén de claves y la misma contraseña que el proxy de doble escritura cuando cree conexiones SSL a la base de datos a través del proxy.
Configuración de las credenciales y el puerto
De forma predeterminada, se pasan las credenciales de origen de la aplicación cliente. El proxy usará las credenciales para establecer conexiones con los clústeres de origen y de destino. Como hemos mencionado anteriormente, en este proceso se da por supuesto que las credenciales del origen y el destino son las mismas. Será necesario especificar aparte un nombre de usuario y una contraseña diferentes para el punto de conexión de la API de Cassandra de destino al iniciar el proxy:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password>
Si no se especifican, los puertos de origen y destino predeterminados serán el 9042. En este caso, la API de Cassandra se ejecuta en el puerto 10350
, por lo que debe --source-port
usar o --target-port
para especificar números de puerto:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password>
Implementación del proxy de forma remota
Puede haber circunstancias en las que no quiera instalar el proxy en los propios nodos del clúster y prefiera instalarlo en una máquina aparte. En este caso, debe especificar la dirección IP de <source-server>
:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar <source-server> <destination-server>
Advertencia
Instalar y ejecutar el proxy de forma remota en un equipo independiente (en lugar de ejecutarlo en todos los nodos del clúster de Apache Cassandra de origen) afectará al rendimiento mientras se produce la migración en directo. Aunque funcionará funcionalmente, el controlador de cliente no podrá abrir conexiones a todos los nodos del clúster y se basará en el único nodo de coordinador (donde está instalado el proxy) para realizar conexiones.
Cero cambios en el código de las aplicaciones
De forma predeterminada, el proxy escucha en el puerto 29042. El código de las aplicaciones debe cambiarse para que apunte a este puerto. Sin embargo, puede cambiar el puerto en el que el proxy escucha. Puede hacerlo si desea evitar cambios de código en las aplicaciones de las siguientes formas:
- Haciendo que el servidor de Cassandra de origen se ejecute en otro puerto.
- Haciendo que el proxy se ejecute en el puerto estándar 9042 de Cassandra.
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042
Nota
La instalación del proxy en los nodos del clúster no requiere que se reinicien los nodos. Sin embargo, si tiene muchos clientes de aplicaciones y prefiere que el proxy se ejecute en el puerto estándar 9042 de Cassandra para evitar cambios de código en las aplicaciones, debe cambiar el puerto predeterminado de Apache Cassandra. Después, debe reiniciar los nodos del clúster y configurar el puerto de origen para que sea el nuevo puerto que definió para el clúster de Cassandra de origen.
En el ejemplo siguiente, cambiamos el clúster de Cassandra de origen para que se ejecute en el puerto 3074 e iniciamos el clúster en el puerto 9042:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042 --source-port 3074
Imposición de protocolos
El proxy tiene funcionalidad para imponer protocolos, lo que podría ser necesario si el punto de conexión de origen es más avanzado que el de destino o no se admite por otro motivo. En ese caso, puede especificar --protocol-version
y --cql-version
para exigir que el protocolo cumpla con el destino:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --protocol-version 4 --cql-version 3.11
Una vez que se ejecute el proxy de doble escritura, deberá cambiar el puerto en el cliente de la aplicación y reiniciarlo; o bien cambie el puerto de Cassandra y reinicie el clúster si ha elegido ese método. A continuación, el proxy iniciará el reenvío de operaciones de escritura al punto de conexión de destino. Puede obtener información sobre la supervisión y las métricas disponibles en la herramienta de proxy.
Ejecución de la carga de datos históricos
Para cargar los datos, cree un cuaderno de Scala en la cuenta de Azure Databricks. Reemplace las configuraciones de Cassandra de origen y de destino por las credenciales correspondientes, así como los espacios de claves y las tablas de origen y destino. Agregue más variables para cada tabla según sea necesario en el ejemplo siguiente y ejecútelo. Cuando la aplicación empiece a enviar solicitudes al proxy de doble escritura, estará listo para migrar datos históricos.
Importante
Antes de migrar los datos, aumente el rendimiento del contenedor hasta el valor necesario para que la aplicación se migre rápidamente. El escalado del rendimiento antes de iniciar la migración le ayudará a reducir el tiempo de migración. Para ayudar a proteger contra la limitación de tasas durante la carga de datos histórica, es posible que desee habilitar los reintentos de servidor (SSR) en la API de Cassandra. Consulte nuestro siguiente artículo para obtener más información e instrucciones sobre cómo habilitar SSR.
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext
// source cassandra configs
val sourceCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "9042",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "true",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>"
)
//target cassandra configs
val targetCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "10350",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "true",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>",
//throughput related settings below - tweak these depending on data volumes.
"spark.cassandra.output.batch.size.rows"-> "1",
"spark.cassandra.output.concurrent.writes" -> "1000",
"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1",
"spark.cassandra.concurrent.reads" -> "512",
"spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
"spark.cassandra.connection.keep_alive_ms" -> "600000000"
)
//set timestamp to ensure it is before read job starts
val timestamp: Long = System.currentTimeMillis / 1000
//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(sourceCassandra)
.load
//Write to target Cassandra
DFfromSourceCassandra
.write
.format("org.apache.spark.sql.cassandra")
.options(targetCassandra)
.option("writetime", timestamp)
.mode(SaveMode.Append)
.save
Nota
En el ejemplo anterior de Scala, observará que timestamp
se establece en la hora actual antes de leer todos los datos de la tabla de origen. Después, writetime
se antedata con esta marca de tiempo. Esto garantiza que los registros que se escriben con la carga de datos históricos en el punto de conexión de destino no sobrescriban las actualizaciones que se incorporan con una marca de tiempo posterior desde el proxy de doble escritura mientras se leen los datos históricos.
Importante
Si, por algún motivo, necesita conservar las marcas de tiempo exactas, debe seguir un enfoque de migración de datos históricos que mantenga las marcas de tiempo, como en este ejemplo. El jar de dependencia del ejemplo también contiene el conector Spark, por lo que no es necesario instalar el ensamblado del conector spark mencionado en los requisitos previos anteriores: tener ambos instalados en el clúster de Spark provocará conflictos.
Validación del origen y el destino
Una vez completada la carga de datos históricos, las bases de datos deben estar sincronizadas y listas para la transición. Sin embargo, le recomendamos que valide el origen y el destino para asegurarse de que coincidan antes de cortar por fin.
Nota
Si usó la muestra del migrador de cassandra mencionada anteriormente para preservar writetime
, esto incluye la capacidad de validar la migracióncomparando filas en el origen y el destino según ciertas tolerancias.