Table copy operations on Azure Cosmos DB for Apache Cassandra from Spark
APPLIES TO: Cassandra
This article describes how to copy data between tables in Azure Cosmos DB for Apache Cassandra from Spark. The commands described in this article can also be used to copy data from Apache Cassandra tables to Azure Cosmos DB for Apache Cassandra tables.
API for Cassandra configuration
Set the following Spark configuration on your notebook cluster. It's a one-time activity.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
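If you aren't setting these values through the cluster UI, the same settings can be applied when the SparkSession is built. The following is a minimal sketch, assuming Spark 3.x and the same placeholder account name and key as above; the application name is hypothetical:

import org.apache.spark.sql.SparkSession

//Minimal sketch: apply the same connection settings programmatically (Spark 3.x)
val spark = SparkSession.builder()
  .appName("cosmos-cassandra-table-copy") //hypothetical app name
  .config("spark.cassandra.connection.host", "YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
  .config("spark.cassandra.connection.port", "10350")
  .config("spark.cassandra.connection.ssl.enabled", "true")
  .config("spark.cassandra.auth.username", "YOUR_ACCOUNT_NAME")
  .config("spark.cassandra.auth.password", "YOUR_ACCOUNT_KEY")
  .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "10")
  .config("spark.cassandra.output.concurrent.writes", "1000")
  .getOrCreate()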
Note
If you are using Spark 3.x, you don't need to install the Azure Cosmos DB helper and connection factory. You should also use remoteConnectionsPerExecutor instead of connections_per_executor_max for the Spark 3 connector (shown above).
Warning
The Spark 3 samples shown in this article have been tested with Spark version 3.2.1 and the corresponding Cassandra Spark connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Later versions of Spark and/or the Cassandra connector may not function as expected.
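Before running the samples, it can be worth confirming that the cluster can actually reach the account with the settings above. The following is a minimal smoke-test sketch, assuming the configuration shown earlier is in place; it reads the reported server version from the system.local table:

import com.datastax.spark.connector.cql.CassandraConnector

//Smoke test: open a session against the account and read the reported Cassandra version
val connector = CassandraConnector(sc)
connector.withSessionDo { session =>
  val row = session.execute("SELECT release_version FROM system.local;").one()
  println(s"Connected; release_version = ${row.getString("release_version")}")
}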
插入範例資料
import org.apache.spark.sql.cassandra._

//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

//(outside a notebook or spark-shell, also import spark.implicits._ so toDF is available)
val booksDF = Seq(
  ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887, 11.33),
  ("b00023", "Arthur Conan Doyle", "A sign of four", 1890, 22.45),
  ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892, 19.83),
  ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893, 14.22),
  ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901, 12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year", "book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()
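Before copying, you can read the sample data back with the same connector format to confirm the insert succeeded. A minimal sketch, assuming the books table and books_ks keyspace from above; booksCheckDF is just an illustrative name:

//Read the sample data back to verify the insert
val booksCheckDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "books", "keyspace" -> "books_ks"))
  .load

booksCheckDF.show
println(s"Rows in books_ks.books: ${booksCheckDF.count}")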
Copy data between tables
Copy data between tables (destination table exists)
//1) Create the destination table
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books_copy(book_id TEXT PRIMARY KEY, book_author TEXT, book_name TEXT, book_pub_year INT, book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))

//2) Read from the source table
val readBooksDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "books", "keyspace" -> "books_ks"))
  .load

//3) Save to the destination table (append, so reruns don't fail on existing data)
readBooksDF.write
  .cassandraFormat("books_copy", "books_ks", "")
  .mode("append")
  .save()

//4) Validate the copy to the destination table
sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "books_copy", "keyspace" -> "books_ks"))
  .load
  .show
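The same read-then-write pattern works when only part of the source table needs to be copied: filter the DataFrame before writing it. A minimal sketch, assuming the readBooksDF and books_copy table from the steps above; the predicate is illustrative:

//Copy only a subset of rows by filtering the source DataFrame before the write
val recentBooksDF = readBooksDF.filter("book_pub_year > 1890")

recentBooksDF.write
  .cassandraFormat("books_copy", "books_ks", "")
  .mode("append")
  .save()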
Copy data between tables (destination table doesn't exist)
import com.datastax.spark.connector._
import org.apache.spark.sql.SaveMode

//1) Read from the source table
val readBooksDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "books", "keyspace" -> "books_ks"))
  .load

//2) Create an empty table in the keyspace based on the schema of the source table
val newBooksDF = readBooksDF
newBooksDF.createCassandraTable(
  "books_ks",
  "books_new",
  partitionKeyColumns = Some(Seq("book_id"))
  //clusteringKeyColumns = Some(Seq("some column"))
)

//3) Save the data from the source table into the newly created table
newBooksDF.write
  .cassandraFormat("books_new", "books_ks", "")
  .mode(SaveMode.Append)
  .save()

//4) Validate the table creation and data load
sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "books_new", "keyspace" -> "books_ks"))
  .load
  .show
The output is as follows:
+-------+------------------+--------------------+----------+-------------+
|book_id| book_author| book_name|book_price|book_pub_year|
+-------+------------------+--------------------+----------+-------------+
| b00300|Arthur Conan Doyle|The hounds of Bas...| 12.25| 1901|
| b00001|Arthur Conan Doyle| A study in scarlet| 11.33| 1887|
| b00023|Arthur Conan Doyle| A sign of four| 22.45| 1890|
| b00501|Arthur Conan Doyle|The memoirs of Sh...| 14.22| 1893|
| b01001|Arthur Conan Doyle|The adventures of...| 19.83| 1892|
+-------+------------------+--------------------+----------+-------------+
import com.datastax.spark.connector._
readBooksDF: org.apache.spark.sql.DataFrame = [book_id: string, book_author: string ... 3 more fields]
newBooksDF: org.apache.spark.sql.DataFrame = [book_id: string, book_author: string ... 3 more fields]
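The commented-out clusteringKeyColumns parameter in step 2 controls the clustering key of the generated table. As a minimal sketch of how it might be used, the following creates a hypothetical books_by_author table partitioned by author and clustered by publication year (book_id is kept in the clustering key so rows remain unique):

//Hypothetical layout: one partition per author, rows ordered by publication year
newBooksDF.createCassandraTable(
  "books_ks",
  "books_by_author", //hypothetical table name
  partitionKeyColumns = Some(Seq("book_author")),
  clusteringKeyColumns = Some(Seq("book_pub_year", "book_id"))
)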
Next steps
- Get started with creating an API for Cassandra account, database, and a table by using a Java application.
- Load sample data into an API for Cassandra table by using a Java application.
- Query data from an API for Cassandra account by using a Java application.