Доступ к Azure Cosmos DB для Apache Cassandra из Spark в YARN с помощью HDInsight

ПРИМЕНИМО К: Кассандра

В этой статье описывается, как получить доступ к Azure Cosmos DB для Apache Cassandra из Spark в YARN с помощью HDInsight-Spark из spark-shell. HDInsight — это платформа Microsoft Hortonworks Hadoop PaaS в Azure. Она использует хранилище объектов для HDFS и поставляется в нескольких вариантах, включая Spark. Хотя эта статья относится к HDInsight-Spark, она применяется ко всем дистрибутивам Hadoop.

Предварительные требования

Прежде чем начать, ознакомьтесь с основами подключения к Azure Cosmos DB для Apache Cassandra.

Вам потребуется сделать следующее:

  • Подготовка Azure Cosmos DB для Apache Cassandra. См. Создание учетной записи базы данных.

  • Подготовка кластера HDInsight Spark. См. Создание кластера Apache Spark в Azure HDInsight с помощью шаблона ARM.

  • API для конфигурации Cassandra в Spark2. Для инициализации соединителя Spark для Cassandra как части контекста Spark необходимы сведения о подключении Cassandra. При запуске записной книжки Jupyter сеанс и контекст Spark уже инициализируются. Не останавливайтесь и повторно инициализируйте контекст Spark, если только в него не включен каждый набор конфигураций как часть запуска записной книжки HDInsight Jupyter по умолчанию. Одним из способов обхода является добавление сведений об экземпляре Cassandra непосредственно в конфигурацию службы Ambari, Spark2. Это требуется выполнить один раз для каждого кластера, на котором нужно перезапустить службу Spark2.

    1. Перейдите к службе Ambari, Spark2 и выберите конфигурации.

    2. Перейдите к пользовательской конфигурации spark2-defaults и добавьте новое свойство с приведенным ниже значением, после чего перезапустите службу Spark2:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

Для проверки можно использовать cqlsh. Дополнительные сведения см. в статье Подключение к Azure Cosmos DB для Apache Cassandra из Spark.

Доступ к Azure Cosmos DB для Apache Cassandra из оболочки Spark

Оболочка Spark используется для тестирования и исследования.

  • Запустите spark-shell с обязательными зависимостями Maven, совместимыми с версией Spark кластера.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Выполните несколько операций DDL и DML.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • Выполните операции CRUD.

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Доступ к Azure Cosmos DB для Apache Cassandra из записных книжек Jupyter

HDInsight Spark включает в себя службы Zeppelin и Jupyter Notebook. Это среды для записных книжек с доступом через Интернет, которые поддерживают Scala и Python. Записные книжки отлично подходят для интерактивной исследовательской аналитики и совместной работы, но не предназначены для оперативных или производственных процессов.

Следующие записные книжки Jupyter можно отправить в кластер HDInsight Spark и предоставить готовые примеры для работы с Azure Cosmos DB для Apache Cassandra. Обязательно просмотрите первую записную книжку 1.0-ReadMe.ipynb , чтобы просмотреть конфигурацию службы Spark для подключения к Azure Cosmos DB для Apache Cassandra.

Скачайте эти записные книжки из раздела azure-cosmos-db-cassandra-api-spark-notebooks-jupyter на компьютер.

Передача

При запуске Jupyter перейдите к Scala. Сначала создайте каталог, а затем передайте в него записные книжки. Кнопка Upload (Передать) находится вверху справа.

Запуск

Последовательно запустите записные книжки и их ячейки. В верхней части каждой записной книжки нажмите Run (Запустить), чтобы выполнить все ячейки, или нажмите Shift+Enter для каждой ячейки.

Доступ с помощью Azure Cosmos DB для Apache Cassandra из программы Spark Scala

Для автоматизации процессов в рабочей среде программы Spark отправляются в кластер с помощью spark-submit.

Дальнейшие действия