HDInsight ile YARN üzerinde Spark'tan Apache Cassandra için Azure Cosmos DB'ye erişme

ŞUNLAR IÇIN GEÇERLIDIR: Cassandra

Bu makale, 'den HDInsight-Spark ile YARN üzerinde Spark'tan Apache Cassandra için Azure Cosmos DB'ye spark-shellerişmeyi kapsar. HDInsight, Microsoft'un Azure'da Hortonworks Hadoop PaaS'ıdır. HDFS için nesne depolama kullanır ve Spark da dahil olmak üzere çeşitli özelliklere sahiptir. Bu makale HDInsight-Spark'ı ifade ederken, tüm Hadoop dağıtımları için geçerlidir.

Önkoşullar

Başlamadan önce Apache Cassandra için Azure Cosmos DB'ye bağlanmanın temellerini gözden geçirin.

Aşağıdaki önkoşullara ihtiyacınız vardır:

  • Apache Cassandra için Azure Cosmos DB sağlama. Bkz. Veritabanı hesabı oluşturma.

  • HDInsight-Spark kümesi sağlama. Bkz. ARM şablonunu kullanarak Azure HDInsight'ta Apache Spark kümesi oluşturma.

  • Spark2'de Cassandra yapılandırması için API. Cassandra için Spark bağlayıcısı, Cassandra bağlantı ayrıntılarının Spark bağlamının bir parçası olarak başlatılmasını gerektirir. Jupyter not defterini başlattığınızda Spark oturumu ve bağlamı zaten başlatılır. HDInsight varsayılan Jupyter not defteri başlatma işleminin bir parçası olarak her yapılandırma kümesiyle tamamlanmadığı sürece Spark bağlamını durdurmayı ve yeniden başlatmayınız. Geçici çözümlerden biri, Cassandra örneği ayrıntılarını doğrudan Spark2 hizmet yapılandırması olan Ambari'ye eklemektir. Bu yaklaşım, Spark2 hizmetinin yeniden başlatılmasını gerektiren küme başına tek seferlik bir etkinliktir.

    1. Ambari, Spark2 hizmetine gidin ve yapılandırmalar'ı seçin.

    2. Özel spark2-defaults'a gidin ve aşağıdakilerle yeni bir özellik ekleyin ve Spark2 hizmetini yeniden başlatın:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

Doğrulama için kullanabilirsiniz cqlsh . Daha fazla bilgi için bkz. Spark'tan Apache Cassandra için Azure Cosmos DB'ye bağlanma.

Spark kabuğundan Apache Cassandra için Azure Cosmos DB'ye erişme

Spark kabuğu test ve araştırma için kullanılır.

  • Kümenizin Spark sürümüyle uyumlu gerekli maven bağımlılıklarıyla başlatın spark-shell .

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Bazı DDL ve DML işlemlerini yürütme

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • CRUD işlemlerini çalıştırma

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Jupyter not defterlerinden Apache Cassandra için Azure Cosmos DB'ye erişme

HDInsight-Spark, Zeppelin ve Jupyter notebook hizmetleriyle birlikte gelir. Her ikisi de Scala ve Python'ı destekleyen web tabanlı not defteri ortamlarıdır. Not defterleri etkileşimli keşif analizi ve işbirliği için harikadır ancak operasyonel veya üretim süreçleri için uygun değildir.

Aşağıdaki Jupyter not defterleri HDInsight Spark kümenize yüklenebilir ve Apache Cassandra için Azure Cosmos DB ile çalışmaya hazır örnekler sağlayabilir. Apache Cassandra için Azure Cosmos DB'ye bağlanmaya yönelik Spark hizmeti yapılandırmasını gözden geçirmek için ilk not defterini 1.0-ReadMe.ipynb gözden geçirmeyi unutmayın.

azure-cosmos-db-cassandra-api-spark-notebooks-jupyter altındaki not defterlerini makinenize indirin.

Karşıya yükleme

Jupyter'ı başlattığınızda Scala'ya gidin. Bir dizin oluşturun ve ardından not defterlerini dizine yükleyin. Karşıya Yükle düğmesi en üstte, sağ taraftadır.

Nasıl çalıştırılır?

Not defterlerini ve her not defteri hücresini sırayla gözden geçirebilirsiniz. Tüm hücreleri çalıştırmak için her not defterinin üst kısmındaki Çalıştır düğmesini veya her hücre için ShiftEnter'ı+ seçin.

Spark Scala programınızdan Apache Cassandra için Azure Cosmos DB ile erişim

Üretimdeki otomatik işlemler için Spark programları spark-submit kullanılarak kümeye gönderilir.

Sonraki adımlar