Перенос данных из Cassandra в учетную запись Azure Cosmos DB для Apache Cassandra с помощью Azure Databricks

ПРИМЕНИМО К: Кассандра

API для Cassandra в Azure Cosmos DB стал отличным выбором для корпоративных рабочих нагрузок, работающих в Apache Cassandra, по нескольким причинам:

  • Отсутствие дополнительных издержек при управлении и мониторинге: Она устраняет издержки на управление параметров в файлах ОС, виртуальной машины Java и YAML и их взаимодействиях.

  • Значительные сокращения затрат. Вы можете уменьшить расходы с помощью Azure Cosmos DB, включая стоимость виртуальных машин, пропускной способности и любых применимых лицензий. Вам не нужно управлять центрами обработки данных, серверами, SSD-накопителями, сетями и расходами на электроэнергию.

  • Возможность использования существующего кода и средств. Azure Cosmos DB предоставляет совместимость на уровне сетевого протокола с имеющимися пакетами SDK и средствами для Cassandra. Такая совместимость гарантирует, что вы сможете использовать существующую базу кода с Azure Cosmos DB для Apache Cassandra с тривиальными изменениями.

Существует множество способов переноса рабочих нагрузок базы данных с одной платформы на другую. Azure Databricks является предложением платформы как услуги (PaaS) для Apache Spark, которое позволяет выполнять автономную миграцию в больших масштабах. В этой статье описаны шаги, необходимые для переноса данных из собственных пространств ключей и таблиц Apache Cassandra в Azure Cosmos DB для Apache Cassandra с помощью Azure Databricks.

Предварительные требования

Предоставление кластера Azure Databricks

Вы можете выполнить инструкции по подготовке кластера Azure Databricks. Мы рекомендуем выбрать среду выполнения Databricks версии 7.5, которая поддерживает Spark 3.0.

Снимок экрана, на котором показано, как найти версию среды выполнения Databricks.

Добавление зависимостей

Необходимо добавить библиотеку соединителей Apache Spark Cassandra в кластер для подключения к собственным конечным точкам, а также к конечным точкам Cassandra в Azure Cosmos DB. В кластере выберите Библиотеки>Установить>Maven, а затем добавьте com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 в координаты Maven.

Снимок экрана, на котором показано поиск пакетов Maven в Databricks.

Нажмите кнопку Установить, а затем перезапустите кластер после завершения установки.

Примечание

Убедитесь, что кластер Databricks перезапускается после установки библиотеки соединителей Cassandra.

Предупреждение

Представленные в этой статье примеры протестированы на Spark версии 3.0.1 с соответствующим соединителем Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0. Более поздние версии Spark и (или) соединителя Cassandra могут работать непредсказуемым образом.

Создание записной книжки Scala для миграции

Создайте записную книжку Scala в Databricks. Замените исходные и целевые конфигурации Cassandra соответствующими учетными данными, а также исходными и целевыми пространствами ключей и таблицами. Затем выполните следующий код.

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val nativeCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val cosmosCassandra = Map( 
    "spark.cassandra.connection.host" -> "<USERNAME>.cassandra.cosmos.azure.com",
    "spark.cassandra.connection.port" -> "10350",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    //"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1", // Spark 3.x
    "spark.cassandra.connection.connections_per_executor_max"-> "1", // Spark 2.x
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from native Cassandra
val DFfromNativeCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(nativeCassandra)
  .load
  
//Write to CosmosCassandra
DFfromNativeCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(cosmosCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Примечание

Значения spark.cassandra.output.batch.size.rows и spark.cassandra.output.concurrent.writes, а также количество рабочих ролей в кластере Spark являются важными параметрами, которые необходимо настроить, чтобы избежать ограничения скорости. Ограничение скорости происходит, когда запросы к Azure Cosmos DB превышают установленную пропускную способность или единицы запросов (ЕЗ). Вам может понадобиться настроить эти параметры в зависимости от количества исполнителей в кластере Spark и, возможно, размера (а, следовательно, и стоимости ЕЗ) каждой записи, записываемой в целевые таблицы.

Диагностика

Ограничение скорости (ошибка 429)

Может появиться сообщение с кодом ошибки 429 или текстом ошибки "Частота запросов слишком велика", даже если вы уменьшили настройки до минимальных значений. Следующие сценарии могут привести к ограничению скорости:

  • Пропускная способность, выделенная таблице, составляет менее 6 000 единиц запросов . Даже при минимальных настройках Spark может писать со скоростью около 6 000 единиц запроса или более. Если вы подготовили таблицу в пространстве ключей с общей пропускной способностью, возможно, что в этой таблице доступно менее 6 000 единиц запроса.

    При выполнении миграции убедитесь, что в таблице, в которую выполняется миграция, имеется по крайней мере 6 000 единиц запроса. При необходимости предназначьте выделенные единицы запроса этой таблице.

  • Неравномерное распределение данных при большом объеме данных. Если у вас есть большой объем данных для миграции в определенную таблицу, однако в данных есть значительно неравномерное отклонение (то есть большое количество записей, записываемых для одного и того же значения ключа секции), вы все равно можете столкнуться с ограничением скорости, даже если в таблице предусмотрено несколько единиц запросов. Единицы запросов делятся между физическими секциями, а чрезмерно неравномерное распределение данных может привести к узким местам запросов к одной секции.

    В этом сценарии уменьшите настройки минимальной пропускной способности в Spark и заставьте миграцию выполняться медленно. Этот сценарий может оказаться более распространенным при переносе ссылок или таблиц элементов управления, где доступ осуществляется реже и неравномерное распределение может быть большим. Однако если в других типах таблиц имеется значительное неравномерное распределение, может возникнуть необходимость изучить модель данных, чтобы избежать проблем с "горячим" разделением рабочей нагрузки во время стабильных операций.

Дальнейшие действия