Share via


Statistische bewerkingen in Azure Cosmos DB voor Apache Cassandra-tabellen van Spark

VAN TOEPASSING OP: Cassandra

In dit artikel worden eenvoudige aggregatiebewerkingen voor Azure Cosmos DB voor Apache Cassandra-tabellen van Spark beschreven.

Notitie

Filteren aan de serverzijde en aggregatie aan de serverzijde wordt momenteel niet ondersteund in Azure Cosmos DB voor Apache Cassandra.

API voor Cassandra-configuratie

Stel de onderstaande Spark-configuratie in uw notebookcluster in. Het is een eenmalige activiteit.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Notitie

Als u Spark 3.x gebruikt, hoeft u de Helper en verbindingsfactory van Azure Cosmos DB niet te installeren. U moet ook gebruiken remoteConnectionsPerExecutor in plaats van connections_per_executor_max voor de Spark 3-connector (zie hierboven).

Waarschuwing

De Spark 3-voorbeelden die in dit artikel worden weergegeven, zijn getest met Spark-versie 3.2.1 en de bijbehorende Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Latere versies van Spark en/of de Cassandra-connector werken mogelijk niet zoals verwacht.

Voorbeeldgegevensgenerator

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Aantalbewerking

RDD-API

sc.cassandraTable("books_ks", "books").count

Output:

count: Long = 5

DataFrame-API

Telling voor dataframes wordt momenteel niet ondersteund. In het onderstaande voorbeeld ziet u hoe u een aantal dataframes uitvoert nadat u het dataframe als tijdelijke oplossing in het geheugen hebt bewaard.

Kies een opslagoptie uit de volgende beschikbare opties om problemen met onvoldoende geheugen te voorkomen:

  • MEMORY_ONLY: dit is de standaardopslagoptie. Slaat RDD op als gedeserialiseerde Java-objecten in de JVM. Als de RDD niet in het geheugen past, worden sommige partities niet in de cache opgeslagen en worden ze telkens opnieuw berekend wanneer ze nodig zijn.

  • MEMORY_AND_DISK: RDD wordt opgeslagen als gedeserialiseerde Java-objecten in de JVM. Als de RDD niet in het geheugen past, slaat u de partities op die niet op de schijf passen en leest u deze indien nodig vanaf de locatie waar ze zijn opgeslagen.

  • MEMORY_ONLY_SER (Java/Scala): SLAAT RDD op als geserialiseerde Java-objecten- matrix van 1 bytes per partitie. Deze optie is ruimte-efficiĆ«nt in vergelijking met gedeserialiseerde objecten, met name bij gebruik van een snelle serializer, maar meer CPU-intensief om te lezen.

  • MEMORY_AND_DISK_SER (Java/Scala): deze opslagoptie lijkt op MEMORY_ONLY_SER. Het enige verschil is dat partities die niet in het schijfgeheugen passen, overlopen in plaats van ze opnieuw te berekenen wanneer ze nodig zijn.

  • DISK_ONLY: slaat de RDD-partities alleen op de schijf op.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2...: Hetzelfde als de bovenstaande niveaus, maar repliceert elke partitie op twee clusterknooppunten.

  • OFF_HEAP (experimenteel): vergelijkbaar met MEMORY_ONLY_SER, maar de gegevens worden opgeslagen in off-heap-geheugen en het vereist dat vooraf off-heap-geheugen is ingeschakeld.

//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache 
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;

Gemiddelde bewerking

RDD-API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Output:

res24: Double = 16.016000175476073

DataFrame-API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

Output:

+------------------+
|   avg(book_price)|
+------------------+
|16.016000175476073|
+------------------+

SQL

select avg(book_price) from books_vw;

Output:

16.016000175476073

Minimale bewerking

RDD-API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Output:

res31: Float = 11.33

DataFrame-API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

Output:

+---------------+
|min(book_price)|
+---------------+
|          11.33|
+---------------+

SQL

%sql
select avg(book_price) from books_vw;

Output:

11.33

Maximale bewerking

RDD-API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

DataFrame-API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Output:

+---------------+
|max(book_price)|
+---------------+
|          22.45|
+---------------+

SQL

%sql
select max(book_price) from books_vw;

Output:

22.45

Sombewerking

RDD-API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Output:

res46: Double = 80.08000087738037

DataFrame-API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Output:

+-----------------+
|  sum(book_price)|
+-----------------+
|80.08000087738037|
+-----------------+

SQL

select sum(book_price) from books_vw;

Output:

80.08000087738037

Bovenste of vergelijkbare bewerking

RDD-API

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

Output:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

DataFrame-API

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

Output:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
|           book_name|book_price|
+--------------------+----------+
|      A sign of four|     22.45|
|The adventures of...|     19.83|
|The memoirs of Sh...|     14.22|
+--------------------+----------+

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Volgende stappen

Als u tabelkopiebewerkingen wilt uitvoeren, raadpleegt u: