從 Spark 更新插入 Azure Cosmos DB for Apache Cassandra
適用於: Cassandra
本文說明如何從 Spark 將資料更新插入 Azure Cosmos DB for Apache Cassandra。
API for Cassandra 設定
在您的 Notebook 叢集中設定下列 Spark 設定。 這是一次性的活動。
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
注意
如果您使用 Spark 3.x,則無須安裝 Azure Cosmos DB 協助程式和連線中心。 您也應使用 remoteConnectionsPerExecutor
,而不是 connections_per_executor_max
Spark 3 連接器 (如上述)。
警告
本文所示的 Spark 3 範例已使用 Spark 3.2.1 版和對應的 Cassandra Spark 連接器 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 進行測試。 較新版本的 Spark 和/或 Cassandra 連接器可能無法如預期般運作。
Dataframe API
建立資料框架
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// (1) Update: Changing author name to include prefix of "Sir"
// (2) Insert: adding a new book
val booksUpsertDF = Seq(
("b00001", "Sir Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Sir Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Sir Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Sir Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Sir Arthur Conan Doyle", "The hounds of Baskerville", 1901),
("b09999", "Sir Arthur Conan Doyle", "The return of Sherlock Holmes", 1905)
).toDF("book_id", "book_author", "book_name", "book_pub_year")
booksUpsertDF.show()
Upsert 資料
// Upsert is no different from create
booksUpsertDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.save()
更新資料
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
//This runs on the driver, leverage only for one off updates
cdbConnector.withSessionDo(session => session.execute("update books_ks.books set book_price=99.33 where book_id ='b00300' and book_pub_year = 1901;"))
RDD API
注意
從 RDD API 更新插入等同於建立作業
下一步
繼續閱讀下列文章,以對 Azure Cosmos DB for Apache Cassandra 資料表中儲存的資料執行其他作業: