Az Apache Cassandrához készült Azure Cosmos DB elérése a Spark on YARN-ból a HDInsighttal

A KÖVETKEZŐKRE VONATKOZIK: Cassandra

Ez a cikk bemutatja, hogyan érheti el az Apache Cassandra-hoz készült Azure Cosmos DB-t a Spark on YARN-ból a HDInsight-Spark a-ból spark-shell. A HDInsight a Microsoft Hortonworks Hadoop PaaS szolgáltatása az Azure-ban. Objektumtárolót használ a HDFS-hez, és számos változatban kapható, beleértve a Sparkot is. Bár ez a cikk a HDInsight-Sparkra vonatkozik, az összes Hadoop-disztribúcióra vonatkozik.

Előfeltételek

Mielőtt hozzákezdene, tekintse át az Apache Cassandrához készült Azure Cosmos DB-hez való csatlakozás alapjait.

A következő előfeltételekre van szüksége:

  • Az Azure Cosmos DB kiépítése az Apache Cassandra számára. Lásd: Adatbázisfiók létrehozása.

  • HDInsight-Spark-fürt kiépítése. Lásd: Apache Spark-fürt létrehozása az Azure HDInsightban ARM-sablonnal.

  • A Cassandra konfigurációs API-ja a Spark2-ben. A Cassandra Spark-összekötőjének inicializálnia kell a Cassandra-kapcsolat részleteit a Spark-környezet részeként. Jupyter-notebook indításakor a Spark-munkamenet és a környezet már inicializálva lesz. Ne állítsa le és inicializálja újra a Spark-környezetet, kivéve, ha minden konfiguráció be van állítva a HDInsight alapértelmezett Jupyter-notebook indítási folyamatának részeként. Az egyik megkerülő megoldás a Cassandra-példány részleteinek hozzáadása az Ambarihoz, a Spark2 szolgáltatás konfigurációjába közvetlenül. Ez a módszer fürtönként egyszeri tevékenység, amely Spark2-szolgáltatás újraindítását igényli.

    1. Nyissa meg az Ambarit, a Spark2 szolgáltatást, és válassza a konfigurációkat.

    2. Lépjen az egyéni spark2-defaults elemre, adjon hozzá egy új tulajdonságot a következővel, majd indítsa újra a Spark2 szolgáltatást:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

Az érvényesítéshez használhatja cqlsh . További információ: Csatlakozás az Apache Cassandrához készült Azure Cosmos DB-hez a Sparkból.

Az Apache Cassandrához készült Azure Cosmos DB elérése a Spark Shellből

A Spark Shell teszteléshez és feltáráshoz használható.

  • Indítás spark-shell a fürt Spark-verziójával kompatibilis maven-függőségekkel.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • DDL- és DML-műveletek végrehajtása

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • CRUD-műveletek futtatása

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Az Apache Cassandra-hoz készült Azure Cosmos DB elérése Jupyter-notebookokból

HDInsight-Spark Zeppelin- és Jupyter notebook-szolgáltatásokkal rendelkezik. Mindkettő webalapú jegyzetfüzet-környezet, amely támogatja a Scalát és a Pythont. A jegyzetfüzetek kiválóan használhatók interaktív feltáró elemzésekhez és együttműködéshez, de nem üzemeltetési vagy éles folyamatokhoz.

A következő Jupyter-notebookok feltölthetők a HDInsight Spark-fürtbe, és kész mintákat biztosítanak az Apache Cassandra-hoz készült Azure Cosmos DB használatához. Mindenképpen tekintse át az első jegyzetfüzetet 1.0-ReadMe.ipynb , amely áttekinti a Spark szolgáltatás konfigurációját az Apache Cassandrához készült Azure Cosmos DB-hez való csatlakozáshoz.

Töltse le az azure-cosmos-db-cassandra-api-spark-notebooks-jupyter alatt található jegyzetfüzeteket a gépére.

Feltöltés

A Jupyter indításakor lépjen a Scala elemre. Hozzon létre egy könyvtárat, majd töltse fel a jegyzetfüzeteket a könyvtárba. A Feltöltés gomb a jobb felső sarokban található.

Futtatás

Haladjon végig a jegyzetfüzeteken és az egyes jegyzetfüzetcellákon egymás után. Az egyes jegyzetfüzetek tetején található Futtatás gombra kattintva az összes cellát futtathatja, azEnter shift+ billentyűt pedig minden cellához.

Hozzáférés az Apache Cassandra-hoz készült Azure Cosmos DB-vel a Spark Scala-programból

Az éles környezetben futó automatizált folyamatok esetében a Spark-programok a spark-submit használatával lesznek elküldve a fürtbe.

Következő lépések