Partilhar via


Acesse o Azure Cosmos DB para Apache Cassandra do Spark no YARN com o HDInsight

APLICA-SE A: Cassandra

Este artigo aborda como acessar o Azure Cosmos DB para Apache Cassandra do Spark no YARN com o HDInsight-Spark do spark-shell. O HDInsight é o Hortonworks Hadoop PaaS da Microsoft no Azure. Ele usa armazenamento de objetos para HDFS e vem em vários sabores, incluindo Spark. Embora este artigo se refira ao HDInsight-Spark, ele se aplica a todas as distribuições Hadoop.

Pré-requisitos

Antes de começar, revise as noções básicas de conexão com o Azure Cosmos DB para Apache Cassandra.

Você precisa dos seguintes pré-requisitos:

  • Provisione o Azure Cosmos DB para Apache Cassandra. Consulte Criar uma conta de banco de dados.

  • Provisione um cluster HDInsight-Spark. Consulte Criar cluster Apache Spark no Azure HDInsight usando o modelo ARM.

  • API para configuração Cassandra no Spark2. O conector Spark para Cassandra requer que os detalhes da conexão Cassandra sejam inicializados como parte do contexto Spark. Quando você inicia um bloco de anotações Jupyter, a sessão de faísca e o contexto já são inicializados. Não pare e reinicialize o contexto do Spark, a menos que ele esteja completo com todas as configurações definidas como parte da inicialização padrão do notebook Jupyter do HDInsight. Uma solução alternativa é adicionar os detalhes da instância Cassandra à configuração do serviço Ambari, Spark2, diretamente. Essa abordagem é uma atividade única por cluster que requer uma reinicialização do serviço Spark2.

    1. Vá para Ambari, serviço Spark2 e selecione configurações.

    2. Vá para spark2-defaults personalizados, adicione uma nova propriedade com o seguinte e reinicie o serviço Spark2:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

Você pode usar cqlsh para validação. Para obter mais informações, consulte Conectando-se ao Azure Cosmos DB para Apache Cassandra a partir do Spark.

Acessar o Azure Cosmos DB para Apache Cassandra a partir do shell do Spark

A concha de faísca é usada para testes e exploração.

  • Inicie spark-shell com as dependências maven necessárias compatíveis com a versão Spark do cluster.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Executar algumas operações DDL e DML

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • Executar operações CRUD

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Acessar o Azure Cosmos DB para Apache Cassandra a partir de notebooks Jupyter

O HDInsight-Spark vem com os serviços de notebook Zeppelin e Jupyter. Ambos são ambientes de notebook baseados na Web que suportam Scala e Python. Os notebooks são ótimos para análise exploratória interativa e colaboração, mas não se destinam a processos operacionais ou de produção.

Os seguintes blocos de anotações Jupyter podem ser carregados em seu cluster HDInsight Spark e fornecer exemplos prontos para trabalhar com o Azure Cosmos DB para Apache Cassandra. Certifique-se de revisar o primeiro bloco de anotações 1.0-ReadMe.ipynb para revisar a configuração do serviço Spark para se conectar ao Azure Cosmos DB para Apache Cassandra.

Transfira os blocos de notas em azure-cosmos-db-cassandra-api-spark-notebooks-jupyter para a sua máquina.

Como carregar

Ao iniciar o Jupyter, navegue até Scala. Crie um diretório e, em seguida, carregue os blocos de anotações para o diretório. O botão Carregar está na parte superior, do lado direito.

Como executar

Percorra os blocos de notas e cada célula do bloco de notas sequencialmente. Selecione o botão Executar na parte superior de cada bloco de notas para executar todas as células ou Shift+Enter para cada célula.

Acesso com o Azure Cosmos DB para Apache Cassandra a partir do seu programa Spark Scala

Para processos automatizados em produção, os programas Spark são enviados ao cluster usando spark-submit.

Próximos passos