Delen via


Live gegevens migreren van Apache Cassandra naar Azure Cosmos DB voor Apache Cassandra met behulp van een proxy voor dubbele schrijfbewerkingen en Apache Spark

API voor Cassandra in Azure Cosmos DB is een uitstekende keuze geworden voor bedrijfsworkloads die op Apache Cassandra worden uitgevoerd om verschillende redenen, zoals:

  • Geen overhead voor het beheren en bewaken: het elimineert de overhead van het beheren en bewaken van talloze instellingen in os-, JVM- en yaml-bestanden en hun interacties.

  • Aanzienlijke kostenbesparingen: u kunt kosten besparen met Azure Cosmos DB, waaronder de kosten van VM's, bandbreedte en eventuele toepasselijke licenties. Daarnaast hoeft u de datacenters, servers, SSD-opslag, netwerken en elektriciteitskosten niet te beheren.

  • Mogelijkheid om bestaande code en hulpprogramma’s te gebruiken: Azure Cosmos DB biedt compatibiliteit van wire-protocolniveau’s met bestaande Cassandra-SDK’s en hulpprogramma’s. Deze compatibiliteit zorgt ervoor dat u uw bestaande codebase kunt gebruiken met Azure Cosmos DB voor Apache Cassandra met triviale wijzigingen.

Azure Cosmos DB biedt geen ondersteuning voor het systeemeigen Apache Cassandra-gossip-protocol voor replicatie. Daarom is een andere benadering nodig wanneer er geen downtime nodig is voor migratie. In deze zelfstudie wordt beschreven hoe u gegevens live migreert naar Azure Cosmos DB voor Apache Cassandra vanuit een systeemeigen Apache Cassandra-cluster met behulp van een proxy voor dubbele schrijfbewerkingen en Apache Spark.

In de volgende afbeelding ziet u het patroon. De dual-write-proxy wordt gebruikt om live wijzigingen vast te leggen, terwijl historische gegevens bulksgewijs worden gekopieerd met Apache Spark. De proxy kan verbindingen van uw toepassingscode accepteren met enkele of geen configuratiewijzigingen. Hiermee worden alle aanvragen naar uw brondatabase gerouteerd en worden schrijfbewerkingen asynchroon gerouteerd naar de API voor Cassandra terwijl bulksgewijs kopiëren plaatsvindt.

Animatie van de livemigratie van gegevens naar Azure Managed Instance voor Apache Cassandra.

Vereisten

  • Richt een Azure Cosmos DB in voor een Apache Cassandra-account.

  • Bekijk de basisbeginselen van het maken van verbinding met een Azure Cosmos DB voor Apache Cassandra.

  • Bekijk de ondersteunde functies in Azure Cosmos DB voor Apache Cassandra om compatibiliteit te garanderen.

  • Gebruik cqlsh voor validatie.

  • Zorg ervoor dat u netwerkconnectiviteit hebt tussen uw broncluster en doel-API voor Cassandra-eindpunt.

  • Zorg ervoor dat u het keyspace-/tabelschema al hebt gemigreerd van uw Bron Cassandra-database naar uw doel-API voor Cassandra-account.

    Belangrijk

    Als u tijdens de migratie een vereiste hebt om Apache Cassandra writetime te behouden, moeten de volgende vlaggen worden ingesteld bij het maken van tabellen:

    with cosmosdb_cell_level_timestamp=true and cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true
    

    Voorbeeld:

    CREATE KEYSPACE IF NOT EXISTS migrationkeyspace WITH REPLICATION= {'class': 'org.apache.> cassandra.locator.SimpleStrategy', 'replication_factor' : '1'};
    
    CREATE TABLE IF NOT EXISTS migrationkeyspace.users (
     name text,
     userID int,
     address text,
     phone int,
     PRIMARY KEY ((name), userID)) with cosmosdb_cell_level_timestamp=true and > cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true;
    

Een Spark-cluster inrichten

We raden Azure Databricks aan. Gebruik een runtime die Ondersteuning biedt voor Spark 3.0 of hoger.

Belangrijk

U moet ervoor zorgen dat uw Azure Databricks-account netwerkconnectiviteit heeft met uw Apache Cassandra-broncluster. Hiervoor is mogelijk VNet-injectie vereist. Zie het artikel hier voor meer informatie.

Schermopname van het vinden van de Runtime-versie van Azure Databricks.

Spark-afhankelijkheden toevoegen

U moet de Apache Spark Cassandra Connector-bibliotheek toevoegen aan uw cluster om verbinding te maken met zowel systeemeigen als Azure Cosmos DB Cassandra-eindpunten. Selecteer In uw cluster Bibliotheken>installeren nieuwe>Maven en voeg vervolgens Maven-coördinaten toe.com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0

Belangrijk

Als u een vereiste hebt om Apache Cassandra writetime voor elke rij tijdens de migratie te behouden, raden we u aan dit voorbeeld te gebruiken. De afhankelijkheids-JAR in dit voorbeeld bevat ook de Spark-connector. Installeer deze dus in plaats van de bovenstaande connectorassembly. Dit voorbeeld is ook handig als u een rijvergelijkingsvalidatie tussen bron en doel wilt uitvoeren nadat het laden van historische gegevens is voltooid. Zie de secties 'De belasting van historische gegevens uitvoeren' en 'valideer de bron en het doel' hieronder voor meer informatie.

Schermopname van het zoeken naar Maven-pakketten in Azure Databricks.

Selecteer Installeren en start het cluster opnieuw wanneer de installatie is voltooid.

Notitie

Start het Azure Databricks-cluster opnieuw nadat de Cassandra Connector-bibliotheek is geïnstalleerd.

De dual-write-proxy installeren

Voor optimale prestaties tijdens dubbele schrijfbewerkingen raden we u aan de proxy te installeren op alle knooppunten in uw cassandra-broncluster.

#assuming you do not have git already installed
sudo apt-get install git 

#assuming you do not have maven already installed
sudo apt install maven

#clone repo for dual-write proxy
git clone https://github.com/Azure-Samples/cassandra-proxy.git

#change directory
cd cassandra-proxy

#compile the proxy
mvn package

De dual-write-proxy starten

U wordt aangeraden de proxy te installeren op alle knooppunten in uw Cassandra-broncluster. Voer minimaal de volgende opdracht uit om de proxy op elk knooppunt te starten. Vervang door <target-server> een IP- of serveradres van een van de knooppunten in het doelcluster. Vervang <path to JKS file> door het pad naar een lokaal JKS-bestand en vervang dit door <keystore password> het bijbehorende wachtwoord.

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password>

Als u de proxy op deze manier start, wordt ervan uitgegaan dat het volgende waar is:

  • Bron- en doeleindpunten hebben dezelfde gebruikersnaam en hetzelfde wachtwoord.
  • Bron- en doeleindpunten implementeren Secure Sockets Layer (SSL).

Als uw bron- en doeleindpunten niet aan deze criteria kunnen voldoen, leest u verder voor verdere configuratieopties.

SSL configureren

Voor SSL kunt u een bestaand sleutelarchief implementeren (bijvoorbeeld het sleutelarchief dat door uw broncluster wordt gebruikt) of een zelfondertekend certificaat maken met behulp van keytool:

keytool -genkey -keyalg RSA -alias selfsigned -keystore keystore.jks -storepass password -validity 360 -keysize 2048

U kunt SSL ook uitschakelen voor bron- of doeleindpunten als ze geen SSL implementeren. Gebruik de --disable-source-tls of --disable-target-tls vlaggen:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password> --disable-source-tls true  --disable-target-tls true 

Notitie

Zorg ervoor dat uw clienttoepassing dezelfde sleutelopslag en hetzelfde wachtwoord gebruikt als het wachtwoord dat wordt gebruikt voor de proxy voor dubbele schrijfbewerkingen wanneer u SSL-verbindingen met de database bouwt via de proxy.

De referenties en poort configureren

De bronreferenties worden standaard doorgegeven vanuit uw client-app. De proxy gebruikt de referenties voor het maken van verbindingen met de bron- en doelclusters. Zoals eerder vermeld, gaat dit proces ervan uit dat de bron- en doelreferenties hetzelfde zijn. U moet een andere gebruikersnaam en een ander wachtwoord opgeven voor de doel-API voor het Cassandra-eindpunt wanneer u de proxy start:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password>

De standaardbron- en doelpoorten, indien niet opgegeven, zijn 9042. In dit geval wordt de API voor Cassandra uitgevoerd op poort 10350, dus u moet poortnummers gebruiken --source-port of --target-port opgeven:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password>

De proxy extern implementeren

Mogelijk wilt u de proxy niet installeren op de clusterknooppunten zelf en wilt u deze liever installeren op een afzonderlijke computer. In dat scenario moet u het IP-adres van <source-server>:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar <source-server> <destination-server>

Waarschuwing

Het extern installeren en uitvoeren van de proxy op een afzonderlijke computer (in plaats van deze uit te voeren op alle knooppunten in uw Apache Cassandra-broncluster), heeft invloed op de prestaties terwijl de livemigratie plaatsvindt. Hoewel het functioneel werkt, kan het clientstuurprogramma geen verbindingen openen met alle knooppunten binnen het cluster en is het afhankelijk van het knooppunt met één co-ordinator (waar de proxy is geïnstalleerd) om verbindingen te maken.

Wijzigingen in toepassingscode nul toestaan

Standaard luistert de proxy op poort 29042. De toepassingscode moet worden gewijzigd om naar deze poort te verwijzen. U kunt echter de poort wijzigen waarop de proxy luistert. U kunt dit doen als u codewijzigingen op toepassingsniveau wilt elimineren door:

  • Als de Cassandra-bronserver op een andere poort wordt uitgevoerd.
  • De proxy wordt uitgevoerd op de standaard Cassandra-poort 9042.
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042

Notitie

Het installeren van de proxy op clusterknooppunten vereist geen herstart van de knooppunten. Als u echter veel toepassingsclients hebt en de proxy liever op de standaard Cassandra-poort 9042 wilt uitvoeren om codewijzigingen op toepassingsniveau te elimineren, moet u de standaardpoort van Apache Cassandra wijzigen. Vervolgens moet u de knooppunten in uw cluster opnieuw opstarten en de bronpoort configureren als de nieuwe poort die u hebt gedefinieerd voor uw bron-Cassandra-cluster.

In het volgende voorbeeld wijzigen we het Cassandra-broncluster zodat het wordt uitgevoerd op poort 3074 en starten we het cluster op poort 9042:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042 --source-port 3074

Protocollen afdwingen

De proxy heeft functionaliteit om protocollen af te dwingen. Dit kan nodig zijn als het broneindpunt geavanceerder is dan het doel of anderszins niet wordt ondersteund. In dat geval kunt u het protocol opgeven --protocol-version en --cql-version afdwingen dat het voldoet aan het doel:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --protocol-version 4 --cql-version 3.11

Nadat de dual-write-proxy wordt uitgevoerd, moet u de poort op uw toepassingsclient wijzigen en opnieuw opstarten. (Of wijzig de Cassandra-poort en start het cluster opnieuw op als u deze benadering hebt gekozen.) De proxy start vervolgens met het doorsturen van schrijfbewerkingen naar het doeleindpunt. Meer informatie over bewaking en metrische gegevens die beschikbaar zijn in het proxyhulpprogramma.

Het laden van historische gegevens uitvoeren

Als u de gegevens wilt laden, maakt u een Scala-notebook in uw Azure Databricks-account. Vervang uw bron- en doelconfiguraties van Cassandra door de bijbehorende referenties en vervang de bron- en doelsleutelruimten en -tabellen. Voeg indien nodig meer variabelen toe voor elke tabel aan het volgende voorbeeld en voer deze uit. Nadat uw toepassing aanvragen naar de proxy voor dubbele schrijfbewerkingen verzendt, bent u klaar om historische gegevens te migreren.

Belangrijk

Voordat u de gegevens migreert, verhoogt u de doorvoer van de container naar de hoeveelheid die uw toepassing nodig heeft om snel te migreren. Door de doorvoer te schalen voordat de migratie wordt gestart, kunt u uw gegevens in minder tijd migreren. Als u wilt beschermen tegen snelheidsbeperking tijdens de belasting van historische gegevens, wilt u mogelijk nieuwe pogingen aan de serverzijde (SSR) inschakelen in API voor Cassandra. Zie ons artikel hier voor meer informatie en instructies over het inschakelen van SSR.

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "10350",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//set timestamp to ensure it is before read job starts
val timestamp: Long = System.currentTimeMillis / 1000

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .option("writetime", timestamp)
  .mode(SaveMode.Append)
  .save

Notitie

In het voorgaande Scala-voorbeeld ziet u dat timestamp deze is ingesteld op de huidige tijd voordat u alle gegevens in de brontabel leest. writetime Vervolgens wordt deze backdated tijdstempel ingesteld. Dit zorgt ervoor dat records die zijn geschreven van de historische gegevensbelasting naar het doeleindpunt, geen updates kunnen overschrijven die worden geleverd met een latere tijdstempel van de proxy voor dubbele schrijfbewerkingen terwijl historische gegevens worden gelezen.

Belangrijk

Als u om welke reden dan ook exacte tijdstempels wilt behouden, moet u een historische gegevensmigratiebenadering gebruiken die tijdstempels behoudt, zoals dit voorbeeld. De afhankelijkheids-JAR in het voorbeeld bevat ook de Spark-connector, dus u hoeft de Assembly van de Spark-connector niet te installeren die in de eerdere vereisten is vermeld. Als beide zijn geïnstalleerd in uw Spark-cluster, kunnen er conflicten optreden.

De bron en het doel valideren

Nadat het laden van historische gegevens is voltooid, moeten uw databases gesynchroniseerd zijn en klaar zijn voor cutover. We raden u echter aan om de bron en het doel te valideren om ervoor te zorgen dat deze overeenkomen voordat u de gegevens definitief oversnijd.

Notitie

Als u het hierboven genoemde cassandra-migratievoorbeeld hebt gebruikt voor behoudwritetime, omvat dit de mogelijkheid om de migratie te valideren door rijen in de bron en het doel te vergelijken op basis van bepaalde toleranties.

Volgende stappen