Delen via


Statistische bewerkingen in Azure Cosmos DB voor Apache Cassandra-tabellen uit Spark

VAN TOEPASSING OP: Cassandra

In dit artikel worden basisaggregatiebewerkingen beschreven voor Azure Cosmos DB voor Apache Cassandra-tabellen uit Spark.

Notitie

Filteren aan de serverzijde en aggregatie aan de serverzijde wordt momenteel niet ondersteund in Azure Cosmos DB voor Apache Cassandra.

API voor Cassandra-configuratie

Stel de onderstaande Spark-configuratie in uw notebookcluster in. Het is één keer activiteit.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
 spark.cassandra.connection.port  10350
 spark.cassandra.connection.ssl.enabled  true
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000
 spark.cassandra.concurrent.reads  512
 spark.cassandra.output.batch.grouping.buffer.size  1000
 spark.cassandra.connection.keep_alive_ms  600000000

Notitie

Als u Spark 3.x gebruikt, hoeft u de Azure Cosmos DB-helper en verbindingsfactory niet te installeren. U moet ook gebruiken remoteConnectionsPerExecutor in plaats van connections_per_executor_max voor de Spark 3-connector (zie hierboven).

Waarschuwing

De Spark 3-voorbeelden die in dit artikel worden weergegeven, zijn getest met Spark-versie 3.2.1 en de bijbehorende Cassandra Spark-connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Latere versies van Spark en/of de Cassandra-connector werken mogelijk niet zoals verwacht.

Voorbeeldgegevensgenerator

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Aantalbewerking

RDD-API

sc.cassandraTable("books_ks", "books").count

Uitvoer:

count: Long = 5

DataFrame-API

Tellen voor dataframes wordt momenteel niet ondersteund. In het onderstaande voorbeeld ziet u hoe u een aantal dataframes uitvoert nadat u het dataframe als tijdelijke oplossing in het geheugen hebt bewaard.

Kies een opslagoptie uit de volgende beschikbare opties om te voorkomen dat er problemen met onvoldoende geheugen optreden:

  • MEMORY_ONLY: dit is de standaardopslagoptie. Slaat RDD op als gedeserialiseerde Java-objecten in de JVM. Als de RDD niet in het geheugen past, worden sommige partities niet in de cache opgeslagen en worden ze telkens opnieuw gecomputeerd wanneer ze nodig zijn.

  • MEMORY_AND_DISK: Hiermee slaat u RDD op als gedeserialiseerde Java-objecten in de JVM. Als de RDD niet in het geheugen past, slaat u de partities op die niet op schijf passen en leest u ze wanneer dat nodig is op de locatie waar ze zijn opgeslagen.

  • MEMORY_ONLY_SER (Java/Scala): Slaat RDD op als geserialiseerde Java-objecten- 1-bytematrix per partitie. Deze optie is ruimte-efficiënt in vergelijking met gedeserialiseerde objecten, vooral bij het gebruik van een snelle serializer, maar meer CPU-intensief om te lezen.

  • MEMORY_AND_DISK_SER (Java/Scala): deze opslagoptie lijkt op MEMORY_ONLY_SER, het enige verschil is dat partities die niet in het schijfgeheugen passen, overlopen in plaats van ze opnieuw te compileren wanneer ze nodig zijn.

  • DISK_ONLY: slaat de RDD-partities alleen op de schijf op.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2...: hetzelfde als de bovenstaande niveaus, maar repliceert elke partitie op twee clusterknooppunten.

  • OFF_HEAP (experimenteel): vergelijkbaar met MEMORY_ONLY_SER, maar de gegevens worden opgeslagen in off-heap-geheugen en vereist dat off-heap-geheugen vooraf wordt ingeschakeld.

//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;

Gemiddelde bewerking

RDD-API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Uitvoer:

res24: Double = 16.016000175476073

DataFrame-API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

Uitvoer:

+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |

SQL

select avg(book_price) from books_vw;

Uitvoer:

16.016000175476073

Minimale bewerking

RDD-API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Uitvoer:

res31: Float = 11.33

DataFrame-API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

Uitvoer:

+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |

SQL

%sql
select avg(book_price) from books_vw;

Uitvoer:

11.33

Maximale bewerking

RDD-API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

DataFrame-API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Uitvoer:

+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |

SQL

%sql
select max(book_price) from books_vw;

Uitvoer:

22.45

Sombewerking

RDD-API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Uitvoer:

res46: Double = 80.08000087738037

DataFrame-API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Uitvoer:

+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |

SQL

select sum(book_price) from books_vw;

Uitvoer:

80.08000087738037

Top of vergelijkbare bewerking

RDD-API

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

Uitvoer:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

DataFrame-API

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

Uitvoer:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Volgende stap