Toegang tot Azure Cosmos DB voor Apache Cassandra vanuit Spark op YARN met HDInsight
VAN TOEPASSING OP: Cassandra
In dit artikel wordt beschreven hoe u toegang krijgen tot Azure Cosmos DB voor Apache Cassandra vanuit Spark op YARN met HDInsight-Spark van spark-shell
. HDInsight is Hortonworks Hadoop PaaS van Microsoft in Azure. Het maakt gebruik van objectopslag voor HDFS en wordt geleverd in verschillende smaken, waaronder Spark. Hoewel dit artikel verwijst naar HDInsight-Spark, is dit van toepassing op alle Hadoop-distributies.
Vereisten
Bekijk voordat u begint de basisbeginselen van het maken van verbinding met Azure Cosmos DB voor Apache Cassandra.
De volgende vereisten zijn nodig:
Azure Cosmos DB inrichten voor Apache Cassandra. Zie Een databaseaccount maken.
Richt een HDInsight-Spark-cluster in. Zie Een Apache Spark-cluster maken in Azure HDInsight met behulp van een ARM-sjabloon.
API voor Cassandra-configuratie in Spark2. De Spark-connector voor Cassandra vereist dat de Cassandra-verbindingsgegevens worden geïnitialiseerd als onderdeel van de Spark-context. Wanneer u een Jupyter-notebook start, worden de Spark-sessie en -context al geïnitialiseerd. Stop en initialiseer de Spark-context niet, tenzij deze is voltooid met elke configuratieset als onderdeel van de standaardstart van het Jupyter-notebook in HDInsight. Een tijdelijke oplossing is om de details van het Cassandra-exemplaar rechtstreeks toe te voegen aan ambari, spark2-serviceconfiguratie. Deze benadering is een eenmalige activiteit per cluster waarvoor een Spark2-service opnieuw moet worden opgestart.
Ga naar ambari, Spark2-service en selecteer configuraties.
Ga naar aangepaste spark2-standaardwaarden en voeg een nieuwe eigenschap toe met het volgende en start de Spark2-service opnieuw op:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
U kunt deze gebruiken cqlsh
voor validatie. Zie Verbinding maken met Azure Cosmos DB voor Apache Cassandra vanuit Spark voor meer informatie.
Toegang tot Azure Cosmos DB voor Apache Cassandra vanuit Spark Shell
Spark-shell wordt gebruikt voor testen en verkennen.
Start
spark-shell
met de vereiste maven-afhankelijkheden die compatibel zijn met de Spark-versie van uw cluster.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
Enkele DDL- en DML-bewerkingen uitvoeren
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
CRUD-bewerkingen uitvoeren
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
Toegang tot Azure Cosmos DB voor Apache Cassandra vanuit Jupyter-notebooks
HDInsight-Spark wordt geleverd met Zeppelin- en Jupyter-notebookservices. Ze zijn zowel webgebaseerde notebookomgevingen die Ondersteuning bieden voor Scala en Python. Notebooks zijn ideaal voor interactieve verkennende analyses en samenwerking, maar niet bedoeld voor operationele of productieprocessen.
De volgende Jupyter-notebooks kunnen worden geüpload naar uw HDInsight Spark-cluster en bieden kant-en-klare voorbeelden voor het werken met Azure Cosmos DB voor Apache Cassandra. Controleer het eerste notebook 1.0-ReadMe.ipynb
om de configuratie van de Spark-service te controleren voor het maken van verbinding met Azure Cosmos DB voor Apache Cassandra.
Download de notebooks onder azure-cosmos-db-cassandra-api-spark-notebooks-jupyter naar uw computer.
Uploaden
Wanneer u Jupyter start, gaat u naar Scala. Maak een map en upload de notebooks vervolgens naar de map. De knop Upload bevindt zich rechtsboven.
Uitvoeren
Doorloop de notebooks en elke notebookcel opeenvolgend. Selecteer de knop Uitvoeren boven aan elk notitieblok om alle cellen uit te voeren of Shift+Enter voor elke cel.
Toegang met Azure Cosmos DB voor Apache Cassandra vanuit uw Spark Scala-programma
Voor geautomatiseerde processen in productie worden Spark-programma's verzonden naar het cluster met behulp van spark-submit.