Aracılığıyla paylaş


Spark'tan Apache Cassandra için Azure Cosmos DB tablolarında toplama işlemleri

ŞUNLAR IÇIN GEÇERLIDIR: Cassandra

Bu makalede, Spark'tan Apache Cassandra tabloları için Azure Cosmos DB'ye yönelik temel toplama işlemleri açıklanmaktadır.

Not

Sunucu tarafı filtreleme ve sunucu tarafı toplama şu anda Apache Cassandra için Azure Cosmos DB'de desteklenmemektedir.

Cassandra yapılandırması için API

Not defteri kümenizde spark yapılandırmasını aşağıda ayarlayın. Tek seferlik bir etkinlik.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Not

Spark 3.x kullanıyorsanız Azure Cosmos DB yardımcısını ve bağlantı fabrikasını yüklemeniz gerekmez. Spark 3 bağlayıcısı connections_per_executor_max için yerine de kullanmanız remoteConnectionsPerExecutor gerekir (yukarıya bakın).

Uyarı

Bu makalede gösterilen Spark 3 örnekleri Spark sürüm 3.2.1 ve buna karşılık gelen Cassandra Spark Bağlayıcısı com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 ile test edilmiştir. Spark ve/veya Cassandra bağlayıcısının sonraki sürümleri beklendiği gibi çalışmayabilir.

Örnek veri oluşturucu

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Count işlemi

RDD API'si

sc.cassandraTable("books_ks", "books").count

Çıktı:

count: Long = 5

Dataframe API'si

Veri çerçevelerine karşı sayı şu anda desteklenmiyor. Aşağıdaki örnekte, geçici bir çözüm olarak veri çerçevesini belleğe kalıcı hale getirmenin ardından veri çerçevesi sayısının nasıl yürütüldiği gösterilmektedir.

"Bellek yetersiz" sorunlarıyla karşılaşmamak için aşağıdaki kullanılabilir seçeneklerden bir depolama seçeneği belirleyin:

  • MEMORY_ONLY: Varsayılan depolama seçeneğidir. RDD'yi JVM'de seri durumdan çıkarılmış Java nesneleri olarak depolar. RDD belleğe sığmazsa, bazı bölümler önbelleğe alınmaz ve her gerektiğinde anında yeniden derler.

  • MEMORY_AND_DISK: RDD'yi JVM'de seri durumdan çıkarılmış Java nesneleri olarak depolar. RDD belleğe sığmıyorsa diske sığmayan bölümleri depolayın ve gerektiğinde depolandıkları konumdan okuyun.

  • MEMORY_ONLY_SER (Java/Scala): RDD'yi bölüm başına 1 baytlık serileştirilmiş Java nesneleri olarak depolar. Bu seçenek, özellikle hızlı seri hale getirici kullanırken seri durumdan çıkarılmış nesnelerle karşılaştırıldığında alan açısından verimlidir, ancak okumak için daha yoğun CPU kullanır.

  • MEMORY_AND_DISK_SER (Java/Scala): Bu depolama seçeneği MEMORY_ONLY_SER gibidir, tek fark, gerektiğinde yeniden derlemek yerine disk belleğine sığmayan bölümlerin dökülmesidir.

  • DISK_ONLY: RDD bölümlerini yalnızca diskte depolar.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2...: Yukarıdaki düzeylerle aynıdır, ancak her bölümü iki küme düğümünde çoğaltır.

  • OFF_HEAP (deneysel): MEMORY_ONLY_SER benzer, ancak verileri yığın dışı bellekte depolar ve önceden yığın dışı belleğin etkinleştirilmesini gerektirir.

//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache 
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;

Ortalama işlem

RDD API'si

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Çıktı:

res24: Double = 16.016000175476073

Dataframe API'si

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

Çıktı:

+------------------+
|   avg(book_price)|
+------------------+
|16.016000175476073|
+------------------+

SQL

select avg(book_price) from books_vw;

Çıktı:

16.016000175476073

En düşük işlem

RDD API'si

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Çıktı:

res31: Float = 11.33

Dataframe API'si

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

Çıktı:

+---------------+
|min(book_price)|
+---------------+
|          11.33|
+---------------+

SQL

%sql
select avg(book_price) from books_vw;

Çıktı:

11.33

En fazla işlem

RDD API'si

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

Dataframe API'si

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Çıktı:

+---------------+
|max(book_price)|
+---------------+
|          22.45|
+---------------+

SQL

%sql
select max(book_price) from books_vw;

Çıktı:

22.45

Toplam işlemi

RDD API'si

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Çıktı:

res46: Double = 80.08000087738037

Dataframe API'si

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Çıktı:

+-----------------+
|  sum(book_price)|
+-----------------+
|80.08000087738037|
+-----------------+

SQL

select sum(book_price) from books_vw;

Çıktı:

80.08000087738037

Üst veya karşılaştırılabilir işlem

RDD API'si

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

Çıktı:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

Dataframe API'si

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

Çıktı:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
|           book_name|book_price|
+--------------------+----------+
|      A sign of four|     22.45|
|The adventures of...|     19.83|
|The memoirs of Sh...|     14.22|
+--------------------+----------+

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Sonraki adımlar

Tablo kopyalama işlemlerini gerçekleştirmek için bkz: