Livemigrera data från Apache Cassandra till Azure Cosmos DB för Apache Cassandra med hjälp av dubbelskrivningsproxy och Apache Spark

API för Cassandra i Azure Cosmos DB är ett bra val för företagsarbetsbelastningar som körs på Apache Cassandra av olika skäl:

  • Inga kostnader för hantering och övervakning: Det eliminerar kostnaderna för att hantera och övervaka en mängd olika inställningar i operativsystem, virtuella Java-datorer och yaml-filer och deras interaktioner.
  • Betydande kostnadsbesparingar: Du kan spara kostnader med Azure Cosmos DB, vilket inkluderar kostnaden för virtuella datorer, bandbredd och eventuella tillämpliga licenser. Du behöver inte hantera datacenter, servrar, SSD-lagring, nätverk och elkostnader.
  • Möjlighet att använda befintlig kod och verktyg: Azure Cosmos DB erbjuder kompatibilitet på trådprotokollsnivå med befintliga SDK:er och verktyg för Cassandra. Den här kompatibiliteten säkerställer att du kan använda din befintliga kodbas med Azure Cosmos DB för Apache Cassandra med triviala ändringar.

Azure Cosmos DB stöder inte det interna Apache Cassandra-skvallerprotokollet för replikering. Om noll stilleståndstid är ett krav för migrering krävs en annan metod. I den här självstudien beskrivs hur du direktmigrerar data till Azure Cosmos DB för Apache Cassandra från ett inbyggt Apache Cassandra-kluster med hjälp av en dubbelskrivningsproxy och Apache Spark.

Följande bild illustrerar mönstret. Proxyn med dubbla skrivningar används för att samla in liveändringar. Historiska data kopieras massvis med Apache Spark. Proxyn kan acceptera anslutningar från programkoden med få eller inga konfigurationsändringar. Den dirigerar alla begäranden till källdatabasen och dirigerar asynkront skrivningar till API för Cassandra medan masskopiering sker.

Animering som visar direktmigrering av data till Azure Managed Instance för Apache Cassandra.

Förutsättningar

Viktigt!

Om du har ett krav på att bevara Apache Cassandra writetime under migreringen måste följande flaggor anges när du skapar tabeller:

with cosmosdb_cell_level_timestamp=true and cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true

Till exempel:

CREATE KEYSPACE IF NOT EXISTS migrationkeyspace WITH REPLICATION= {'class': 'org.apache.> cassandra.locator.SimpleStrategy', 'replication_factor' : '1'};
CREATE TABLE IF NOT EXISTS migrationkeyspace.users (
 name text,
 userID int,
 address text,
 phone int,
 PRIMARY KEY ((name), userID)) with cosmosdb_cell_level_timestamp=true and > cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true;

Etablera ett Spark-kluster

Vi rekommenderar att du använder Azure Databricks. Använd en körning som stöder Spark 3.0 eller senare.

Viktigt!

Du måste se till att ditt Azure Databricks-konto har nätverksanslutning med ditt Apache Cassandra-källkluster. Den här konfigurationen kan kräva inmatning av virtuella nätverk. Mer information finns i Distribuera Azure Databricks i ditt virtuella Azure-nätverk.

Skärmbild som visar hur du hittar Azure Databricks-körningsversionen.

Lägga till Spark-beroenden

Lägg till Apache Spark Cassandra Connector-biblioteket i klustret för att ansluta till både interna slutpunkter och Azure Cosmos DB Cassandra-slutpunkter. I klustret väljer du > och lägger sedan till i Maven-koordinater.

Viktigt!

Om du har ett krav på att bevara Apache Cassandra writetime för varje rad under migreringen rekommenderar vi att du använder det här exemplet. Beroende-JAR:en i det här exemplet innehåller även Spark-anslutningsappen, så du bör installera den här versionen i stället för anslutningssammansättningen som beskrevs tidigare.

Det här exemplet är också användbart om du vill utföra en radjämförelseverifiering mellan källa och mål när den historiska datainläsningen har slutförts. Mer information finns i Kör den historiska datainläsningen och Verifiera källan och målet.

Skärmbild som visar sökning efter Maven-paket i Azure Databricks.

Välj Installera och starta sedan om klustret när installationen är klar.

Kommentar

Se till att starta om Azure Databricks-klustret när Cassandra Connector-biblioteket har installerats.

Installera proxyn med dubbel skrivning

För optimala prestanda vid dubbla skrivningar rekommenderar vi att du installerar proxyn på alla noder i ditt Cassandra-källkluster.

#assuming you do not have git already installed
sudo apt-get install git 

#assuming you do not have maven already installed
sudo apt install maven

#clone repo for dual-write proxy
git clone https://github.com/Azure-Samples/cassandra-proxy.git

#change directory
cd cassandra-proxy

#compile the proxy
mvn package

Starta proxyn med dubbel skrivning

Vi rekommenderar att du installerar proxyn på alla noder i ditt Cassandra-källkluster. Kör minst följande kommando för att starta proxyn på varje nod. Ersätt <target-server> med en IP- eller serveradress från en av noderna i målklustret. Ersätt <path to JKS file> med sökvägen till en lokal .jks-fil och ersätt <keystore password> med motsvarande lösenord.

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password>

Att starta proxyn på det här sättet förutsätter att följande är sant:

  • Käll- och målslutpunkter har samma användarnamn och lösenord.
  • Käll- och målslutpunkter implementerar SSL (Secure Sockets Layer).

Om dina käll- och målslutpunkter inte kan uppfylla dessa kriterier kan du läsa vidare för ytterligare konfigurationsalternativ.

Konfigurera SSL

För SSL kan du antingen implementera ett befintligt nyckelarkiv, till exempel det som källklustret använder, eller skapa ett självsignerat certifikat med hjälp keytoolav :

keytool -genkey -keyalg RSA -alias selfsigned -keystore keystore.jks -storepass password -validity 360 -keysize 2048

Du kan också inaktivera SSL för käll- eller målslutpunkter om de inte implementerar SSL. Använd flaggorna --disable-source-tls eller --disable-target-tls :

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
  --proxy-jks-password <keystore password> --target-username <username> \
  --target-password <password> --disable-source-tls true  --disable-target-tls true 

Kommentar

Kontrollera att klientprogrammet använder samma nyckelarkiv och lösenord som de som används för proxyn med dubbel skrivning när du skapar SSL-anslutningar till databasen via proxyn.

Konfigurera autentiseringsuppgifterna och porten

Som standard skickar klientappen källautentiseringsuppgifterna. Proxyn använder autentiseringsuppgifterna för att upprätta anslutningar till käll- och målkluster. Som tidigare nämnts förutsätter den här processen att käll- och målautentiseringsuppgifterna är desamma. Du måste ange ett annat användarnamn och lösenord för mål-API:et för Cassandra-slutpunkten separat när du startar proxyn:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> \
  --target-username <username> --target-password <password>

Standardportarna för källa och mål, när de inte anges, är 9042. I det här fallet körs API för Cassandra på port 10350. Använd --source-port eller --target-port för att ange portnummer:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
  --proxy-jks-password <keystore password> --target-username <username> --target-password <password>

Fjärrdistribuering av proxyn

Det kan finnas omständigheter då du inte vill installera proxyn på själva klusternoderna. Du kanske föredrar att installera den på en separat dator. I det scenariot anger du IP-adressen <source-server>för :

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar <source-server> <destination-server>

Varning

Att installera och köra proxyn via fjärranslutning på en separat dator i stället för att köra den på alla noder i ditt Apache Cassandra-källkluster påverkar prestandan när direktmigreringen sker. Även om den här konfigurationen fungerar funktionellt kan klientdrivrutinen inte öppna anslutningar till alla noder i klustret. Klienten förlitar sig på den enda koordinatornoden där proxyn är installerad för att upprätta anslutningar.

Tillåt noll programkodändringar

Som standard lyssnar proxyn på port 29042. Ändra programkoden så att den pekar på den här porten. Du kan i stället ändra den port som proxyn lyssnar på. Du kan göra den här ändringen om du vill eliminera kodändringar på programnivå genom att:

  • Låta Cassandra-källservern köras på en annan port.
  • Att proxyn körs på Cassandra-standardporten 9042.
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042

Kommentar

Installation av proxyn på klusternoder kräver inte omstart av noderna. Om du har många programklienter och föredrar att köra proxyn på Cassandra-standardporten 9042 för att eliminera kodändringar på programnivå ändrar du standardporten för Apache Cassandra. Sedan måste du starta om noderna i klustret och konfigurera källporten till den nya port som du har definierat för ditt Cassandra-källkluster.

I följande exempel ändrar vi Cassandra-källklustret så att det körs på port 3074 och vi startar klustret på port 9042:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server \
 --proxy-port 9042 --source-port 3074

Framtvinga protokoll

Proxyn har funktioner för att framtvinga protokoll, vilket kan vara nödvändigt om källslutpunkten är mer avancerad än målet eller på annat sätt inte stöds. I så fall kan du ange --protocol-version och --cql-version tvinga protokollet att följa målet:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server \
  --protocol-version 4 --cql-version 3.11

När proxyn med dubbel skrivning har körts måste du ändra porten på programklienten och starta om. Eller ändra Cassandra-porten och starta om klustret om du väljer den metoden. Proxyn börjar vidarebefordra skrivningar till målslutpunkten. Mer information finns i Övervakning och mått.

Kör den historiska datainläsningen

Om du vill läsa in data skapar du en Scala-notebook-fil i ditt Azure Databricks-konto. Ersätt dina cassandra-käll- och målkonfigurationer med motsvarande autentiseringsuppgifter och ersätt käll- och målnyckelrymderna och tabellerna. Lägg till fler variabler för varje tabell efter behov i följande exempel och kör sedan. När programmet börjar skicka begäranden till proxyn med dubbla skrivningar är du redo att migrera historiska data.

Viktigt!

Innan du migrerar data ökar du containerns dataflöde till det belopp som krävs för att programmet ska kunna migrera snabbt. Genom att skala dataflödet innan du påbörjar migreringen kan du migrera dina data på kortare tid. För att skydda mot hastighetsbegränsning under den historiska datainläsningen kan du aktivera återförsök på serversidan (SSR) i API för Cassandra. Anvisningar om hur du aktiverar SSR och mer information finns i Förhindra hastighetsbegränsningsfel för Azure Cosmos DB för Apache Cassandra-åtgärder.

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "10350",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//set timestamp to ensure it is before read job starts
val timestamp: Long = System.currentTimeMillis / 1000

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .option("writetime", timestamp)
  .mode(SaveMode.Append)
  .save

Kommentar

I föregående Scala-exempel ser du att timestamp det är inställt på den aktuella tiden innan du läser alla data i källtabellen. writetime Sedan anges till den här bakåtdaterade tidsstämpeln. Den här metoden säkerställer att poster som skrivs från den historiska datainläsningen till målslutpunkten inte kan skriva över uppdateringar som kommer in med en senare tidsstämpel från proxyn med dubbla skrivningar medan historiska data läses.

Viktigt!

Om du av någon anledning behöver bevara exakta tidsstämplar bör du använda en metod för historisk datamigrering som bevarar tidsstämplar, till exempel det här exemplet. Beroende-JAR:en i exemplet innehåller också Spark-anslutningsappen, så du behöver inte installera spark-anslutningssammansättningen som nämns i de tidigare förhandskraven. Att ha båda installerade i Spark-klustret orsakar konflikter.

Verifiera källan och målet

När den historiska datainläsningen är klar bör databaserna vara synkroniserade och klara för snabb användning. Vi rekommenderar att du verifierar källan och målet för att säkerställa att de matchar innan du slutligen skär över.

Kommentar

Om du använde Cassandra-migratorexemplet som nämndes tidigare för att writetimebevara innehåller det här exemplet möjligheten att verifiera migreringen genom att jämföra rader i källa och mål baserat på vissa toleranser.

Nästa steg