Összesítő műveletek az Azure Cosmos DB-n Apache Cassandra-táblákhoz a Sparkból

A KÖVETKEZŐKRE VONATKOZIK: Cassandra

Ez a cikk a Sparkból származó Apache Cassandra-táblákhoz készült Azure Cosmos DB alapszintű összesítési műveleteit ismerteti.

Megjegyzés

A kiszolgálóoldali szűrés és a kiszolgálóoldali összesítés jelenleg nem támogatott az Apache Cassandrához készült Azure Cosmos DB-ben.

Api a Cassandra konfigurációhoz

Állítsa be az alábbi Spark-konfigurációt a notebookfürtben. Ez egy egyszeri tevékenység.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Megjegyzés

Ha Spark 3.x-et használ, nem kell telepítenie az Azure Cosmos DB segéd- és kapcsolat-előállítót. A Spark 3-összekötő helyett connections_per_executor_max érdemes használnia remoteConnectionsPerExecutor (lásd fent).

Figyelmeztetés

A cikkben bemutatott Spark 3-mintákat a Spark 3.2.1-es verziójával és a megfelelő Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0-val teszteltük. Előfordulhat, hogy a Spark és/vagy a Cassandra-összekötő későbbi verziói nem a várt módon működnek.

Mintaadat-generátor

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Count művelet

RDD API

sc.cassandraTable("books_ks", "books").count

Kimeneti:

count: Long = 5

Dataframe API

Az adatkeretek számlálása jelenleg nem támogatott. Az alábbi minta bemutatja, hogyan hajthat végre egy adatkeret-számot, miután kerülő megoldásként megőrizte az adatkeretet a memóriában.

Válasszon tárolási lehetőséget az alábbi lehetőségek közül, hogy elkerülje a "kevés memóriával" kapcsolatos problémákat:

  • MEMORY_ONLY: Ez az alapértelmezett tárolási lehetőség. Az RDD-t deszerializált Java-objektumokként tárolja a JVM-ben. Ha az RDD nem fér el a memóriában, egyes partíciók nem lesznek gyorsítótárazva, és menet közben újrafordítanak minden alkalommal, amikor szükség van rájuk.

  • MEMORY_AND_DISK: Az RDD-t deszerializált Java-objektumokként tárolja a JVM-ben. Ha az RDD nem fér el a memóriában, tárolja azokat a partíciókat, amelyek nem férnek el a lemezen, és amikor szükséges, olvassa be őket a tárolt helyről.

  • MEMORY_ONLY_SER (Java/Scala): Az RDD-t szerializált Java-objektumokként tárolja – partíciónként 1 bájtos tömb. Ez a lehetőség térhatékony a deszerializált objektumokhoz képest, különösen gyors szerializáló használata esetén, de nagyobb processzorigényű olvasás esetén.

  • MEMORY_AND_DISK_SER (Java/Scala): Ez a tárolási lehetőség olyan, mint MEMORY_ONLY_SER, az egyetlen különbség az, hogy kibontja azokat a partíciókat, amelyek nem férnek el a lemezmemória számára ahelyett, hogy szükség esetén újrafordítanak volna őket.

  • DISK_ONLY: Az RDD-partíciókat csak a lemezen tárolja.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2...: Megegyezik a fenti szintekkel, de minden partíciót két fürtcsomóponton replikál.

  • OFF_HEAP (kísérleti): Hasonlóan a MEMORY_ONLY_SER, de a halommemória memóriájában tárolja az adatokat, és a halommemória előzetes engedélyezését igényli.

//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache 
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;

Átlagos művelet

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Kimeneti:

res24: Double = 16.016000175476073

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

Kimeneti:

+------------------+
|   avg(book_price)|
+------------------+
|16.016000175476073|
+------------------+

SQL

select avg(book_price) from books_vw;

Kimeneti:

16.016000175476073

Minimális művelet

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Kimeneti:

res31: Float = 11.33

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

Kimeneti:

+---------------+
|min(book_price)|
+---------------+
|          11.33|
+---------------+

SQL

%sql
select avg(book_price) from books_vw;

Kimeneti:

11.33

Maximális művelet

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Kimeneti:

+---------------+
|max(book_price)|
+---------------+
|          22.45|
+---------------+

SQL

%sql
select max(book_price) from books_vw;

Kimeneti:

22.45

Sum művelet

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Kimeneti:

res46: Double = 80.08000087738037

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Kimeneti:

+-----------------+
|  sum(book_price)|
+-----------------+
|80.08000087738037|
+-----------------+

SQL

select sum(book_price) from books_vw;

Kimeneti:

80.08000087738037

Top vagy hasonló művelet

RDD API

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

Kimeneti:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

Dataframe API

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

Kimeneti:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
|           book_name|book_price|
+--------------------+----------+
|      A sign of four|     22.45|
|The adventures of...|     19.83|
|The memoirs of Sh...|     14.22|
+--------------------+----------+

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Következő lépések

A táblamásolási műveletek végrehajtásához lásd: