Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
PLATÍ PRO:
Cassandra
Tento článek popisuje základní agregační operace s tabulkami Azure Cosmos DB for Apache Cassandra ze Sparku.
Poznámka:
Filtrování na straně serveru a agregace na straně serveru se v současné době ve službě Azure Cosmos DB pro Apache Cassandra nepodporuje.
Konfigurace rozhraní API pro Cassandra
Nastavte v clusteru poznámkových bloků následující konfiguraci Sparku. Je to jednorázová aktivita.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Poznámka:
Pokud používáte Spark 3.x, nemusíte instalovat pomocné rutiny a objekt pro vytváření připojení služby Azure Cosmos DB. Místo konektoru Spark 3 byste také měli použít remoteConnectionsPerExecutorconnections_per_executor_max (viz výše).
Upozorňující
Ukázky Sparku 3 uvedené v tomto článku byly testovány se Sparkem verze 3.2.1 a odpovídajícím konektorem Cassandra Spark com.datastax.spark :spark-cassandra-connector-assembly_2.12:3.2.0. Novější verze Sparku nebo konektoru Cassandra nemusí fungovat podle očekávání.
Ukázkový generátor dat
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Operace count
RDD API
sc.cassandraTable("books_ks", "books").count
Výstup:
count: Long = 5
Rozhraní API datového rámce
Počet do datových rámců se v současné době nepodporuje. Následující ukázka ukazuje, jak po zachování datového rámce do paměti spustit počet datových rámců jako alternativní řešení.
Zvolte možnost úložiště z následujících dostupných možností, abyste se vyhnuli problémům s nedostatkem paměti:
MEMORY_ONLY: Jedná se o výchozí možnost úložiště. Ukládá RDD jako deserializované objekty Java v prostředí JVM. Pokud se sada RDD nevejde do paměti, některé oddíly se nebudou ukládat do mezipaměti a budou se průběžně přepočítat při každém jejich potřebě.
MEMORY_AND_DISK: Ukládá RDD jako deserializované objekty Java v prostředí JVM. Pokud se sada RDD nevejde do paměti, uložte oddíly, které se nevejdou na disk, a kdykoli je to potřeba, přečtěte si je z umístění, které jsou uložené.
MEMORY_ONLY_SER (Java/Scala): Ukládá RDD jako serializované objekty Java – pole 1 bajtů na oddíl. Tato možnost je prostorově efektivní ve srovnání s deserializovanými objekty, zejména při použití rychlého serializátoru, ale náročnější na čtení procesoru.
MEMORY_AND_DISK_SER (Java/Scala): Tato možnost úložiště je jako MEMORY_ONLY_SER, jediný rozdíl spočívá v tom, že rozdělí oddíly, které se nevejdou do paměti disku, místo aby je překompiluje, když jsou potřeba.
DISK_ONLY: Ukládá oddíly RDD pouze na disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2...: Stejné jako výše uvedené úrovně, ale replikuje každý oddíl na dva uzly clusteru.
OFF_HEAP (experimentální): Podobá se MEMORY_ONLY_SER, ale ukládá data do paměti mimo haldu a vyžaduje, aby byla předem povolená paměť mimo haldu.
//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
//Explain plan
readBooksDF.explain
//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//Subsequent execution against this DF hits the cache
readBooksDF.count
//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
Průměrná operace
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
Výstup:
res24: Double = 16.016000175476073
Rozhraní API datového rámce
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
Výstup:
+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |
SQL
select avg(book_price) from books_vw;
Výstup:
16.016000175476073
Minimální operace
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
Výstup:
res31: Float = 11.33
Rozhraní API datového rámce
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
Výstup:
+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |
SQL
%sql
select avg(book_price) from books_vw;
Výstup:
11.33
Max. operace
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
Rozhraní API datového rámce
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
Výstup:
+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |
SQL
%sql
select max(book_price) from books_vw;
Výstup:
22.45
Operace součet
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
Výstup:
res46: Double = 80.08000087738037
Rozhraní API datového rámce
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
Výstup:
+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |
SQL
select sum(book_price) from books_vw;
Výstup:
80.08000087738037
Nejlepší nebo srovnatelná operace
RDD API
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
Výstup:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
Rozhraní API datového rámce
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
Výstup:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;