Spark'tan Apache Cassandra için Azure Cosmos DB tablolarında toplama işlemleri
ŞUNLAR IÇIN GEÇERLIDIR: Cassandra
Bu makalede, Spark'tan Apache Cassandra tabloları için Azure Cosmos DB'ye yönelik temel toplama işlemleri açıklanmaktadır.
Not
Sunucu tarafı filtreleme ve sunucu tarafı toplama şu anda Apache Cassandra için Azure Cosmos DB'de desteklenmemektedir.
Cassandra yapılandırması için API
Not defteri kümenizde spark yapılandırmasını aşağıda ayarlayın. Tek seferlik bir etkinlik.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Not
Spark 3.x kullanıyorsanız Azure Cosmos DB yardımcısını ve bağlantı fabrikasını yüklemeniz gerekmez. Spark 3 bağlayıcısı connections_per_executor_max
için yerine de kullanmanız remoteConnectionsPerExecutor
gerekir (yukarıya bakın).
Uyarı
Bu makalede gösterilen Spark 3 örnekleri Spark sürüm 3.2.1 ve buna karşılık gelen Cassandra Spark Bağlayıcısı com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 ile test edilmiştir. Spark ve/veya Cassandra bağlayıcısının sonraki sürümleri beklendiği gibi çalışmayabilir.
Örnek veri oluşturucu
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Count işlemi
RDD API'si
sc.cassandraTable("books_ks", "books").count
Çıktı:
count: Long = 5
Dataframe API'si
Veri çerçevelerine karşı sayı şu anda desteklenmiyor. Aşağıdaki örnekte, geçici bir çözüm olarak veri çerçevesini belleğe kalıcı hale getirmenin ardından veri çerçevesi sayısının nasıl yürütüldiği gösterilmektedir.
"Bellek yetersiz" sorunlarıyla karşılaşmamak için aşağıdaki kullanılabilir seçeneklerden bir depolama seçeneği belirleyin:
MEMORY_ONLY: Varsayılan depolama seçeneğidir. RDD'yi JVM'de seri durumdan çıkarılmış Java nesneleri olarak depolar. RDD belleğe sığmazsa, bazı bölümler önbelleğe alınmaz ve her gerektiğinde anında yeniden derler.
MEMORY_AND_DISK: RDD'yi JVM'de seri durumdan çıkarılmış Java nesneleri olarak depolar. RDD belleğe sığmıyorsa diske sığmayan bölümleri depolayın ve gerektiğinde depolandıkları konumdan okuyun.
MEMORY_ONLY_SER (Java/Scala): RDD'yi bölüm başına 1 baytlık serileştirilmiş Java nesneleri olarak depolar. Bu seçenek, özellikle hızlı seri hale getirici kullanırken seri durumdan çıkarılmış nesnelerle karşılaştırıldığında alan açısından verimlidir, ancak okumak için daha yoğun CPU kullanır.
MEMORY_AND_DISK_SER (Java/Scala): Bu depolama seçeneği MEMORY_ONLY_SER gibidir, tek fark, gerektiğinde yeniden derlemek yerine disk belleğine sığmayan bölümlerin dökülmesidir.
DISK_ONLY: RDD bölümlerini yalnızca diskte depolar.
MEMORY_ONLY_2, MEMORY_AND_DISK_2...: Yukarıdaki düzeylerle aynıdır, ancak her bölümü iki küme düğümünde çoğaltır.
OFF_HEAP (deneysel): MEMORY_ONLY_SER benzer, ancak verileri yığın dışı bellekte depolar ve önceden yığın dışı belleğin etkinleştirilmesini gerektirir.
//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
//Explain plan
readBooksDF.explain
//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//Subsequent execution against this DF hits the cache
readBooksDF.count
//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
Ortalama işlem
RDD API'si
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
Çıktı:
res24: Double = 16.016000175476073
Dataframe API'si
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
Çıktı:
+------------------+
| avg(book_price)|
+------------------+
|16.016000175476073|
+------------------+
SQL
select avg(book_price) from books_vw;
Çıktı:
16.016000175476073
En düşük işlem
RDD API'si
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
Çıktı:
res31: Float = 11.33
Dataframe API'si
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
Çıktı:
+---------------+
|min(book_price)|
+---------------+
| 11.33|
+---------------+
SQL
%sql
select avg(book_price) from books_vw;
Çıktı:
11.33
En fazla işlem
RDD API'si
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
Dataframe API'si
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
Çıktı:
+---------------+
|max(book_price)|
+---------------+
| 22.45|
+---------------+
SQL
%sql
select max(book_price) from books_vw;
Çıktı:
22.45
Toplam işlemi
RDD API'si
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
Çıktı:
res46: Double = 80.08000087738037
Dataframe API'si
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
Çıktı:
+-----------------+
| sum(book_price)|
+-----------------+
|80.08000087738037|
+-----------------+
SQL
select sum(book_price) from books_vw;
Çıktı:
80.08000087738037
Üst veya karşılaştırılabilir işlem
RDD API'si
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
Çıktı:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
Dataframe API'si
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
Çıktı:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name|book_price|
+--------------------+----------+
| A sign of four| 22.45|
|The adventures of...| 19.83|
|The memoirs of Sh...| 14.22|
+--------------------+----------+
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;
Sonraki adımlar
Tablo kopyalama işlemlerini gerçekleştirmek için bkz:
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin