Akses Azure Cosmos DB untuk Apache Cassandra dari Spark di YARN dengan HDInsight

BERLAKU UNTUK: Cassandra

Artikel ini membahas cara mengakses Azure Cosmos DB untuk Apache Cassandra dari Spark di YARN dengan HDInsight-Spark dari spark-shell. HDInsight adalah Hortonworks Hadoop PaaS Microsoft di Azure. Ia menggunakan penyimpanan objek untuk HDFS dan hadir dalam beberapa selera, termasuk Spark. Meskipun mengacu pada HDInsight-Spark, artikel ini berlaku untuk semua distribusi Hadoop.

Prasyarat

Sebelum memulai, tinjau dasar-dasar menyambungkan ke Azure Cosmos DB untuk Apache Cassandra.

Anda memerlukan prasyarat berikut:

  • Provisikan Azure Cosmos DB untuk Apache Cassandra. Lihat Buat akun database.

  • Memprovisikan kluster HDInsight-Spark. Lihat Mulai Cepat: Membuat kluster Apache Spark di Azure HDInsight menggunakan templat ARM.

  • API untuk konfigurasi Cassandra di Spark2. Konektor Spark untuk Cassandra memerlukan detail koneksi Cassandra untuk diinisialisasi sebagai bagian dari konteks Spark. Saat Anda meluncurkan notebook Jupyter, sesi dan konteks spark sudah diinisialisasi. Jangan menghentikan dan menginisialisasi ulang konteks Spark kecuali jika sudah selesai dengan setiap konfigurasi yang ditetapkan sebagai bagian dari pengaktifan notebook Jupyter default HDInsight. Salah satu solusinya adalah menambahkan secara langsung detail instans Cassandra ke Ambari, konfigurasi layanan Spark2. Pendekatan ini adalah aktivitas satu kali per kluster yang memerlukan layanan Spark2 dihidupkan ulang.

    1. Buka layanan Ambari, Spark2, dan pilih konfigurasi.

    2. Buka spark2-default kustom dan tambahkan properti baru dengan yang berikut ini, lalu mulai ulang layanan Spark2:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

Anda dapat menggunakan cqlsh untuk validasi. Untuk informasi selengkapnya, lihat Menyambungkan ke Azure Cosmos DB untuk Apache Cassandra dari Spark.

Akses Azure Cosmos DB untuk Apache Cassandra dari shell Spark

Spark shell digunakan untuk pengujian dan eksplorasi.

  • Luncurkan spark-shell dengan dependensi maven yang diperlukan yang kompatibel dengan versi Spark kluster Anda.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Menjalankan beberapa operasi DDL dan DML

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • Menjalankan operasi CRUD

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Mengakses Azure Cosmos DB untuk Apache Cassandra dari notebook Jupyter

HDInsight-Spark dilengkapi dengan layanan notebook Zeppelin dan Jupyter. Keduanya merupakan lingkungan notebook berbasis web yang mendukung Scala dan Python. Notebook sangat bagus untuk analitik dan kolaborasi eksploratif interaktif, tetapi tidak dimaksudkan untuk proses operasional atau produksi.

Notebook Jupyter berikut dapat diunggah ke kluster HDInsight Spark Anda dan menyediakan sampel siap untuk bekerja dengan Azure Cosmos DB untuk Apache Cassandra. Pastikan untuk meninjau notebook 1.0-ReadMe.ipynb pertama yang meninjau konfigurasi layanan Spark untuk menyambungkan ke Azure Cosmos DB untuk Apache Cassandra.

Unduh notebook ini di bawah azure-cosmos-db-cassandra-api-spark-notebooks-jupyter ke komputer Anda.

Cara mengunggah

Saat Anda meluncurkan Jupyter, arahkan ke Scala. Buat direktori lalu unggah notebook ke direktori tersebut. Tombol Unggah ada di bagian atas, di sisi kanan.

Cara menjalankan

Buka notebook, dan setiap sel notebook secara berurutan. Pilih tombol Jalankan di bagian atas setiap notebook untuk menjalankan semua sel, atau tekan Shift+Enteruntuk setiap sel.

Akses dengan Azure Cosmos DB for Apache Cassandra dari program Spark Scala Anda

Untuk proses otomatis dalam produksi, program Spark diserahkan ke kluster dengan menggunakan spark-submit.

Langkah berikutnya