デュアル書き込みプロキシを使用した Azure Managed Instance for Apache Cassandra へのライブ マイグレーション

可能な限り、Apache Cassandra のネイティブ機能を使用し、ハイブリッド クラスターを構成することで、既存のクラスターから Azure Managed Instance for Apache Cassandra にデータを移行することをお勧めします。 この機能では、Apache Cassandra のゴシップ プロトコルが使用され、ソース データセンターから新しいマネージド インスタンス データセンターにデータがシームレスにレプリケートされます。 ただし、ソース データベースのバージョンに互換性がない場合や、ハイブリッド クラスターのセットアップが実行できない場合があります。

このチュートリアルでは、デュアル書き込みプロキシと Apache Spark を使用して、Azure Managed Instance for Apache Cassandra にデータのライブ マイグレーションを行う方法について説明します。 ライブ変更はデュアル書き込みプロキシを使用してキャプチャされ、履歴データは Apache Spark を使用して一括コピーされます。 このアプローチには次のような利点があります。

  • アプリケーションの変更が最小限。 構成をほとんど、またはまったく変更することなく、プロキシでアプリケーション コードからの接続を受け入れることができます。 すべての要求がソース データベースにルーティングされ、書き込みがセカンダリ ターゲットに非同期にルーティングされます。
  • クライアントでのワイヤー プロトコルの依存関係。 このアプローチはバックエンド リソースや内部プロトコルに依存しないため、Apache Cassandra ワイヤー プロトコルが実装されている任意のソースまたはターゲット Cassandra システムで使用できます。

次の図はこのアプローチを示したものです。

Azure Managed Instance for Apache Cassandra へのデータのライブ マイグレーションを示すアニメーション。

前提条件

Spark クラスターをプロビジョニングする

Spark 3.0 をサポートする Azure Databricks ランタイム バージョン 7.5 を選択することをお勧めします。

Azure Databricks ランタイムのバージョンを見つける方法を示すスクリーンショット。

Spark の依存関係を追加する

Apache Spark Cassandra コネクタ ライブラリをクラスターに追加して、ワイヤ プロトコルに互換性のある任意の Apache Cassandra エンドポイントに接続できるようにする必要があります。 自分のクラスターで、 [ライブラリ]>[新規インストール]>[Maven] の順に選択し、Maven 座標に com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 を追加します。

重要

移行中に行ごとに Apache Cassandra writetime を保持する必要がある場合は、こちらのサンプルを使用することをお勧めします。 このサンプルの依存関係 jar には Spark コネクタも含まれているので、上記のコネクタ アセンブリの代わりにこれをインストールする必要があります。 このサンプルは、履歴データの読み込み後にソースとターゲット間の行比較検証を実行する場合にも便利です。 詳細については、以下の「履歴データの読み込みを実行する」と「ソースとターゲットを検証する」を参照してください。

Azure Databricks で Maven パッケージを検索する方法を示すスクリーンショット。

[インストール] を選択し、インストールが完了したらクラスターを再起動します。

Note

Cassandra コネクタ ライブラリがインストールされた後で、必ず Azure Databricks クラスターを再起動してください。

デュアル書き込みプロキシをインストールする

デュアル書き込み時のパフォーマンスを最適化するために、ソース Cassandra クラスター内のすべてのノードにプロキシをインストールすることをお勧めします。

#assuming you do not have git already installed
sudo apt-get install git 

#assuming you do not have maven already installed
sudo apt install maven

#clone repo for dual-write proxy
git clone https://github.com/Azure-Samples/cassandra-proxy.git

#change directory
cd cassandra-proxy

#compile the proxy
mvn package

デュアル書き込みプロキシを開始する

ソース Cassandra クラスターのすべてのノードにプロキシをインストールすることをお勧めします。 少なくとも、各ノード上で次のコマンドを実行してプロキシを開始します。 <target-server> をターゲット クラスターのいずれかのノードの IP またはサーバー アドレスに置き換えます。 <path to JKS file> をローカル環境の .jks ファイルへのパスに置き換え、<keystore password> を対応するパスワードに置き換えます。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password>

この方法でプロキシを開始する場合は、次の条件を満たすことが前提となります。

  • ソースとターゲットのエンドポイントで、ユーザー名とパスワードが同じである。
  • ソースとターゲットのエンドポイントに、Secure Sockets Layer (SSL) が実装されている。

ソースとターゲットのエンドポイントでこれらの条件が満たされない場合は、詳細な構成オプションを参照してください。

SSL の構成

SSL については、既存のキーストア (ソース クラスターで使用されているものなど) を実装するか、keytool を使用して自己署名証明書を作成することができます。

keytool -genkey -keyalg RSA -alias selfsigned -keystore keystore.jks -storepass password -validity 360 -keysize 2048

また、ソースまたはターゲットのエンドポイントに SSL が実装されていない場合は、SSL を無効にすることができます。 --disable-source-tls または --disable-target-tls のフラグを使用します。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password> --disable-source-tls true  --disable-target-tls true 

注意

データベースへのプロキシ経由の SSL 接続を構築する場合は、必ずデュアル書き込みプロキシに使用したものと同じキーストアとパスワードをクライアント アプリケーションに使用してください。

資格情報とポートを構成する

既定では、ソースの資格情報がクライアント アプリから渡されます。 プロキシにより、ソースとターゲットのクラスターに対する接続を確立するためにその資格情報が使用されます。 前に説明したように、このプロセスでは、ソースとターゲットの資格情報が同じであることを前提としています。 必要な場合は、プロキシを開始するときに、ターゲット Cassandra エンドポイントに異なるユーザー名とパスワードを指定することができます。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password>

指定されていない場合、ソースとターゲットの既定のポートは 9042 になります。 ターゲットまたはソースの Cassandra エンドポイントが異なるポートで実行されている場合は、--source-port または --target-port を使用して別のポート番号を指定できます。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password>

プロキシをリモートでデプロイする

クラスター ノード自体にプロキシをインストールせず、別のマシンにインストールしたい場合があります。 そのようなシナリオでは、<source-server> の IP アドレスを指定する必要があります。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar <source-server> <destination-server>

警告

プロキシを (ソース Apache Cassandra クラスター内のすべてのノードで実行するのではなく) 別のマシンでリモートで実行する場合は、クラスター内のノードと同じ数のマシンにプロキシをデプロイし、こちらで説明するプロキシの構成を使用して system.peers でそれらの IP アドレスの代替を設定することをお勧めします。 これを行わないと、クライアント ドライバーで、クラスター内のすべてのノードに対する接続を開くことができないため、ライブ マイグレーションの実行中にパフォーマンスに影響が生じる可能性があります。

アプリケーションのコードを変更しなくてよいようにする

既定では、プロキシにより、ポート 29042 がリッスンされます。 このポートを指すように、アプリケーションのコードを変更する必要があります。 ただし、プロキシによりリッスンされるポートを変更することができます。 次の方法によりアプリケーション レベルでコードを変更しなくて済むようにしたい場合、これを行うことがあります。

  • ソース Cassandra サーバーを別のポートで実行する。
  • プロキシを Cassandra の標準ポート 9042 で実行する。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042

Note

クラスター ノードにプロキシをインストールした場合、ノードを再起動する必要はありません。 ただし、多数のアプリケーション クライアントがあり、アプリケーション レベルのコード変更を不要にするために、プロキシを Cassandra の標準ポート 9042 で実行したい場合は、Apache Cassandra の既定のポートを変更する必要があります。 その後、クラスター内のノードを再起動し、ソース Cassandra クラスターに対して定義した新しいポートに、ソース ポートを構成する必要があります。

次の例では、ポート 3074 で実行するようにソース Cassandra クラスターを変更し、ポート 9042 でクラスターを起動します。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042 --source-port 3074

プロトコルを強制する

このプロキシにはプロトコルを強制する機能があります。これは、ソース エンドポイントがターゲットよりも高度な場合、またはそうしないとサポートされない場合に、必要になることがあります。 その場合は、--protocol-version--cql-version を指定して、プロトコルをターゲットに準拠させることができます。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --protocol-version 4 --cql-version 3.11

デュアル書き込みプロキシが実行されるようになった後、アプリケーション クライアントでポートを変更し、再起動する必要があります。 (または、その方法を選択した場合は、Cassandra のポートを変更してクラスターを再起動します)。その後、プロキシによるターゲット エンドポイントへの書き込みの転送が開始されます。 プロキシ ツールで使用できる監視とメトリックについてはこちらを参照してください。

履歴データの読み込みを実行する

データを読み込むには、Azure Databricks アカウントで Scala ノートブックを作成します。 ソースとターゲットの Cassandra 構成を対応する資格情報に置き換え、ソースとターゲットのキースペースとテーブルを置き換えます。 次のサンプルのように、必要に応じて各テーブルに変数を追加し、実行します。 アプリケーションによってデュアル書き込みプロキシへの要求送信が開始されたら、履歴データを移行する準備は完了です。

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//set timestamp to ensure it is before read job starts
val timestamp: Long = System.currentTimeMillis / 1000

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .option("writetime", timestamp)
  .mode(SaveMode.Append)
  .save

Note

前の Scala サンプルを見ると、ソース テーブル内のすべてのデータが読み取られる前に、timestamp が現在の時刻に設定されていることがわかります。 その後、writetime がこの過去のタイムスタンプに設定されます。 これは、履歴データが読み込まれている間に、履歴データの読み込みからターゲット エンドポイントに書き込まれたレコードによって、後のタイム スタンプでデュアル書き込みプロキシから送られた更新が上書きされないようにするためです。

重要

何らかの理由で "正確な" タイムスタンプを保持する必要がある場合は、こちらのサンプルのような、タイムスタンプを保持する履歴データの移行アプローチを採用する必要があります。 サンプルの依存関係 jar には Spark コネクタも含まれているので、前の前提条件で説明した Spark コネクタ アセンブリをインストールする必要はありません。Spark クラスターに両方ともインストールすると競合が発生します。

ソースとターゲットを検証する

履歴データの読み込みが完了すると、データベースは同期され、切り替えの準備が整います。 ただし、最終的なカットオーバーの前に、ソースとターゲットを検証してそれらが一致することを確認することをお勧めします。

注意

writetime を保持するために上記の cassandra migrationor サンプルを使用した場合、このサンプルには、特定の許容値に基づいてソースとターゲットの行を比較して移行を検証する機能が含まれています。

次のステップ