Dela via


Aggregera åtgärder i Azure Cosmos DB för Apache Cassandra-tabeller från Spark

GÄLLER FÖR: Kassandra

Den här artikeln beskriver grundläggande aggregeringsåtgärder mot Azure Cosmos DB för Apache Cassandra-tabeller från Spark.

Kommentar

Filtrering på serversidan och sammansättning på serversidan stöds för närvarande inte i Azure Cosmos DB för Apache Cassandra.

API för Cassandra-konfiguration

Ställ in spark-konfigurationen nedan i notebook-klustret. Det är en engångsaktivitet.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Kommentar

Om du använder Spark 3.x behöver du inte installera Azure Cosmos DB-hjälpen och anslutningsfabriken. Du bör också använda remoteConnectionsPerExecutor i stället connections_per_executor_max för för Spark 3-anslutningsappen (se ovan).

Varning

Spark 3-exemplen som visas i den här artikeln har testats med Spark version 3.2.1 och motsvarande Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Senare versioner av Spark och/eller Cassandra-anslutningsappen kanske inte fungerar som förväntat.

Exempeldatagenerator

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Antal åtgärder

RDD-API

sc.cassandraTable("books_ks", "books").count

Utdata:

count: Long = 5

Dataframe-API

Antal mot dataramar stöds för närvarande inte. Exemplet nedan visar hur du kör ett antal dataramar efter att dataramen har sparats i minnet som en lösning.

Välj ett lagringsalternativ bland följande tillgängliga alternativ för att undvika problem med "slut på minne":

  • MEMORY_ONLY: Det är standardalternativet för lagring. Lagrar RDD som deserialiserade Java-objekt i JVM. Om RDD:t inte får plats i minnet cachelagras inte vissa partitioner och de räknas om i farten varje gång de behövs.

  • MEMORY_AND_DISK: Lagrar RDD som deserialiserade Java-objekt i JVM. Om RDD:t inte får plats i minnet lagrar du de partitioner som inte får plats på disken och läser dem när det behövs från den plats där de lagras.

  • MEMORY_ONLY_SER (Java/Scala): Lagrar RDD som serialiserade Java-objekt – 1 byte matris per partition. Det här alternativet är utrymmeseffektivt jämfört med deserialiserade objekt, särskilt när du använder en snabb serialiserare, men mer CPU-intensiv att läsa.

  • MEMORY_AND_DISK_SER (Java/Scala): Det här lagringsalternativet är som MEMORY_ONLY_SER, den enda skillnaden är att det spiller partitioner som inte får plats i diskminnet i stället för att omberäkna dem när de behövs.

  • DISK_ONLY: Lagrar endast RDD-partitionerna på disken.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2...: Samma som nivåerna ovan, men replikerar varje partition på två klusternoder.

  • OFF_HEAP (experimentell): Liknar MEMORY_ONLY_SER, men lagrar data i minnet utanför heapen, och det kräver att off-heap-minne aktiveras i förväg.

//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache 
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;

Genomsnittlig åtgärd

RDD-API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Utdata:

res24: Double = 16.016000175476073

Dataframe-API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

Utdata:

+------------------+
|   avg(book_price)|
+------------------+
|16.016000175476073|
+------------------+

SQL

select avg(book_price) from books_vw;

Utdata:

16.016000175476073

Minsta åtgärd

RDD-API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Utdata:

res31: Float = 11.33

Dataframe-API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

Utdata:

+---------------+
|min(book_price)|
+---------------+
|          11.33|
+---------------+

SQL

%sql
select avg(book_price) from books_vw;

Utdata:

11.33

Maxåtgärd

RDD-API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

Dataframe-API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Utdata:

+---------------+
|max(book_price)|
+---------------+
|          22.45|
+---------------+

SQL

%sql
select max(book_price) from books_vw;

Utdata:

22.45

Summaåtgärd

RDD-API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Utdata:

res46: Double = 80.08000087738037

Dataframe-API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Utdata:

+-----------------+
|  sum(book_price)|
+-----------------+
|80.08000087738037|
+-----------------+

SQL

select sum(book_price) from books_vw;

Utdata:

80.08000087738037

Topp- eller jämförbar åtgärd

RDD-API

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

Utdata:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

Dataframe-API

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

Utdata:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
|           book_name|book_price|
+--------------------+----------+
|      A sign of four|     22.45|
|The adventures of...|     19.83|
|The memoirs of Sh...|     14.22|
+--------------------+----------+

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Nästa steg

Information om hur du utför tabellkopieringsåtgärder finns i: