從 Spark 在 Azure Cosmos DB for Apache Cassandra 中執行的 DDL 作業
適用於: Cassandra
本文詳細說明如何從 Spark 針對 Azure Cosmos DB for Apache Cassandra 執行 Keyspace 和資料表 DDL 作業。
Spark 內容
適用於 API for Cassandra 的連接器需要將 Cassandra 連線詳細資料初始化,以作為 Spark 內容的一部分。 當您啟動筆記本時,Spark 內容已初始化,因此建議您不要將其停止並重新初始化。 有一個解決方案是在叢集 Spark 設定中的叢集層級新增 API for Cassandra 執行個體設定。 這是每個叢集的一次性活動。 將下列程式碼新增至 Spark 設定,作為以空格分隔的索引鍵值組:
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
API for Cassandra 相關的設定
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
注意
如果您使用 Spark 3.x,則無須安裝 Azure Cosmos DB 協助程式和連線中心。 您也應使用 remoteConnectionsPerExecutor
,而不是 connections_per_executor_max
Spark 3 連接器 (如上述)。
警告
本文所示的 Spark 3 範例已使用 Spark 3.2.1 版和對應的 Cassandra Spark 連接器 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1 進行測試。 較新版本的 Spark 和/或 Cassandra 連接器可能無法如預期般運作。
Keyspace DDL 作業
建立 Keyspace
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
在 cqlsh 中驗證
在 cqlsh 中執行下列命令,您應該會看到您稍早建立的 Keyspace。
DESCRIBE keyspaces;
卸除 Keyspace
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
在 cqlsh 中驗證
DESCRIBE keyspaces;
資料表 DDL 作業
考量因素:
- 使用 create table 陳述式,即可在資料表層級指派輸送量。
- 一個分割區索引鍵可以儲存 20 GB 的資料。
- 一筆記錄最多可以儲存 2 MB 的資料。
- 一個分割區索引鍵範圍可以儲存多個分割區索引鍵。
建立表格
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
在 cqlsh 中驗證
在 cqlsh 中執行下列命令,您應該會看到名為 "books" 的資料表:
USE books_ks;
DESCRIBE books;
佈建的輸送量和預設 TTL 值不會顯示在前一個命令的輸出中,您可以從入口網站取得這些值。
改變資料表
您可以使用 alter table 命令來改變下列值:
- 佈建輸送量
- 存留時間值
目前不支援資料行變更。
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
卸除資料表
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
在 cqlsh 中驗證
在 cqlsh 中執行下列命令,您應該會看到 "books" 資料表再也無法使用:
USE books_ks;
DESCRIBE tables;
下一步
建立 Keyspace 和資料表之後,請繼續閱讀下列文章以了解 CRUD 作業和其他功能: