Partilhar via


Migrar dados ao vivo do Apache Cassandra para o Azure Cosmos DB para Apache Cassandra usando proxy de gravação dupla e Apache Spark

A API para Cassandra no Azure Cosmos DB tornou-se uma ótima opção para cargas de trabalho corporativas em execução no Apache Cassandra por vários motivos, como:

  • Sem sobrecarga de gerenciamento e monitoramento: Ele elimina a sobrecarga de gerenciar e monitorar uma infinidade de configurações em arquivos de sistema operacional, JVM e yaml e suas interações.

  • Economia de custos significativa: você pode economizar custos com o Azure Cosmos DB, que inclui o custo de VMs, largura de banda e quaisquer licenças aplicáveis. Além disso, você não precisa gerenciar os data centers, servidores, armazenamento SSD, rede e custos de eletricidade.

  • Capacidade de utilizar código e ferramentas existente: o Azure Cosmos DB oferece compatibilidade ao nível do protocolo de transmissão com SDKs e ferramentas do Cassandra existentes. Essa compatibilidade garante que você possa usar sua base de código existente com o Azure Cosmos DB para Apache Cassandra com alterações triviais.

O Azure Cosmos DB não suporta o protocolo de fofocas Apache Cassandra nativo para replicação. Portanto, onde zero tempo de inatividade é um requisito para a migração, uma abordagem diferente é necessária. Este tutorial descreve como migrar dados ao vivo para o Azure Cosmos DB para Apache Cassandra de um cluster Apache Cassandra nativo usando um proxy de gravação dupla e o Apache Spark.

A imagem a seguir ilustra o padrão. O proxy de gravação dupla é usado para capturar alterações em tempo real, enquanto os dados históricos são copiados em massa usando o Apache Spark. O proxy pode aceitar conexões do código do seu aplicativo com poucas ou nenhuma alteração de configuração. Ele encaminhará todas as solicitações para seu banco de dados de origem e roteará de forma assíncrona as gravações na API para Cassandra enquanto a cópia em massa estiver acontecendo.

Animação que mostra a migração ao vivo de dados para a Instância Gerenciada do Azure para Apache Cassandra.

Pré-requisitos

  • Provisione uma conta do Azure Cosmos DB para Apache Cassandra.

  • Analise as noções básicas de conexão a um Azure Cosmos DB para Apache Cassandra.

  • Analise os recursos com suporte no Azure Cosmos DB para Apache Cassandra para garantir a compatibilidade.

  • Use cqlsh para validação.

  • Certifique-se de ter conectividade de rede entre o cluster de origem e a API de destino para o ponto de extremidade Cassandra.

  • Certifique-se de que você já migrou o esquema keyspace/table do seu banco de dados Cassandra de origem para sua API de destino para a conta Cassandra.

    Importante

    Se você tiver um requisito para preservar o Apache Cassandra writetime durante a migração, os seguintes sinalizadores devem ser definidos ao criar tabelas:

    with cosmosdb_cell_level_timestamp=true and cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true
    

    Por exemplo:

    CREATE KEYSPACE IF NOT EXISTS migrationkeyspace WITH REPLICATION= {'class': 'org.apache.> cassandra.locator.SimpleStrategy', 'replication_factor' : '1'};
    
    CREATE TABLE IF NOT EXISTS migrationkeyspace.users (
     name text,
     userID int,
     address text,
     phone int,
     PRIMARY KEY ((name), userID)) with cosmosdb_cell_level_timestamp=true and > cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true;
    

Provisionar um cluster do Spark

Recomendamos o Azure Databricks. Use um tempo de execução que suporte o Spark 3.0 ou superior.

Importante

Você precisa garantir que sua conta do Azure Databricks tenha conectividade de rede com seu cluster Apache Cassandra de origem. Isto pode requerer injeção de VNet. Consulte o artigo aqui para mais informações.

Captura de tela que mostra como localizar a versão de tempo de execução do Azure Databricks.

Adicionar dependências do Spark

Você precisa adicionar a biblioteca Apache Spark Cassandra Connector ao cluster para se conectar aos pontos de extremidade Cassandra nativos e do Azure Cosmos DB. No cluster, selecione Bibliotecas>Instalar Novo>Maven e adicione com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 as coordenadas do Maven.

Importante

Se você tiver um requisito para preservar o Apache Cassandra writetime para cada linha durante a migração, recomendamos usar este exemplo. O jar de dependência neste exemplo também contém o conector Spark, portanto, você deve instalá-lo em vez do conjunto de conector acima. Este exemplo também é útil se você quiser executar uma validação de comparação de linha entre origem e destino após a conclusão do carregamento de dados históricos. Consulte as seções "executar a carga de dados históricos" e "validar a origem e o destino" abaixo para obter mais detalhes.

Captura de ecrã que mostra a procura de pacotes Maven no Azure Databricks.

Selecione Instalar e reinicie o cluster quando a instalação estiver concluída.

Nota

Certifique-se de reiniciar o cluster do Azure Databricks após a instalação da biblioteca Cassandra Connector.

Instalar o proxy de gravação dupla

Para um desempenho ideal durante gravações duplas, recomendamos instalar o proxy em todos os nós do cluster Cassandra de origem.

#assuming you do not have git already installed
sudo apt-get install git 

#assuming you do not have maven already installed
sudo apt install maven

#clone repo for dual-write proxy
git clone https://github.com/Azure-Samples/cassandra-proxy.git

#change directory
cd cassandra-proxy

#compile the proxy
mvn package

Inicie o proxy de gravação dupla

Recomendamos que você instale o proxy em todos os nós do cluster Cassandra de origem. No mínimo, execute o seguinte comando para iniciar o proxy em cada nó. Substitua <target-server> por um IP ou endereço de servidor de um dos nós no cluster de destino. Substitua <path to JKS file> pelo caminho para um arquivo .jks local e substitua <keystore password> pela senha correspondente.

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password>

Iniciar o proxy dessa maneira pressupõe que o seguinte seja verdadeiro:

  • Os pontos de extremidade de origem e de destino têm o mesmo nome de usuário e senha.
  • Os endpoints de origem e destino implementam Secure Sockets Layer (SSL).

Se seus pontos de extremidade de origem e de destino não puderem atender a esses critérios, continue lendo para obter mais opções de configuração.

Configurar o SSL

Para SSL, você pode implementar um keystore existente (por exemplo, aquele que o cluster de origem usa) ou criar um certificado autoassinado usando keytool:

keytool -genkey -keyalg RSA -alias selfsigned -keystore keystore.jks -storepass password -validity 360 -keysize 2048

Você também pode desabilitar o SSL para pontos de extremidade de origem ou de destino se eles não implementarem SSL. Use as --disable-source-tls bandeiras ou --disable-target-tls :

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password> --disable-source-tls true  --disable-target-tls true 

Nota

Certifique-se de que seu aplicativo cliente usa o mesmo armazenamento de chaves e senha que os usados para o proxy de gravação dupla quando você estiver criando conexões SSL para o banco de dados por meio do proxy.

Configurar as credenciais e a porta

Por padrão, as credenciais de origem serão passadas do seu aplicativo cliente. O proxy usará as credenciais para fazer conexões com os clusters de origem e destino. Como mencionado anteriormente, esse processo pressupõe que as credenciais de origem e de destino são as mesmas. Será necessário especificar um nome de usuário e senha diferentes para a API de destino para o ponto de extremidade Cassandra separadamente ao iniciar o proxy:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password>

As portas de origem e destino padrão, quando não especificadas, serão 9042. Nesse caso, a API para Cassandra é executada na porta 10350, então você precisa usar --source-port ou --target-port especificar números de porta:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password>

Implantar o proxy remotamente

Pode haver circunstâncias em que você não queira instalar o proxy nos próprios nós do cluster e prefira instalá-lo em uma máquina separada. Nesse cenário, você precisa especificar o endereço IP de <source-server>:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar <source-server> <destination-server>

Aviso

Instalar e executar o proxy remotamente em uma máquina separada (em vez de executá-lo em todos os nós do cluster Apache Cassandra de origem) afetará o desempenho enquanto a migração ao vivo ocorre. Embora funcione funcionalmente, o driver do cliente não poderá abrir conexões com todos os nós dentro do cluster e dependerá do nó coordenador único (onde o proxy está instalado) para fazer conexões.

Permitir zero alterações no código do aplicativo

Por padrão, o proxy escuta na porta 29042. O código do aplicativo deve ser alterado para apontar para essa porta. No entanto, você pode alterar a porta que o proxy escuta. Você pode fazer isso se quiser eliminar as alterações de código no nível do aplicativo:

  • Ter o servidor Cassandra de origem executado em uma porta diferente.
  • Ter o proxy executado na porta Cassandra padrão 9042.
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042

Nota

A instalação do proxy em nós de cluster não requer a reinicialização dos nós. No entanto, se você tiver muitos clientes de aplicativos e preferir ter o proxy em execução na porta padrão Cassandra 9042 para eliminar quaisquer alterações de código no nível do aplicativo, você precisará alterar a porta padrão do Apache Cassandra. Em seguida, você precisa reiniciar os nós no cluster e configurar a porta de origem para ser a nova porta definida para o cluster Cassandra de origem.

No exemplo a seguir, alteramos o cluster Cassandra de origem para ser executado na porta 3074 e iniciamos o cluster na porta 9042:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042 --source-port 3074

Forçar protocolos

O proxy tem funcionalidade para forçar protocolos, o que pode ser necessário se o ponto de extremidade de origem for mais avançado do que o destino ou não for suportado. Nesse caso, você pode especificar --protocol-version e --cql-version forçar o protocolo a cumprir com o destino:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --protocol-version 4 --cql-version 3.11

Depois que o proxy de gravação dupla estiver em execução, você precisará alterar a porta no cliente do aplicativo e reiniciar. (Ou altere a porta Cassandra e reinicie o cluster se tiver escolhido essa abordagem.) O proxy começará a encaminhar gravações para o ponto de extremidade de destino. Você pode aprender sobre monitoramento e métricas disponíveis na ferramenta de proxy.

Executar a carga de dados históricos

Para carregar os dados, crie um bloco de anotações Scala em sua conta do Azure Databricks. Substitua as configurações Cassandra de origem e de destino pelas credenciais correspondentes e substitua os espaços-chave e tabelas de origem e destino. Adicione mais variáveis para cada tabela, conforme necessário, ao exemplo a seguir e execute. Depois que seu aplicativo começar a enviar solicitações para o proxy de gravação dupla, você estará pronto para migrar dados históricos.

Importante

Antes de migrar os dados, aumente a taxa de transferência do contêiner para a quantidade necessária para que seu aplicativo migre rapidamente. Dimensionar a taxa de transferência antes de iniciar a migração ajudará você a migrar seus dados em menos tempo. Para ajudar a proteger contra a limitação de taxa durante a carga de dados históricos, convém habilitar as repetições do lado do servidor (SSR) na API para Cassandra. Consulte o nosso artigo aqui para obter mais informações e instruções sobre como ativar o SSR.

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "10350",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//set timestamp to ensure it is before read job starts
val timestamp: Long = System.currentTimeMillis / 1000

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .option("writetime", timestamp)
  .mode(SaveMode.Append)
  .save

Nota

No exemplo de Scala anterior, você notará que timestamp está sendo definido para a hora atual antes de ler todos os dados na tabela de origem. Então, writetime está sendo definido para este carimbo de data/hora retroativo. Isso garante que os registros gravados da carga de dados históricos para o ponto de extremidade de destino não possam substituir atualizações que chegam com um carimbo de data/hora posterior do proxy de gravação dupla enquanto os dados históricos estão sendo lidos.

Importante

Se você precisar preservar carimbos de data/hora exatos por qualquer motivo, deverá adotar uma abordagem de migração de dados históricos que preserve carimbos de data/hora, como este exemplo. O jar de dependência no exemplo também contém o conector Spark, portanto, você não precisa instalar o conjunto do conector Spark mencionado nos pré-requisitos anteriores - ter ambos instalados no cluster Spark causará conflitos.

Validar a origem e o destino

Após a conclusão do carregamento de dados históricos, seus bancos de dados devem estar sincronizados e prontos para substituição. No entanto, recomendamos que você valide a origem e o destino para garantir que eles correspondam antes de finalmente cortar.

Nota

Se você usou o exemplo de migrador cassandra mencionado acima para preservar, writetimeisso inclui a capacidade de validar a migração comparando linhas na origem e no destino com base em determinadas tolerâncias.

Próximos passos