你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用双重写入代理实时迁移到 Azure Managed Instance for Apache Cassandra

如果可能,我们建议使用 Apache Cassandra 原生功能,通过配置混合群集,将数据从现有群集迁移到 Azure Apache Cassandra 托管实例。 此功能使用 Apache Cassandra 的 gossip 协议以无缝方式将源数据中心的数据复制到新的托管实例数据中心。

在某些情况下,源数据库版本不兼容,或者混合群集设置不可行。 本教程介绍如何使用双重写入代理和 Apache Spark 将数据实时迁移到 Azure Managed Instance for Apache Cassandra。 双重写入代理用于捕获实时更改,而历史数据是使用 Apache Spark 批量复制的。 此方法的优点包括:

  • 尽量减少对应用程序做出更改。 只需进行少量的配置更改甚至不进行任何配置更改,代理就能接受来自应用程序代码的连接。 它将所有请求路由到源数据库,并以异步方式将写入路由到辅助目标。
  • 客户端线路协议依赖性。 由于此方法不依赖于后端资源或内部协议,因此可与实现 Apache Cassandra 线路协议的任何源或目标 Cassandra 系统一起使用。

下图演示了该方法。

演示如何将数据实时迁移到 Azure Managed Instance for Apache Cassandra 的动画。

先决条件

预配 Spark 群集

建议选择支持 Spark 3.0 的 Azure Databricks 运行时版本 7.5。

显示如何查找 Azure Databricks 运行时版本的屏幕截图。

添加 Spark 依赖项

你需要将 Apache Spark Cassandra 连接器库添加到群集,以便连接到任何与线路协议兼容的 Apache Cassandra 终结点。 在群集中,选择“库”“安装新库”>“Maven”,然后在 Maven 坐标中添加

重要

如果需要在迁移期间保留每一行的 Apache Cassandra writetime,建议使用此示例。 此示例中的依赖项 jar 还包含 Spark 连接器,因此请安装此版本而不是连接器程序集。

如果要在历史数据加载完成后在源和目标之间执行行比较验证,此示例也很有用。 请参阅 运行历史数据加载验证源和目标

显示如何在 Azure Databricks 中搜索 Maven 包的屏幕截图。

选择“安装”,然后在安装完成后重启群集。

注意

安装 Cassandra 连接器库后,请务必重启 Azure Databricks 群集。

安装双重写入代理

为了在双重写入期间获得最佳性能,我们建议在源 Cassandra 群集中的所有节点上安装代理。

#assuming you do not have git already installed
sudo apt-get install git 

#assuming you do not have maven already installed
sudo apt install maven

#clone repo for dual-write proxy
git clone https://github.com/Azure-Samples/cassandra-proxy.git

#change directory
cd cassandra-proxy

#compile the proxy
mvn package

启动双重写入代理

建议在源 Cassandra 群集中的所有节点上安装该代理。 至少,请运行以下命令在每个节点上启动代理。 请将 <target-server> 替换为目标群集中某个节点上的 IP 地址或服务器地址。 将 <path to JKS file> 替换为本地 jks 文件的路径,并将 <keystore password> 替换为相应的密码。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password>

以这种方式启动代理的前提是满足以下条件:

  • 源和目标终结点具有相同的用户名和密码。
  • 源和目标终结点实现安全套接字层 (SSL)。

如果源和目标终结点无法满足这些条件,请继续阅读以了解其他配置选项。

配置 SSL

对于 SSL,可以实现现有的密钥存储,例如源群集使用的密钥存储,或者使用 keytool以下命令创建自签名证书:

keytool -genkey -keyalg RSA -alias selfsigned -keystore keystore.jks -storepass password -validity 360 -keysize 2048

如果源或目标终结点不实现 SSL,则你还可为其禁用 SSL。 使用 --disable-source-tls--disable-target-tls 标志:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
  --proxy-jks-password <keystore password> --target-username <username> --target-password <password> \
  --disable-source-tls true  --disable-target-tls true 

注意

在为使用代理的数据库建立 SSL 连接时,请确保您的客户端应用程序使用与双写代理相同的密钥存储和密码。

配置凭据和端口

默认情况下,源凭据通过客户端应用传递。 代理使用凭据来建立与源群集和目标群集的连接。 如前所述,此过程假设源和目标凭据相同。 如有必要,可以在启动代理时单独为目标 Cassandra 终结点指定不同的用户名和密码:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> \
  --target-username <username> --target-password <password>

未指定的默认源和目标端口为 9042。 如果目标或源 Cassandra 终结点在不同的端口上运行,你可以使用 --source-port--target-port 指定不同的端口号:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
  --proxy-jks-password <keystore password> --target-username <username> --target-password <password>

远程部署代理

在某些情况下,你不希望在群集节点上自行安装代理。 你想要在一台单独的计算机上安装它。 在这种情况下,请指定以下 IP <source-server>地址:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar <source-server> <destination-server>

警告

你可能想要在单独的计算机上远程运行代理,而不是在源 Apache Cassandra 群集中的所有节点上运行代理。 如果是这样,我们建议将代理部署到与群集节点数量相同数量的计算机上。 在 system.peers 中为其 IP 地址设置替换。 在代理中使用此配置。 如果不使用此方法,则它可能会影响实时迁移时的性能,因为客户端驱动程序无法打开与群集中所有节点的连接。

实现零应用程序代码更改

默认情况下,代理侦听端口 29042。 必须将应用程序代码更改为指向此端口。 或者,可以更改代理侦听的端口。 如果要通过以下方法消除应用程序级代码更改,可以使用此方法:

  • 让源 Cassandra 服务器在其他端口上运行。
  • 让代理在标准 Cassandra 端口 9042 上运行。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042

注意

在群集节点上安装代理不需要重启节点。 如果有许多应用程序客户端,并且更喜欢在标准 Cassandra 端口 9042 上运行代理以消除任何应用程序级代码更改,请更改 Apache Cassandra 默认端口。 然后需要重启群集中的节点,并将源端口配置为你已为源 Cassandra 群集定义的新端口。

在以下示例中,我们将源 Cassandra 群集更改为在端口 3074 上运行,并在端口 9042 上启动群集:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042 --source-port 3074

强制实施协议

代理提供强制实施协议的功能。如果源终结点比目标更高级或者不受支持,可能需要强制实施协议。 在这种情况下,可以指定 --protocol-version--cql-version 来强制实施协议,以便与目标相符:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --protocol-version 4 --cql-version 3.11

运行双写入代理后,更改应用程序客户端上的端口并重启。 或者更改 Cassandra 端口,如果选择了该方法,请重启群集。 代理开始将写入转发到目标终结点。 你可以了解代理工具中提供的监视和指标

运行历史数据加载

若要加载数据,请在 Azure Databricks 帐户中创建一个 Scala 笔记本。 将源和目标 Cassandra 配置替换为相应的凭据,并替换源和目标密钥空间和表。 根据需要在以下示例中为每个表添加更多变量,然后运行该示例。 在应用程序开始向双重写入代理发送请求后,你便可以迁移历史数据。

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//set timestamp to ensure it is before read job starts
val timestamp: Long = System.currentTimeMillis / 1000

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .option("writetime", timestamp)
  .mode(SaveMode.Append)
  .save

注意

在前面的 Scala 示例中, timestamp 在读取源表中的所有数据之前,将设置为当前时间。 然后 writetime 设置为此回溯时间戳。 此方法可以确保通过加载历史数据而写入到目标终结点的记录,无法覆盖在读取历史数据时从双重写入代理传入的具有更迟时间戳的更新内容。

重要

如果你出于任何原因需要保留确切的时间戳,应采用可保留时间戳的历史数据迁移方法,如此示例所示。 示例中的依赖项 jar 还包含 Spark 连接器,因此无需安装前面先决条件中提到的 Spark 连接器程序集。 在 Spark 群集中安装这两个组件会导致冲突。

验证源和目标

历史数据加载完成后,数据库应已同步并准备好进行直接转换。 我们建议验证源和目标以确保它们匹配,然后再最终进行直接转换。

注意

如果使用了前面部分中提到的 Cassandra 迁移程序示例用于保留writetime,则可以通过根据某些容差比较源和目标中的,来验证迁移

后续步骤