Freigeben über


Migrieren zu Azure Managed Instance for Apache Cassandra mit Apache Spark

Es wird empfohlen, nach Möglichkeit die native Apache Cassandra-Replikation zu verwenden, um Daten aus Ihrem vorhandenen Cluster zu Azure Managed Instance for Apache Cassandra zu migrieren, indem Sie einen Hybridcluster konfigurieren. Bei diesem Ansatz wird das Gossip-Protokoll von Apache Cassandra verwendet, um Daten aus Ihrem Quellrechenzentrum in Ihr neues Rechenzentrum der verwalteten Instanz zu replizieren. Es kann jedoch einige Szenarien geben, in denen Ihre Quelldatenbankversion nicht kompatibel ist oder ein Hybridclustersetup andernfalls nicht möglich ist.

In diesem Tutorial wird beschrieben, wie Sie Daten mithilfe des Cassandra Spark-Connectors und Azure Databricks für Apache Spark offline zu Azure Managed Instance for Apache Cassandra migrieren.

Voraussetzungen

Bereitstellen eines Azure Databricks-Clusters

Wir empfehlen die Auswahl der Version 7.5 der Databricks-Runtime, die Spark 3.0 unterstützt.

Screenshot, der die Suche nach der Databricks-Laufzeitversion zeigt.

Hinzufügen von Abhängigkeiten

Fügen Sie dem Cluster die Apache Spark-Cassandra-Connectorbibliothek hinzu, um eine Verbindung mit nativen und Azure Cosmos DB-Cassandra-Endpunkten herzustellen. Wählen Sie in Ihrem Cluster Bibliotheken>Neue>Maveninstallieren und fügen Sie dann com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 in Maven-Koordinaten hinzu.

Screenshot, der zeigt, wie Maven-Pakete in Databricks gesucht werden.

Wählen Sie Installieren aus, und starten Sie den Cluster nach Abschluss der Installation neu.

Hinweis

Stellen Sie sicher, dass Sie den Databricks-Cluster neu starten, nachdem die Cassandra-Connectorbibliothek installiert wurde.

Erstellen eines Scala-Notebooks für die Migration

Erstellen eines Scala Notebooks in Databricks. Ersetzen Sie Ihre Quell- und Zielkonfigurationen für Cassandra durch die entsprechenden Anmeldeinformationen und die Quell-und Zielkeyspaces und -tabellen. Führen Sie dann den folgenden Code aus:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Hinweis

Wenn Sie die ursprüngliche writetime jeder Zeile beibehalten möchten, sehen Sie sich das Beispiel zur Cassandra-Migration an.

Nächste Schritte