Toegang tot Azure Cosmos DB voor Apache Cassandra vanuit Spark op YARN met HDInsight

VAN TOEPASSING OP: Cassandra

In dit artikel wordt beschreven hoe u toegang hebt tot Azure Cosmos DB voor Apache Cassandra vanuit Spark op YARN met HDInsight-Spark vanuit spark-shell. HDInsight is Hortonworks Hadoop PaaS van Microsoft in Azure. Het maakt gebruik van objectopslag voor HDFS en is beschikbaar in verschillende varianten, waaronder Spark. Hoewel dit artikel verwijst naar HDInsight-Spark, is het van toepassing op alle Hadoop-distributies.

Vereisten

Voordat u begint, bekijkt u de basisbeginselen van het maken van verbinding met Azure Cosmos DB voor Apache Cassandra.

De volgende vereisten zijn nodig:

  • Azure Cosmos DB inrichten voor Apache Cassandra. Zie Een databaseaccount maken.

  • Een HDInsight-Spark cluster inrichten. Zie Een Apache Spark-cluster maken in Azure HDInsight met behulp van een ARM-sjabloon.

  • API voor Cassandra-configuratie in Spark2. De Spark-connector voor Cassandra vereist dat de Cassandra-verbindingsgegevens worden geïnitialiseerd als onderdeel van de Spark-context. Wanneer u een Jupyter-notebook start, zijn de Spark-sessie en -context al geïnitialiseerd. Stop niet en initialiseer de Spark-context niet, tenzij deze is voltooid met elke configuratie die is ingesteld als onderdeel van de standaardstart van het Jupyter-notebook van HDInsight. Een tijdelijke oplossing is om de details van het Cassandra-exemplaar rechtstreeks toe te voegen aan Ambari, spark2-serviceconfiguratie. Deze benadering is een eenmalige activiteit per cluster waarvoor een Spark2-service opnieuw moet worden opgestart.

    1. Ga naar Ambari, Spark2-service en selecteer configuraties.

    2. Ga naar aangepaste spark2-defaults en voeg een nieuwe eigenschap toe met het volgende en start de Spark2-service opnieuw:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

U kunt gebruiken cqlsh voor validatie. Zie Verbinding maken met Azure Cosmos DB voor Apache Cassandra vanuit Spark voor meer informatie.

Toegang tot Azure Cosmos DB voor Apache Cassandra vanuit Spark-shell

Spark Shell wordt gebruikt voor testen en verkennen.

  • Start spark-shell met de vereiste Maven-afhankelijkheden die compatibel zijn met de Spark-versie van uw cluster.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Enkele DDL- en DML-bewerkingen uitvoeren

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • CRUD-bewerkingen uitvoeren

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Toegang tot Azure Cosmos DB voor Apache Cassandra vanuit Jupyter-notebooks

HDInsight-Spark wordt geleverd met zeppelin- en Jupyter-notebookservices. Het zijn beide webgebaseerde notebookomgevingen die ondersteuning bieden voor Scala en Python. Notebooks zijn ideaal voor interactieve verkennende analyses en samenwerking, maar niet bedoeld voor operationele of productieprocessen.

De volgende Jupyter-notebooks kunnen worden geüpload naar uw HDInsight Spark-cluster en bieden kant-en-klare voorbeelden voor het werken met Azure Cosmos DB voor Apache Cassandra. Lees het eerste notebook 1.0-ReadMe.ipynb om de Configuratie van de Spark-service te controleren voor het maken van verbinding met Azure Cosmos DB voor Apache Cassandra.

Download de notebooks onder azure-cosmos-db-cassandra-api-spark-notebooks-jupyter naar uw computer.

Uploaden

Wanneer u Jupyter start, gaat u naar Scala. Maak een map en upload de notebooks vervolgens naar de map. De knop Uploaden bevindt zich rechtsboven.

Uitvoeren

Doorloop de notitieblokken en elke notebookcel sequentieel. Selecteer de knop Uitvoeren boven aan elk notitieblok om alle cellen uit te voeren of Shift+Enter voor elke cel.

Toegang met Azure Cosmos DB voor Apache Cassandra vanuit uw Spark Scala-programma

Voor geautomatiseerde processen in productie worden Spark-programma's naar het cluster verzonden met behulp van spark-submit.

Volgende stappen