Доступ к Azure Cosmos DB для Apache Cassandra из Spark в YARN с помощью HDInsight
Область применения: Кассандра
В этой статье описывается, как получить доступ к Azure Cosmos DB для Apache Cassandra из Spark в YARN с помощью HDInsight-Spark.spark-shell
HDInsight — это платформа Microsoft Hortonworks Hadoop PaaS в Azure. Она использует хранилище объектов для HDFS и поставляется в нескольких вариантах, включая Spark. Хотя эта статья относится к HDInsight-Spark, она применяется ко всем дистрибутивам Hadoop.
Необходимые компоненты
Перед началом работы ознакомьтесь с основами подключения к Azure Cosmos DB для Apache Cassandra.
Вам потребуется сделать следующее:
Подготовка Azure Cosmos DB для Apache Cassandra. См. Создание учетной записи базы данных.
Подготовка кластера HDInsight Spark. См. Создание кластера Apache Spark в Azure HDInsight с помощью шаблона ARM.
API для конфигурации Cassandra в Spark2. Для инициализации соединителя Spark для Cassandra как части контекста Spark необходимы сведения о подключении Cassandra. При запуске записной книжки Jupyter сеанс и контекст Spark уже инициализируются. Не останавливайтесь и повторно инициализируйте контекст Spark, если только в него не включен каждый набор конфигураций как часть запуска записной книжки HDInsight Jupyter по умолчанию. Одним из способов обхода является добавление сведений об экземпляре Cassandra непосредственно в конфигурацию службы Ambari, Spark2. Это требуется выполнить один раз для каждого кластера, на котором нужно перезапустить службу Spark2.
Перейдите к службе Ambari, Spark2 и выберите конфигурации.
Перейдите к пользовательской конфигурации spark2-defaults и добавьте новое свойство с приведенным ниже значением, после чего перезапустите службу Spark2:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
Для проверки можно использовать cqlsh
. Дополнительные сведения см. в статье "Подключение к Azure Cosmos DB для Apache Cassandra" из Spark.
Доступ к Azure Cosmos DB для Apache Cassandra из оболочки Spark
Оболочка Spark используется для тестирования и исследования.
Запустите
spark-shell
с обязательными зависимостями Maven, совместимыми с версией Spark кластера.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
Выполните несколько операций DDL и DML.
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
Выполните операции CRUD.
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
Доступ к Azure Cosmos DB для Apache Cassandra из записных книжек Jupyter
HDInsight Spark включает в себя службы Zeppelin и Jupyter Notebook. Это среды для записных книжек с доступом через Интернет, которые поддерживают Scala и Python. Записные книжки отлично подходят для интерактивной исследовательской аналитики и совместной работы, но не предназначены для оперативных или производственных процессов.
Следующие записные книжки Jupyter можно передать в кластер HDInsight Spark и предоставить готовые примеры для работы с Azure Cosmos DB для Apache Cassandra. Обязательно просмотрите первую записную книжку 1.0-ReadMe.ipynb
, чтобы просмотреть конфигурацию службы Spark для подключения к Azure Cosmos DB для Apache Cassandra.
Скачайте эти записные книжки из раздела azure-cosmos-db-cassandra-api-spark-notebooks-jupyter на компьютер.
Передача
При запуске Jupyter перейдите к Scala. Сначала создайте каталог, а затем передайте в него записные книжки. Кнопка Upload (Передать) находится вверху справа.
Запуск
Последовательно запустите записные книжки и их ячейки. В верхней части каждой записной книжки нажмите Run (Запустить), чтобы выполнить все ячейки, или нажмите Shift+Enter для каждой ячейки.
Доступ с помощью Azure Cosmos DB для Apache Cassandra из программы Spark Scala
Для автоматизации процессов в рабочей среде программы Spark отправляются в кластер с помощью spark-submit.