從 Spark 連線至 Azure Cosmos DB for Apache Cassandra
適用於: Cassandra
此文章是從 Spark 進行 Azure Cosmos DB for Apache Cassandra 整合的系列文章之一。 這些文章涵蓋連線能力、資料定義語言 (DDL) 作業、基本的資料操作語言 (DML) 作業,以及從 Spark 進行進階 Azure Cosmos DB for Apache Cassandra 整合。
必要條件
佈建您選擇的 Spark 環境 [Azure Databricks | Azure HDInsight-Spark | 其他]。
連線能力的相依項目
適用於 Cassandra 的 Spark 連接器:Spark 連接器用來連線至 Azure Cosmos DB for Apache Cassandra。 請找出位於 Maven 中心且與 Spark 環境的 Spark 和 Scala 版本相容的連接器版本,並加以使用。 建議使用支援 Spark 3.2.1 或更高版本的環境,並在 maven 座標
com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0
提供 Apark 連接器。 如果使用 Spark 2.x,建議您採用 Spark 2.4.5 版本的環境,使用 Maven 座標com.datastax.spark:spark-cassandra-connector_2.11:2.4.3
中的 Spark 連接器。適用於 API for Cassandra 的 Azure Cosmos DB 協助程式程式庫:如果您使用 Spark 2.x 版本,則除了 Spark 連接器以外,您還需要另一個名為 azure-cosmos-cassandra-spark-helper 的程式庫 (具有來自 Azure Cosmos DB 的 Maven 座標
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0
),才能處理速率限制。 此程式庫包含自訂連線處理站和重試原則類別。Azure Cosmos DB 中的重試原則設定為處理 HTTP 狀態碼 429 (「要求速率很大」) 的例外狀況。 Azure Cosmos DB for Apache Cassandra 會將這些例外狀況轉譯成 Cassandra 原生通訊協定上的多載錯誤,而您可以使用輪詢進行重試。 由於 Azure Cosmos DB 會使用佈建的輸送量模型,因此輸入/輸出速率增加時,也會發生要求速率限制例外狀況。 重試原則可保護您的 Spark 工作以免遭受資料暴增的影響,而資料暴增會暫時超出容器集合的輸送量。 如果使用 Spark 3.x 連接器,則不需要執行此程式庫。
注意
重試原則只能保護您的 Spark 工作以免遭受暫時暴增的影響。 如果您未設定執行工作負載所需的足夠 RU,則重試原則不適用,而且重試原則類別會重新擲回例外狀況。
Azure Cosmos DB 帳戶連線詳細資料:您的 Azure API for Cassandra 帳戶名稱、帳戶端點和金鑰。
Spark 連接器輸送量設定最佳化
下一節所列的所有相關參數,都是使用 Spark Connector for Cassandra 控制輸送量的相關參數。 為了將參數最佳化以最大化 spark 作業的輸送量,必須正確設定 spark.cassandra.output.concurrent.writes
、spark.cassandra.concurrent.reads
和 spark.cassandra.input.reads_per_sec
,以避免太多節流和輪詢 (進而導致輸送量降低)。
這些設定的最佳值取決於四個因素:
- 針對要內嵌資料的資料表設定的輸送量 (要求單位數量)。
- Spark 叢集中的背景工作數目。
- 針對您的 Spark 作業設定的執行程式數目 (可以使用
spark.cassandra.connection.connections_per_executor_max
或spark.cassandra.connection.remoteConnectionsPerExecutor
根據 Spark 版本來控制) - 如果在相同的資料中心內共置,則為每個要求對 Azure Cosmos DB 的平均延遲。 假設此值為 10 毫秒 (寫入) 和 3 毫秒 (讀取)。
例如,如果我們有五個背景工作,且值 spark.cassandra.output.concurrent.writes
為 1,且值 spark.cassandra.connection.remoteConnectionsPerExecutor
為 1,則會有五個背景工作角色同時寫入資料表,每個背景工作都有一個執行緒。 如果執行單一寫入需要 10 毫秒的時間,則每秒每執行緒可以傳送 100 個要求 (1000 毫秒除以10)。 有五個背景工作的話,會是每秒 500 次寫入。 針對每次寫入五個要求單位 (RU) 的平均成本,目標資料表最少需要佈建 2500 個要求單位,(每秒 5 個 RU x 500 次寫入)。
增加執行程式的數目可能會增加指定作業中的執行緒數目,進而增加輸送量。 不過,視工作而定,其確切的影響可能會變動,而使用背景工作數目來控制輸送量則更具決定性。 您也可以藉由分析來判斷指定要求的確切成本,以取得要求單位 (RU) 收費。 這可協助您在佈建資料表或 keyspace 的輸送量時更準確。 請參閱本文,了解如何在每個要求層級取得要求單位費用。
調整資料庫中的輸送量
Cassandra Spark 連接器會有效率地達到 Azure Cosmos DB 中的最大輸送量。 因此,即使有有效的重試,您也必須確定有足夠的輸送量 (RU) 佈建於資料表或 keyspace 層級,以避免速率限制相關的錯誤。 在指定的資料表或 keyspace 中,400 RU 的最小設定將不足夠。 即使是最小輸送量設定,Spark 連接器也可以用相對於 6000 要求單位 或更多的速率來寫入。
如果使用 Spark 來移動資料所需的 RU 設定高於穩定狀態工作負載所需的設定,您可以在 Azure Cosmos DB 中,以有系統的方式,輕鬆地相應增加和減少輸送量,以符合指定時間週期的工作負載需求。 若要了解如何以程式設計和動態方式進行調整的各種選項,請閱讀我們有關 API for Cassandra 中的彈性調整的文章。
注意
上述指導方針假設有相當一致的資料散發。 如果您的資料有明顯的扭曲 (也就是非常大量的讀取/寫入相同的資料分割索引鍵值),則即使您在資料表中佈建了大量的要求單位,仍可能會遇到瓶頸。 要求單位會在實體分割區中平均分配,而大量的資料扭曲可能會造成單一分割區的要求瓶頸。
Spark 連接器輸送量設定參數
下表列出連接器所提供的 Azure Cosmos DB for Apache Cassandra 特定輸送量設定參數。 如需所有設定參數的詳細清單,請參閱 Spark Cassandra 連接器 GitHub 存放庫的設定參考頁面。
屬性名稱 | 預設值 | 說明 |
---|---|---|
spark.cassandra.output.batch.size.rows | 1 | 每個單一批次的資料列數目。 將此參數設定為 1。 此參數用來為繁重的工作負載達到更高的輸送量。 |
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) | 無 | 每個執行程式的每個節點連線數目上限。 10*n 相當於 n 個節點的 Cassandra 叢集中每個節點有 10 個連線。 因此,如果針對五個節點的 Cassandra 叢集,您需要每個執行程式的每個節點有五個連線,則您應該將此設定設為 25。 請根據 Spark 工作設定的平行處理原則程度或執行程式數目來修改此值。 |
spark.cassandra.output.concurrent.writes | 100 | 定義每個執行程式可能發生的平行寫入數目。 因為您將 "batch.size.rows" 設定為 1,所以請務必據以擴大此值。 請根據您想要針對工作負載達到的平行處理原則程度或輸送量來修改此值。 |
spark.cassandra.concurrent.reads | 512 | 定義每個執行程式可能發生的平行讀取數目。 請根據您想要針對工作負載達到的平行處理原則程度或輸送量來修改此值 |
spark.cassandra.output.throughput_mb_per_sec | 無 | 定義每個執行程式的總寫入輸送量。 此參數可用來作為 Spark 作業輸送量的上限,並以 Azure Cosmos DB 容器的佈建輸送量為基礎。 |
spark.cassandra.input.reads_per_sec | 無 | 定義每個執行程式的總讀取輸送量。 此參數可用來作為 Spark 作業輸送量的上限,並以 Azure Cosmos DB 容器的佈建輸送量為基礎。 |
spark.cassandra.output.batch.grouping.buffer.size | 1000 | 定義每個單一 Spark 工作可在傳送至 API for Cassandra 之前儲存在記憶體中的批次數目 |
spark.cassandra.connection.keep_alive_ms | 60000 | 定義未使用的連線可供使用的期間。 |
根據您對 Spark 作業所預期的工作負載及您已為 Azure Cosmos DB 帳戶佈建的輸送量,調整這些參數的輸送量和平行處理原則的程度。
從 Spark 連線至 Azure Cosmos DB for Apache Cassandra
cqlsh
下列命令詳細說明如何從 cqlsh 連線至 Azure Cosmos DB for Apache Cassandra。 當您在 Spark 中執行範例時,這對於驗證相當有用。
從 Linux/Unix/Mac:
export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl
1.Azure Databricks
下列文章說明 Azure Databricks 叢集佈建、連線至 Azure Cosmos DB for Apache Cassandra 的叢集設定,以及涵蓋 DDL 作業、DML 作業和其他項目的數個範例筆記本。
從 Azure Databricks 使用 Azure Cosmos DB for Apache Cassandra
2.Azure HDInsight-Spark
下列文章說明用於連線至 Azure Cosmos DB for Apache Cassandra 的 HDinsight-Spark 服務、佈建、叢集設定,以及涵蓋 DDL 作業、DML 作業和其他項目的數個範例筆記本。
從 Azure HDInsight-Spark 使用 Azure Cosmos DB for Apache Cassandra
3.一般情況下的 Spark 環境
上述各節是專門針對以 Azure Spark 為基礎的 PaaS 服務,而本節則涵蓋所有一般 Spark 環境。 連接器相依項目、匯入和 Spark 工作階段設定詳述如下。 <後續步驟>一節涵蓋適用於 DDL 作業、DML 作業和其他項目的程式碼範例。
連接器相依項目:
- 新增 Maven 座標,以取得適用於 Spark 的 Cassandra 連接器
- 針對適用於 API for Cassandra 的 Azure Cosmos DB 協助程式程式庫新增 Maven 座標
匯入:
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
Spark 工作階段設定:
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
下一步
下列文章示範 Spark 如何與 Azure Cosmos DB for Apache Cassandra 整合。