Partilhar via


Criar/inserir dados no Azure Cosmos DB para Apache Cassandra a partir do Spark

APLICA-SE A: Cassandra

Este artigo descreve como inserir dados de exemplo em uma tabela no Azure Cosmos DB para Apache Cassandra do Spark.

Configuração da API para Cassandra

Defina abaixo a configuração de faísca no cluster de blocos de anotações. É uma atividade única.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Nota

Se você estiver usando o Spark 3.x, não precisará instalar o auxiliar e a fábrica de conexões do Azure Cosmos DB. Você também deve usar remoteConnectionsPerExecutor em vez do connections_per_executor_max conector Spark 3 (veja acima).

Aviso

Os exemplos do Spark 3 mostrados neste artigo foram testados com o Spark versão 3.2.1 e o correspondente Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Versões posteriores do Spark e/ou do conector Cassandra podem não funcionar como esperado.

API DataFrame

Criar um Dataframe com dados de exemplo

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a dataframe containing five records
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")

//Review schema
booksDF.printSchema

//Print
booksDF.show

Nota

A funcionalidade "Criar se não existir", em nível de linha, ainda não é suportada.

Persistir no Azure Cosmos DB para Apache Cassandra

Ao salvar dados, você também pode definir configurações de política de tempo de vida útil e consistência, conforme mostrado no exemplo a seguir:

//Persist
booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Validar em cqlsh

use books_ks;
select * from books;

API de banco de dados distribuído resiliente (RDD)

Criar um RDD com dados de exemplo

//Drop and re-create table to delete records created in the previous section 
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

//Create RDD
val booksRDD = sc.parallelize(Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))

//Review
booksRDD.take(2).foreach(println)

Nota

A funcionalidade Criar se não existir ainda não é suportada.

Persistir no Azure Cosmos DB para Apache Cassandra

Ao salvar dados na API para Cassandra, você também pode definir configurações de política de tempo de vida útil e consistência, conforme mostrado no exemplo a seguir:

import com.datastax.spark.connector.writer._
import com.datastax.oss.driver.api.core.ConsistencyLevel

//Persist
booksRDD.saveToCassandra("books_ks", "books", SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),writeConf = WriteConf(ttl = TTLOption.constant(900000),consistencyLevel = ConsistencyLevel.ALL))

Validar em cqlsh

use books_ks;
select * from books;

Próximos passos

Depois de inserir dados na tabela do Azure Cosmos DB para Apache Cassandra, prossiga para os seguintes artigos para executar outras operações nos dados armazenados no Azure Cosmos DB para Apache Cassandra: