Миграция в Управляемый экземпляр Azure для Apache Cassandra с использованием Apache Spark

По возможности мы советуем использовать собственную репликацию Apache Cassandra для переноса данных из существующего кластера в Управляемый экземпляр Azure для Apache Cassandra, настроив гибридный кластер. При этом будет использоваться протокол Apache Cassandra для репликации данных из центра данных в новый управляемый экземпляр. Однако возможны ситуации, когда версия базы данных — источника несовместима или установка гибридного кластера нецелесообразна.

В этом учебнике описывается, как перенести данные в Управляемый экземпляр Azure для Apache Cassandra в автономном режиме с помощью соединителя Cassandra Spark и Azure Databricks для Apache Spark.

Предварительные требования

Предоставление кластера Azure Databricks

Мы рекомендуем выбрать среду выполнения Databricks версии 7.5, которая поддерживает Spark 3.0.

Снимок экрана, на котором показано, как найти версию среды выполнения Databricks.

Добавление зависимостей

Добавьте библиотеку соединителей Apache Spark Cassandra в кластер для подключения к собственным конечным точкам, а также к конечным точкам Cassandra в Azure Cosmos DB. В кластере выберите Библиотеки>Установить>Maven, а затем добавьте com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 в координаты Maven.

Снимок экрана, на котором показано поиск пакетов Maven в Databricks.

Нажмите кнопку Установить, а затем перезапустите кластер после завершения установки.

Примечание

Убедитесь, что кластер Databricks перезапускается после установки библиотеки соединителей Cassandra.

Создание записной книжки Scala для миграции

Создайте записную книжку Scala в Databricks. Замените исходные и целевые конфигурации Cassandra соответствующими учетными данными, а также исходными и целевыми пространствами ключей и таблицами. Затем выполните следующий код.

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Примечание

Если вам нужно сохранить оригинал writetime каждой строки, обратитесь к примеру миграции cassandra .

Дальнейшие действия