Statistische bewerkingen in Azure Cosmos DB voor Apache Cassandra-tabellen uit Spark
VAN TOEPASSING OP: Cassandra
In dit artikel worden basisaggregatiebewerkingen beschreven voor Azure Cosmos DB voor Apache Cassandra-tabellen uit Spark.
Notitie
Filteren aan de serverzijde en aggregatie aan de serverzijde wordt momenteel niet ondersteund in Azure Cosmos DB voor Apache Cassandra.
API voor Cassandra-configuratie
Stel de onderstaande Spark-configuratie in uw notebookcluster in. Het is één keer activiteit.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Notitie
Als u Spark 3.x gebruikt, hoeft u de Azure Cosmos DB-helper en verbindingsfactory niet te installeren. U moet ook gebruiken remoteConnectionsPerExecutor
in plaats van connections_per_executor_max
voor de Spark 3-connector (zie hierboven).
Waarschuwing
De Spark 3-voorbeelden die in dit artikel worden weergegeven, zijn getest met Spark-versie 3.2.1 en de bijbehorende Cassandra Spark-connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Latere versies van Spark en/of de Cassandra-connector werken mogelijk niet zoals verwacht.
Voorbeeldgegevensgenerator
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Aantalbewerking
RDD-API
sc.cassandraTable("books_ks", "books").count
Uitvoer:
count: Long = 5
DataFrame-API
Tellen voor dataframes wordt momenteel niet ondersteund. In het onderstaande voorbeeld ziet u hoe u een aantal dataframes uitvoert nadat u het dataframe als tijdelijke oplossing in het geheugen hebt bewaard.
Kies een opslagoptie uit de volgende beschikbare opties om te voorkomen dat er problemen met onvoldoende geheugen optreden:
MEMORY_ONLY: dit is de standaardopslagoptie. Slaat RDD op als gedeserialiseerde Java-objecten in de JVM. Als de RDD niet in het geheugen past, worden sommige partities niet in de cache opgeslagen en worden ze telkens opnieuw gecomputeerd wanneer ze nodig zijn.
MEMORY_AND_DISK: Hiermee slaat u RDD op als gedeserialiseerde Java-objecten in de JVM. Als de RDD niet in het geheugen past, slaat u de partities op die niet op schijf passen en leest u ze wanneer dat nodig is op de locatie waar ze zijn opgeslagen.
MEMORY_ONLY_SER (Java/Scala): Slaat RDD op als geserialiseerde Java-objecten- 1-bytematrix per partitie. Deze optie is ruimte-efficiënt in vergelijking met gedeserialiseerde objecten, vooral bij het gebruik van een snelle serializer, maar meer CPU-intensief om te lezen.
MEMORY_AND_DISK_SER (Java/Scala): deze opslagoptie lijkt op MEMORY_ONLY_SER, het enige verschil is dat partities die niet in het schijfgeheugen passen, overlopen in plaats van ze opnieuw te compileren wanneer ze nodig zijn.
DISK_ONLY: slaat de RDD-partities alleen op de schijf op.
MEMORY_ONLY_2, MEMORY_AND_DISK_2...: hetzelfde als de bovenstaande niveaus, maar repliceert elke partitie op twee clusterknooppunten.
OFF_HEAP (experimenteel): vergelijkbaar met MEMORY_ONLY_SER, maar de gegevens worden opgeslagen in off-heap-geheugen en vereist dat off-heap-geheugen vooraf wordt ingeschakeld.
//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
//Explain plan
readBooksDF.explain
//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//Subsequent execution against this DF hits the cache
readBooksDF.count
//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
Gemiddelde bewerking
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
Uitvoer:
res24: Double = 16.016000175476073
DataFrame-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
Uitvoer:
+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |
SQL
select avg(book_price) from books_vw;
Uitvoer:
16.016000175476073
Minimale bewerking
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
Uitvoer:
res31: Float = 11.33
DataFrame-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
Uitvoer:
+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |
SQL
%sql
select avg(book_price) from books_vw;
Uitvoer:
11.33
Maximale bewerking
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
DataFrame-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
Uitvoer:
+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |
SQL
%sql
select max(book_price) from books_vw;
Uitvoer:
22.45
Sombewerking
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
Uitvoer:
res46: Double = 80.08000087738037
DataFrame-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
Uitvoer:
+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |
SQL
select sum(book_price) from books_vw;
Uitvoer:
80.08000087738037
Top of vergelijkbare bewerking
RDD-API
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
Uitvoer:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
DataFrame-API
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
Uitvoer:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;