Az Apache Cassandrához készült Azure Cosmos DB elérése a Spark on YARN-ból a HDInsighttal
A KÖVETKEZŐKRE VONATKOZIK: Cassandra
Ez a cikk bemutatja, hogyan érheti el az Apache Cassandra-hoz készült Azure Cosmos DB-t a Spark on YARN-ból a HDInsight-Spark a-ból spark-shell
. A HDInsight a Microsoft Hortonworks Hadoop PaaS szolgáltatása az Azure-ban. Objektumtárolót használ a HDFS-hez, és számos változatban kapható, beleértve a Sparkot is. Bár ez a cikk a HDInsight-Sparkra vonatkozik, az összes Hadoop-disztribúcióra vonatkozik.
Előfeltételek
Mielőtt hozzákezdene, tekintse át az Apache Cassandrához készült Azure Cosmos DB-hez való csatlakozás alapjait.
A következő előfeltételekre van szüksége:
Az Azure Cosmos DB kiépítése az Apache Cassandra számára. Lásd: Adatbázisfiók létrehozása.
HDInsight-Spark-fürt kiépítése. Lásd: Apache Spark-fürt létrehozása az Azure HDInsightban ARM-sablonnal.
A Cassandra konfigurációs API-ja a Spark2-ben. A Cassandra Spark-összekötőjének inicializálnia kell a Cassandra-kapcsolat részleteit a Spark-környezet részeként. Jupyter-notebook indításakor a Spark-munkamenet és a környezet már inicializálva lesz. Ne állítsa le és inicializálja újra a Spark-környezetet, kivéve, ha minden konfiguráció be van állítva a HDInsight alapértelmezett Jupyter-notebook indítási folyamatának részeként. Az egyik megkerülő megoldás a Cassandra-példány részleteinek hozzáadása az Ambarihoz, a Spark2 szolgáltatás konfigurációjába közvetlenül. Ez a módszer fürtönként egyszeri tevékenység, amely Spark2-szolgáltatás újraindítását igényli.
Nyissa meg az Ambarit, a Spark2 szolgáltatást, és válassza a konfigurációkat.
Lépjen az egyéni spark2-defaults elemre, adjon hozzá egy új tulajdonságot a következővel, majd indítsa újra a Spark2 szolgáltatást:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
Az érvényesítéshez használhatja cqlsh
. További információ: Csatlakozás az Apache Cassandrához készült Azure Cosmos DB-hez a Sparkból.
Az Apache Cassandrához készült Azure Cosmos DB elérése a Spark Shellből
A Spark Shell teszteléshez és feltáráshoz használható.
Indítás
spark-shell
a fürt Spark-verziójával kompatibilis maven-függőségekkel.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
DDL- és DML-műveletek végrehajtása
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
CRUD-műveletek futtatása
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
Az Apache Cassandra-hoz készült Azure Cosmos DB elérése Jupyter-notebookokból
HDInsight-Spark Zeppelin- és Jupyter notebook-szolgáltatásokkal rendelkezik. Mindkettő webalapú jegyzetfüzet-környezet, amely támogatja a Scalát és a Pythont. A jegyzetfüzetek kiválóan használhatók interaktív feltáró elemzésekhez és együttműködéshez, de nem üzemeltetési vagy éles folyamatokhoz.
A következő Jupyter-notebookok feltölthetők a HDInsight Spark-fürtbe, és kész mintákat biztosítanak az Apache Cassandra-hoz készült Azure Cosmos DB használatához. Mindenképpen tekintse át az első jegyzetfüzetet 1.0-ReadMe.ipynb
, amely áttekinti a Spark szolgáltatás konfigurációját az Apache Cassandrához készült Azure Cosmos DB-hez való csatlakozáshoz.
Töltse le az azure-cosmos-db-cassandra-api-spark-notebooks-jupyter alatt található jegyzetfüzeteket a gépére.
Feltöltés
A Jupyter indításakor lépjen a Scala elemre. Hozzon létre egy könyvtárat, majd töltse fel a jegyzetfüzeteket a könyvtárba. A Feltöltés gomb a jobb felső sarokban található.
Futtatás
Haladjon végig a jegyzetfüzeteken és az egyes jegyzetfüzetcellákon egymás után. Az egyes jegyzetfüzetek tetején található Futtatás gombra kattintva az összes cellát futtathatja, azEnter shift+ billentyűt pedig minden cellához.
Hozzáférés az Apache Cassandra-hoz készült Azure Cosmos DB-vel a Spark Scala-programból
Az éles környezetben futó automatizált folyamatok esetében a Spark-programok a spark-submit használatával lesznek elküldve a fürtbe.