Statistische bewerkingen in Azure Cosmos DB voor Apache Cassandra-tabellen van Spark
VAN TOEPASSING OP: Cassandra
In dit artikel worden eenvoudige aggregatiebewerkingen voor Azure Cosmos DB voor Apache Cassandra-tabellen van Spark beschreven.
Notitie
Filteren aan de serverzijde en aggregatie aan de serverzijde wordt momenteel niet ondersteund in Azure Cosmos DB voor Apache Cassandra.
API voor Cassandra-configuratie
Stel de onderstaande Spark-configuratie in uw notebookcluster in. Het is een eenmalige activiteit.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Notitie
Als u Spark 3.x gebruikt, hoeft u de Helper en verbindingsfactory van Azure Cosmos DB niet te installeren. U moet ook gebruiken remoteConnectionsPerExecutor
in plaats van connections_per_executor_max
voor de Spark 3-connector (zie hierboven).
Waarschuwing
De Spark 3-voorbeelden die in dit artikel worden weergegeven, zijn getest met Spark-versie 3.2.1 en de bijbehorende Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Latere versies van Spark en/of de Cassandra-connector werken mogelijk niet zoals verwacht.
Voorbeeldgegevensgenerator
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Aantalbewerking
RDD-API
sc.cassandraTable("books_ks", "books").count
Output:
count: Long = 5
DataFrame-API
Telling voor dataframes wordt momenteel niet ondersteund. In het onderstaande voorbeeld ziet u hoe u een aantal dataframes uitvoert nadat u het dataframe als tijdelijke oplossing in het geheugen hebt bewaard.
Kies een opslagoptie uit de volgende beschikbare opties om problemen met onvoldoende geheugen te voorkomen:
MEMORY_ONLY: dit is de standaardopslagoptie. Slaat RDD op als gedeserialiseerde Java-objecten in de JVM. Als de RDD niet in het geheugen past, worden sommige partities niet in de cache opgeslagen en worden ze telkens opnieuw berekend wanneer ze nodig zijn.
MEMORY_AND_DISK: RDD wordt opgeslagen als gedeserialiseerde Java-objecten in de JVM. Als de RDD niet in het geheugen past, slaat u de partities op die niet op de schijf passen en leest u deze indien nodig vanaf de locatie waar ze zijn opgeslagen.
MEMORY_ONLY_SER (Java/Scala): SLAAT RDD op als geserialiseerde Java-objecten- matrix van 1 bytes per partitie. Deze optie is ruimte-efficiƫnt in vergelijking met gedeserialiseerde objecten, met name bij gebruik van een snelle serializer, maar meer CPU-intensief om te lezen.
MEMORY_AND_DISK_SER (Java/Scala): deze opslagoptie lijkt op MEMORY_ONLY_SER. Het enige verschil is dat partities die niet in het schijfgeheugen passen, overlopen in plaats van ze opnieuw te berekenen wanneer ze nodig zijn.
DISK_ONLY: slaat de RDD-partities alleen op de schijf op.
MEMORY_ONLY_2, MEMORY_AND_DISK_2...: Hetzelfde als de bovenstaande niveaus, maar repliceert elke partitie op twee clusterknooppunten.
OFF_HEAP (experimenteel): vergelijkbaar met MEMORY_ONLY_SER, maar de gegevens worden opgeslagen in off-heap-geheugen en het vereist dat vooraf off-heap-geheugen is ingeschakeld.
//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
//Explain plan
readBooksDF.explain
//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//Subsequent execution against this DF hits the cache
readBooksDF.count
//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
Gemiddelde bewerking
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
Output:
res24: Double = 16.016000175476073
DataFrame-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
Output:
+------------------+
| avg(book_price)|
+------------------+
|16.016000175476073|
+------------------+
SQL
select avg(book_price) from books_vw;
Output:
16.016000175476073
Minimale bewerking
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
Output:
res31: Float = 11.33
DataFrame-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
Output:
+---------------+
|min(book_price)|
+---------------+
| 11.33|
+---------------+
SQL
%sql
select avg(book_price) from books_vw;
Output:
11.33
Maximale bewerking
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
DataFrame-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
Output:
+---------------+
|max(book_price)|
+---------------+
| 22.45|
+---------------+
SQL
%sql
select max(book_price) from books_vw;
Output:
22.45
Sombewerking
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
Output:
res46: Double = 80.08000087738037
DataFrame-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
Output:
+-----------------+
| sum(book_price)|
+-----------------+
|80.08000087738037|
+-----------------+
SQL
select sum(book_price) from books_vw;
Output:
80.08000087738037
Bovenste of vergelijkbare bewerking
RDD-API
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
Output:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
DataFrame-API
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
Output:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name|book_price|
+--------------------+----------+
| A sign of four| 22.45|
|The adventures of...| 19.83|
|The memoirs of Sh...| 14.22|
+--------------------+----------+
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;
Volgende stappen
Als u tabelkopiebewerkingen wilt uitvoeren, raadpleegt u: