Aggregate operations on Azure Cosmos DB for Apache Cassandra tables from Spark

APPLIES TO: Cassandra

This article describes basic aggregation operations against Azure Cosmos DB for Apache Cassandra tables from Spark.

Note

Server-side filtering and server-side aggregation are currently not supported in Azure Cosmos DB for Apache Cassandra.

API for Cassandra configuration

Set the following Spark configuration in your notebook cluster. This is a one-time activity.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Note

If you are using Spark 3.x, you do not need to install the Azure Cosmos DB helper and connection factory. You should also use remoteConnectionsPerExecutor instead of connections_per_executor_max for the Spark 3 connector (see above).

Warning

The Spark 3 samples shown in this article have been tested with Spark version 3.2.1 and the corresponding Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Later versions of Spark and/or the Cassandra connector may not function as expected.
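If you prefer to keep the connection settings in code rather than in the cluster configuration UI, the same values can be supplied when the SparkSession is built. The snippet below is a minimal sketch, assuming Spark 3.x and the YOUR_ACCOUNT_NAME and YOUR_ACCOUNT_KEY placeholder values from the configuration above.

import org.apache.spark.sql.SparkSession

//Minimal sketch: the same connection-related settings applied programmatically
val spark = SparkSession.builder()
  .appName("cosmos-cassandra-aggregations")
  .config("spark.cassandra.connection.host", "YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
  .config("spark.cassandra.connection.port", "10350")
  .config("spark.cassandra.connection.ssl.enabled", "true")
  .config("spark.cassandra.auth.username", "YOUR_ACCOUNT_NAME")
  .config("spark.cassandra.auth.password", "YOUR_ACCOUNT_KEY")
  .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "10")
  .getOrCreate()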

Sample data generator

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()
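The write above assumes the books_ks keyspace and books table already exist. If they do not, they can be created first through the connector's CassandraConnector session. The following is only a sketch; the replication setting shown is illustrative.

import com.datastax.spark.connector.cql.CassandraConnector

//Sketch: create the keyspace and table assumed by the sample write
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => {
  session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT, book_author TEXT, book_name TEXT, book_pub_year INT, book_price FLOAT, PRIMARY KEY(book_id, book_pub_year))")
})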

Count operation

RDD API

sc.cassandraTable("books_ks", "books").count

Output:

count: Long = 5

Dataframe API

Count on dataframes is not currently supported. The sample below shows how to execute a dataframe count after persisting the dataframe to memory as a workaround.

To avoid running into out-of-memory issues, choose one of the following available storage options (a sketch of switching to an alternative level follows the workaround code below):

  • MEMORY_ONLY: This is the default storage option. Stores the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions are not cached and are recomputed on the fly each time they are needed.

  • MEMORY_AND_DISK: Stores the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, the partitions that do not fit are stored on disk and read from there whenever they are needed.

  • MEMORY_ONLY_SER (Java/Scala): Stores the RDD as serialized Java objects (one byte array per partition). Compared with deserialized objects, this option is more space-efficient, especially when using a fast serializer, but more CPU-intensive to read.

  • MEMORY_AND_DISK_SER (Java/Scala): This storage option is like MEMORY_ONLY_SER; the only difference is that partitions that do not fit in memory spill to disk instead of being recomputed when they are needed.

  • DISK_ONLY: Stores the RDD partitions on disk only.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, and so on: Same as the levels above, but each partition is replicated on two cluster nodes.

  • OFF_HEAP (experimental): Similar to MEMORY_ONLY_SER, but the data is stored in off-heap memory, which requires off-heap memory to be enabled ahead of time.

//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache 
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
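If the dataset is too large to cache comfortably with the default MEMORY_ONLY level, it can be re-cached with one of the alternative storage levels listed earlier. A minimal sketch, assuming a serialized, disk-backed cache is preferred:

//Sketch: switch the cache to a serialized, disk-backed storage level
readBooksDF.unpersist()
readBooksDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
readBooksDF.count   //re-materializes the cache with the new level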

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
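The %sql cells above rely on notebook cell magic. If you prefer to stay in Scala, the same queries can be issued against the temporary view through spark.sql; for example:

//Equivalent queries issued from Scala against the temporary view
spark.sql("select count(*) from books_vw").show
spark.sql("select book_author, count(*) as count from books_vw group by book_author").show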

Average operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Output:

res24: Double = 16.016000175476073

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

Output:

+------------------+
|   avg(book_price)|
+------------------+
|16.016000175476073|
+------------------+

SQL

select avg(book_price) from books_vw;

Output:

16.016000175476073

Minimum operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Output:

res31: Float = 11.33

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

Output:

+---------------+
|min(book_price)|
+---------------+
|          11.33|
+---------------+

SQL

%sql
select min(book_price) from books_vw;

Output:

11.33

Maximum operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Output:

+---------------+
|max(book_price)|
+---------------+
|          22.45|
+---------------+

SQL

%sql
select max(book_price) from books_vw;

Output:

22.45

Sum operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Output:

res46: Double = 80.08000087738037

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Output:

+-----------------+
|  sum(book_price)|
+-----------------+
|80.08000087738037|
+-----------------+

SQL

select sum(book_price) from books_vw;

Output:

80.08000087738037

Top or comparison operation

RDD API

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

Output:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
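As an alternative to the zipWithIndex pattern above (not part of the original sample), takeOrdered also returns only the requested rows to the driver. This sketch assumes a descending ordering on book_price:

//Sketch: top-n via takeOrdered with a descending ordering on book_price
import com.datastax.spark.connector.CassandraRow

val top3 = sc.cassandraTable("books_ks", "books")
  .select("book_name", "book_price")
  .takeOrdered(3)(Ordering.by[CassandraRow, Float](row => -row.getFloat("book_price")))
top3.foreach(println)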

Dataframe API

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

Output:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
|           book_name|book_price|
+--------------------+----------+
|      A sign of four|     22.45|
|The adventures of...|     19.83|
|The memoirs of Sh...|     14.22|
+--------------------+----------+

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Next steps

To perform table copy operations, see: