Apache Spark kullanarak Apache Cassandra için Azure Yönetilen Örneği'ne geçiş

Mümkün olduğunda, karma küme yapılandırarak mevcut kümenizdeki verileri Apache Cassandra için Azure Yönetilen Örneği'ne geçirmek için Apache Cassandra yerel çoğaltmasını kullanmanızı öneririz. Bu yaklaşımda, kaynak veri merkezinizdeki verileri yeni yönetilen örnek veri merkezinize çoğaltmak için Apache Cassandra'nın dedikodu protokolü kullanılır. Ancak, kaynak veritabanı sürümünüzün uyumlu olmadığı veya karma küme kurulumunun mümkün olmadığı bazı senaryolar olabilir.

Bu öğreticide, Cassandra Spark Bağlayıcısı ve Apache Spark için Azure Databricks kullanılarak verilerin çevrimdışı olarak Apache Cassandra için Azure Yönetilen Örneği'ne nasıl geçirileceğini açıklanmaktadır.

Önkoşullar

Azure Databricks kümesi sağlama

Spark 3.0'ı destekleyen Databricks çalışma zamanı sürüm 7.5'i seçmenizi öneririz.

Databricks çalışma zamanı sürümünü bulmayı gösteren ekran görüntüsü.

Bağımlılık ekleme

Hem yerel hem de Azure Cosmos DB Cassandra uç noktalarına bağlanmak için kümenize Apache Spark Cassandra Bağlayıcı kitaplığını ekleyin. Kümenizde Kitaplıklar> YeniMavenYükle'yi> seçin ve ardından Maven koordinatlarını ekleyincom.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0.

Databricks'te Maven paketlerini aramayı gösteren ekran görüntüsü.

Yükle'yi seçin ve yükleme tamamlandığında kümeyi yeniden başlatın.

Not

Cassandra Bağlayıcı kitaplığı yüklendikten sonra Databricks kümesini yeniden başlattığınızdan emin olun.

Geçiş için Scala Not Defteri oluşturma

Databricks'te bir Scala Not Defteri oluşturun. Kaynak ve hedef Cassandra yapılandırmalarınızı ilgili kimlik bilgileriyle, kaynak ve hedef anahtar alanlarıyla tablolarla değiştirin. Ardından aşağıdaki kodu çalıştırın:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Not

Her satırın özgün writetime öğesini korumanız gerekiyorsa cassandra migrator örneğine bakın.

Sonraki adımlar