Migrace do služby Azure Managed Instance for Apache Cassandra s využitím Apache Sparku

Pokud je to možné, doporučujeme migrovat data z existujícího clusteru do služby Azure Managed Instance for Apache Cassandra konfigurací hybridního clusteru pomocí nativní replikace Apache Cassandra. Tento přístup použije k replikaci dat ze zdrojového datacentra do nového datacentra spravované instance protokol drbů Apache Cassandry. V některých scénářích však může být, že verze zdrojové databáze není kompatibilní nebo nastavení hybridního clusteru není jinak možné.

Tento kurz popisuje, jak offline migrovat data do služby Azure Managed Instance for Apache Cassandra pomocí konektoru Cassandra Spark a Azure Databricks pro Apache Spark.

Požadavky

Zřízení clusteru Azure Databricks

Doporučujeme vybrat modul runtime Databricks verze 7.5, který podporuje Spark 3.0.

Snímek obrazovky znázorňující vyhledání verze modulu runtime Databricks

Přidání závislostí

Přidejte do clusteru knihovnu konektoru Apache Spark Cassandra, abyste se mohli připojit k nativním koncovým bodům i koncovým bodům Cassandra služby Azure Cosmos DB. V clusteru vyberte Knihovny>Nainstalovat nový>Maven a pak přidejte com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 souřadnice Mavenu.

Snímek obrazovky znázorňující hledání balíčků Maven v Databricks

Vyberte Nainstalovat a po dokončení instalace restartujte cluster.

Poznámka

Po instalaci knihovny konektoru Cassandra nezapomeňte cluster Databricks restartovat.

Vytvoření poznámkového bloku Scala pro migraci

Vytvoření poznámkového bloku Scala v Databricks Nahraďte zdrojové a cílové konfigurace Cassandra odpovídajícími přihlašovacími údaji a zdrojovými a cílovými prostory klíčů a tabulkami. Pak spusťte následující kód:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Poznámka

Pokud potřebujete zachovat originál writetime každého řádku, projděte si ukázku migrace cassandra .

Další kroky