Table copy operations on Azure Cosmos DB for Apache Cassandra from Spark

APPLIES TO: Cassandra

This article describes how to copy data between tables in Azure Cosmos DB for Apache Cassandra from Spark. The same commands can also be used to copy data from Apache Cassandra tables to Azure Cosmos DB for Apache Cassandra tables.

API for Cassandra configuration

Set the Spark configuration below in your notebook cluster. This is a one-time activity.

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  


If you're using Spark 3.x, you don't need to install the Azure Cosmos DB helper and connection factory. You should also use remoteConnectionsPerExecutor instead of connections_per_executor_max for the Spark 3 connector (see above).
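The settings above can also be collected in code, which makes the Spark 2.x versus Spark 3.x difference explicit. The following is a minimal sketch, not an official helper: the object name `CosmosCassandraConf`, its `settings` method, and the `spark3` flag are hypothetical, and the host value assumes the usual `<account>.cassandra.cosmosdb.azure.com` endpoint form.

```scala
// Hypothetical helper: collects the connector settings listed in this article into a Map.
object CosmosCassandraConf {
  def settings(account: String, key: String, spark3: Boolean = true): Map[String, String] = {
    // The Spark 3.x and Spark 2.x connectors name the per-executor connection limit differently.
    val connectionsKey =
      if (spark3) "spark.cassandra.connection.remoteConnectionsPerExecutor"
      else "spark.cassandra.connection.connections_per_executor_max"

    Map(
      // Assumption: the account uses the usual <account>.cassandra.cosmosdb.azure.com endpoint.
      "spark.cassandra.connection.host" -> s"$account.cassandra.cosmosdb.azure.com",
      "spark.cassandra.connection.port" -> "10350",
      "spark.cassandra.connection.ssl.enabled" -> "true",
      "spark.cassandra.auth.username" -> account,
      "spark.cassandra.auth.password" -> key,
      // Throughput-related values copied from the configuration above; adjust as needed.
      "spark.cassandra.output.batch.size.rows" -> "1",
      connectionsKey -> "10",
      "spark.cassandra.output.concurrent.writes" -> "1000",
      "spark.cassandra.concurrent.reads" -> "512",
      "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
      "spark.cassandra.connection.keep_alive_ms" -> "600000000"
    )
  }
}
```

If you build the Spark session yourself rather than configuring the cluster, a map like this could be passed to `SparkConf.setAll` before the session is created. On a managed notebook cluster, setting the values in the cluster configuration as described above is likely the simpler route.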


The Spark 3 samples shown in this article have been tested with Spark version 3.2.1 and the corresponding Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Later versions of Spark and/or the Cassandra connector might not function as expected.

Insert sample data

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry

val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
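).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

//Append the sample rows to the books table
booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()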

Copy data between tables

Copy data between tables (destination table exists)

//1) Create destination table
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books_copy(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))

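//2) Read from one table
val readBooksDF = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load()

//3) Save to destination table
readBooksDF.write
  .cassandraFormat("books_copy", "books_ks", "")
  .mode("append")
  .save()

//4) Validate copy to destination table
sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books_copy", "keyspace" -> "books_ks"))
  .load()
  .show()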
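Displaying the destination table confirms that rows arrived, but not that they match the source. A stricter check, sketched below, compares the two DataFrames as sets of rows. The `CopyCheck` object and its `sameRows` method are hypothetical names introduced for illustration. The sketch assumes both tables expose the same column names and that rows are unique, which holds here because `book_id` is the primary key.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: true when both DataFrames hold the same set of distinct rows.
object CopyCheck {
  def sameRows(source: DataFrame, target: DataFrame): Boolean = {
    // Align the column order first, because except() compares columns by position.
    val cols = source.columns.sorted.map(col)
    val s = source.select(cols: _*)
    val t = target.select(cols: _*)
    // except() is a distinct set difference, so duplicate rows are not detected.
    s.except(t).isEmpty && t.except(s).isEmpty
  }
}
```

For example, after loading `books_copy` into a DataFrame in the same way as the source table, `CopyCheck.sameRows(readBooksDF, copiedDF)` would be expected to return `true` for a complete copy, where `copiedDF` is a placeholder name for that DataFrame. On large tables the two set differences each trigger a full scan, so a row-count comparison may be a cheaper first check.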

Copy data between tables (destination table doesn't exist)

import com.datastax.spark.connector._

//1) Read from source table
val readBooksDF = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load()

//2) Creates an empty table in the keyspace based off of source table
val newBooksDF = readBooksDF
newBooksDF.createCassandraTable(
    "books_ks",
    "books_new",
    partitionKeyColumns = Some(Seq("book_id"))
    //clusteringKeyColumns = Some(Seq("some column"))
    )

//3) Saves the data from the source table into the newly created table
newBooksDF.write
  .cassandraFormat("books_new", "books_ks","")
  .mode("append")
  .save()

//4) Validate table creation and data load
sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books_new", "keyspace" -> "books_ks"))
  .load()
  .show()


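+-------+------------------+--------------------+----------+-------------+
|book_id|       book_author|           book_name|book_price|book_pub_year|
+-------+------------------+--------------------+----------+-------------+
| b00300|Arthur Conan Doyle|The hounds of Bas...|     12.25|         1901|
| b00001|Arthur Conan Doyle|  A study in scarlet|     11.33|         1887|
| b00023|Arthur Conan Doyle|      A sign of four|     22.45|         1890|
| b00501|Arthur Conan Doyle|The memoirs of Sh...|     14.22|         1893|
| b01001|Arthur Conan Doyle|The adventures of...|     19.83|         1892|
+-------+------------------+--------------------+----------+-------------+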

import com.datastax.spark.connector._
readBooksDF: org.apache.spark.sql.DataFrame = [book_id: string, book_author: string ... 3 more fields]
newBooksDF: org.apache.spark.sql.DataFrame = [book_id: string, book_author: string ... 3 more fields]

