HDInsight ile YARN üzerinde Spark'tan Apache Cassandra için Azure Cosmos DB'ye erişme
ŞUNLAR IÇIN GEÇERLIDIR: Cassandra
Bu makale, HDInsight-Spark ile YARN üzerinde Spark'tan Apache Cassandra için Azure Cosmos DB'ye spark-shell
erişmeyi kapsar. HDInsight, Microsoft'un Azure'da Hortonworks Hadoop PaaS'ıdır. HDFS için nesne depolama kullanır ve Spark dahil olmak üzere çeşitli çeşitlerde gelir. Bu makale HDInsight-Spark'a başvururken, tüm Hadoop dağıtımları için geçerlidir.
Önkoşullar
Başlamadan önce Apache Cassandra için Azure Cosmos DB'ye bağlanmanın temellerini gözden geçirin.
Aşağıdaki önkoşullara ihtiyacınız vardır:
Apache Cassandra için Azure Cosmos DB sağlama. Bkz . Veritabanı hesabı oluşturma.
HDInsight-Spark kümesi sağlama. Bkz. ARM şablonunu kullanarak Azure HDInsight'ta Apache Spark kümesi oluşturma.
Spark2'de Cassandra yapılandırması için API. Cassandra için Spark bağlayıcısı, Cassandra bağlantı ayrıntılarının Spark bağlamının bir parçası olarak başlatılmasını gerektirir. Jupyter not defterini başlattığınızda Spark oturumu ve bağlamı zaten başlatılır. HDInsight varsayılan Jupyter not defteri başlatma işleminin bir parçası olarak her yapılandırma kümesiyle tamamlanmadığı sürece Spark bağlamını durdurmayı ve yeniden başlatmayın. Geçici çözümlerden biri Cassandra örneği ayrıntılarını doğrudan Ambari,Spark2 hizmet yapılandırmasına eklemektir. Bu yaklaşım, Spark2 hizmetinin yeniden başlatılmasını gerektiren küme başına tek seferlik bir etkinliktir.
Ambari, Spark2 hizmetine gidin ve yapılandırmalar'ı seçin.
Özel spark2-defaults'a gidin ve aşağıdakilerle yeni bir özellik ekleyin ve Spark2 hizmetini yeniden başlatın:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
Doğrulama için kullanabilirsiniz cqlsh
. Daha fazla bilgi için bkz . Spark'tan Apache Cassandra için Azure Cosmos DB'ye bağlanma.
Spark kabuğundan Apache Cassandra için Azure Cosmos DB'ye erişme
Spark kabuğu test ve araştırma için kullanılır.
Kümenizin Spark sürümüyle uyumlu gerekli maven bağımlılıklarıyla başlatın
spark-shell
.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
Bazı DDL ve DML işlemlerini yürütme
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
CRUD işlemlerini çalıştırma
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
Jupyter not defterlerinden Apache Cassandra için Azure Cosmos DB'ye erişme
HDInsight-Spark, Zeppelin ve Jupyter not defteri hizmetleriyle birlikte gelir. Her ikisi de Scala ve Python'ı destekleyen web tabanlı not defteri ortamlarıdır. Not defterleri etkileşimli keşif analizi ve işbirliği için mükemmeldir, ancak operasyonel veya üretim süreçleri için değildir.
Aşağıdaki Jupyter not defterleri HDInsight Spark kümenize yüklenebilir ve Apache Cassandra için Azure Cosmos DB ile çalışmaya hazır örnekler sağlayabilir. Apache Cassandra için Azure Cosmos DB'ye bağlanmaya yönelik Spark hizmeti yapılandırmasını gözden geçirmek için ilk not defterini 1.0-ReadMe.ipynb
gözden geçirmeyi unutmayın.
azure-cosmos-db-cassandra-api-spark-notebooks-jupyter altındaki not defterlerini makinenize indirin.
Karşıya yükleme
Jupyter'ı başlattığınızda Scala'ya gidin. Bir dizin oluşturun ve ardından not defterlerini dizine yükleyin. Karşıya Yükle düğmesi sağ üst taraftadır.
Nasıl çalıştırılır
Not defterlerini ve her not defteri hücresini sıralı olarak gözden geçirebilirsiniz. Tüm hücreleri çalıştırmak için her not defterinin üst kısmındaki Çalıştır düğmesini veya her hücre için Shift Enter'ı+ seçin.
Spark Scala programınızdan Apache Cassandra için Azure Cosmos DB ile erişim
Üretimdeki otomatik işlemler için Spark programları spark-submit kullanılarak kümeye gönderilir.