Condividi tramite


Accedere ad Azure Cosmos DB for Apache Cassandra da Spark in YARN con HDInsight

SI APPLICA A: Cassandra

Questo articolo illustra come accedere ad Azure Cosmos DB for Apache Cassandra da Spark in YARN con HDInsight Spark da spark-shell. HDInsight è la soluzione PaaS Hortonworks Hadoop di Microsoft in Azure. Usa l'archiviazione di oggetti per Hadoop Distributed File System e include diverse versioni, tra cui Spark. Anche se questo articolo fa riferimento a HDInsight Spark, si applica a tutte le distribuzioni Hadoop.

Prerequisiti

Prima di iniziare, esaminare le nozioni di base sulla connessione ad Azure Cosmos DB for Apache Cassandra.

È necessario soddisfare i prerequisiti seguenti:

  • Effettuare il provisioning di Azure Cosmos DB for Apache Cassandra. Vedere Creare un account di database.

  • Effettuare il provisioning di un cluster HDInsight Spark. Vedere Creare un cluster Apache Spark in Azure HDInsight usando un modello di ARM.

  • Configurazione dell'API for Cassandra in Spark2. Il connettore Spark per Cassandra richiede che i dettagli della connessione a Cassandra vengano inizializzati come parte del contesto Spark. Quando si avvia un notebook Jupyter, la sessione Spark e il contesto sono già inizializzati. Non arrestare e reinizializzare il contesto Spark a meno che non sia completo di ogni set di configurazione come parte dell'avvio predefinito del notebook Jupyter di HDInsight. Una soluzione alternativa consiste nell'aggiungere i dettagli dell'istanza di Cassandra direttamente alla configurazione del servizio Ambari Spark2. Si tratta di un'attività occasionale per ogni cluster che richiede un riavvio del servizio Spark2.

    1. Passare al servizio Ambari Spark2 e selezionare le configurazioni.

    2. Passare alle impostazione predefinite di Spark2 personalizzate, aggiungere una nuova proprietà con il codice seguente e riavviare il servizio Spark2:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

È possibile usare cqlsh per la convalida. Per altre informazioni, vedere Connessione ad Azure Cosmos DB for Apache Cassandra da Spark.

Accedere ad Azure Cosmos DB for Apache Cassandra dalla shell di Spark

La shell di Spark viene usata per il test e l'esplorazione.

  • Avviare spark-shell con le dipendenze di Maven necessarie compatibili con la versione di Spark del cluster.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Eseguire alcune operazioni DDL e DML

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • Eseguire operazioni CRUD

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Accedere ad Azure Cosmos DB for Apache Cassandra dai notebook Jupyter

HDInsight Spark include i servizi Zeppelin e Jupyter Notebook. Sono entrambi ambienti notebook basati sul Web che supportano Scala e Python. I notebook sono ideali per le analisi esplorative interattive e la collaborazione, ma non sono destinati a processi operativi o di produzione.

I notebook Jupyter seguenti possono essere caricati nel cluster HDInsight Spark e forniscono esempi pronti per l'uso con Azure Cosmos DB for Apache Cassandra. Assicurarsi di esaminare il file 1.0-ReadMe.ipynb del primo notebook per verificare la configurazione del servizio Spark per la connessione ad Azure Cosmos DB for Apache Cassandra.

Scaricare i notebook da azure-cosmos-db-cassandra-api-spark-notebooks-jupyter nel computer.

Modalità di caricamento

Quando si avvia Jupyter, passare a Scala. Creare una directory e quindi caricare i notebook nella directory. Il pulsante Carica si trova in alto a destra.

Modalità di esecuzione

Esaminare i notebook e ogni cella dei notebook in sequenza. Fare clic sul pulsante Esegui nella parte superiore di ogni notebook per eseguire tutte le celle o premere MAIUSC+INVIO per eseguire le singole celle.

Accedere con Azure Cosmos DB for Apache Cassandra dal programma Spark Scala

Per i processi di produzione automatizzati, i programmi Spark vengono inviati al cluster tramite spark-submit.

Passaggi successivi