分享方式:


使用 Spark 從 Azure Cosmos DB for Apache Cassandra 資料表中讀取資料

適用於: Cassandra

此文章描述如何從 Spark 讀取 Azure Cosmos DB for Apache Cassandra 中儲存的資料。

API for Cassandra 設定

在您的 Notebook 叢集中設定下列 Spark 設定。 這是一次性的活動。

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

注意

如果您使用 Spark 3.x,則無須安裝 Azure Cosmos DB 協助程式和連線中心。 您也應使用 remoteConnectionsPerExecutor,而不是 connections_per_executor_max Spark 3 連接器 (如上述)。

警告

本文所示的 Spark 3 範例已使用 Spark 3.2.1 版和對應的 Cassandra Spark 連接器 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 進行測試。 較新版本的 Spark 和/或 Cassandra 連接器可能無法如預期般運作。

Dataframe API

使用 session.read.format 命令讀取資料表

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

val readBooksDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load

readBooksDF.explain
readBooksDF.show

使用 spark.read.cassandraFormat 讀取資料表

val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()

讀取資料表中的特定資料行

val readBooksDF = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_author", "book_pub_year")

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

套用篩選

您可以將述詞下推至資料庫,以允許更好的最佳化 Spark 查詢。 述詞是傳回 true 或 false 的查詢條件,通常位於 WHERE 子句中。 述詞會將資料庫查詢中的資料下推至篩選條件,這會減少從資料庫取出的項目數目並改善查詢效能。 依預設,Spark 資料集 API 會自動將有效的 WHERE 子句下推至資料庫。

val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

實體方案的 Cassandra Filters 區段包含下推的篩選條件。

分割區

RDD API

讀取資料表

val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)

讀取資料表中的特定資料行

val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)

SQL 檢視

從資料框架建立暫存的檢視

spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load.createOrReplaceTempView("books_vw")

針對檢視執行查詢

select * from books_vw where book_pub_year > 1891

下一步

下列是從 Spark 使用 Azure Cosmos DB for Apache Cassandra 的其他相關文章: