Accéder à Azure Cosmos DB for Apache Cassandra à partir de Spark sur YARN avec HDInsight

S’APPLIQUE À : Cassandra

Cet article explique comment accéder à Azure Cosmos DB for Apache Cassandra à partir de Spark sur YARN avec HDInsight-Spark au moyen de spark-shell. HDInsight est la plateforme PaaS Hortonworks Hadoop de Microsoft sur Azure. Elle utilise le stockage d’objets pour HDFS et se décline en plusieurs versions, dont Spark. Bien que cet article fasse référence à HDInsight-Spark, il s’applique à toutes les distributions Hadoop.

Prérequis

Avant de commencer, passez en revue les principes fondamentaux de la connexion à Azure Cosmos DB for Apache Cassandra.

Vous devez respecter les prérequis suivants :

  • Approvisionnez Azure Cosmos DB for Apache Cassandra. Consultez Créer un compte de base de données.

  • Approvisionnez un cluster HDInsight-Spark. Consultez Créer un cluster Apache Spark dans Azure HDInsight à l’aide d’un modèle Resource Manager.

  • API pour la configuration de Cassandra dans Spark2. Le connecteur Spark pour Cassandra exige que les informations de la connexion Cassandra soient initialisées dans le contexte Spark. Lorsque vous lancez un notebook Jupyter, la session et le contexte spark sont déjà initialisés. N’arrêtez pas et ne réinitialisez pas le contexte Spark tant qu’il n’est pas doté de la configuration complète définie dans le cadre du démarrage du notebook Jupyter par défaut de HDInsight. Une solution de contournement consiste à ajouter directement les détails de l’instance Cassandra à la configuration du service Spark2 Ambari. Cette opération n’est nécessaire qu’une seule fois pour chacun des clusters qui requièrent un redémarrage du service Spark2.

    1. Accédez au service Spark2 Ambari, puis sélectionnez les configurations.

    2. Accédez aux valeurs spark2-defaults personnalisées et ajoutez une nouvelle propriété avec le code suivant, puis redémarrez le service Spark2 :

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

Vous pouvez l’utiliser cqlsh pour la validation. Pour plus d’information, consultez Connexion à Azure Cosmos DB for Apache Cassandra à partir de Spark.

Accéder à Azure Cosmos DB for Apache Cassandra à partir de l’interpréteur de commandes Spark

L’interpréteur de commandes Spark est utilisé à des fins de test et d’exploration.

  • Lancez spark-shell avec les dépendances maven requises compatibles avec la version Spark de votre cluster.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Exécutez certaines opérations DDL et DML.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • Exécutez des opérations CRUD.

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Accéder à Azure Cosmos DB for Apache Cassandra à partir de notebooks Jupyter

HDInsight-Spark est fourni avec les services de bloc-notes Zeppelin et Jupyter. Ce sont deux environnements de notebook basés sur le web qui prennent en charge Scala et Python. Les notebooks sont parfaits pour l’analytique exploratoire interactive et la collaboration, mais ne sont pas conçus pour les processus opérationnels ou de production.

Les notebooks Jupyter suivants peuvent être chargés dans votre cluster HDInsight Spark et fournissent des exemples d’utilisation d’Azure Cosmos DB for Apache Cassandra immédiatement exploitables. Veillez à consulter le premier notebook 1.0-ReadMe.ipynb, qui contient la configuration du service Spark requise pour la connexion à Azure Cosmos DB for Apache Cassandra.

Téléchargez les notebooks disponibles sous azure-cosmos-db-cassandra-api-spark-notebooks-jupyter sur votre machine.

Procédure à suivre pour le chargement

Quand vous lancez Jupyter, accédez à Scala. Créez un répertoire, puis chargez-y les notebooks. Le bouton Charger se trouve en haut à droite.

Procédure à suivre pour l’exécution

Parcourez les notebooks et chaque cellule de ceux-ci séquentiellement. Sélectionnez le bouton Exécuter en haut de chaque notebook pour exécuter toutes les cellules, ou appuyez sur Maj+Entrée pour chaque cellule.

Accéder à Azure Cosmos DB for Apache Cassandra à partir de votre programme Scala Spark

Pour les processus automatisés dans l’environnement de production, les programmes Spark sont envoyés au cluster au moyen du script spark-submit.

Étapes suivantes