Spark'tan Apache Cassandra için Azure Cosmos DB'de tablo kopyalama işlemleri
ŞUNLAR IÇIN GEÇERLIDIR: Cassandra
Bu makalede, Spark'tan Apache Cassandra için Azure Cosmos DB'deki tablolar arasında veri kopyalama açıklanmaktadır. Bu makalede açıklanan komutlar, Apache Cassandra tablolarındaki verileri Apache Cassandra için Azure Cosmos DB tablolarına kopyalamak için de kullanılabilir.
Cassandra yapılandırması için API
Not defteri kümenizde spark yapılandırmasını aşağıda ayarlayın. Tek seferlik bir etkinlik.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Not
Spark 3.x kullanıyorsanız Azure Cosmos DB yardımcısını ve bağlantı fabrikasını yüklemeniz gerekmez. Spark 3 bağlayıcısı connections_per_executor_max
için yerine de kullanmanız remoteConnectionsPerExecutor
gerekir (yukarıya bakın).
Uyarı
Bu makalede gösterilen Spark 3 örnekleri Spark sürüm 3.2.1 ve buna karşılık gelen Cassandra Spark Bağlayıcısı com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 ile test edilmiştir. Spark ve/veya Cassandra bağlayıcısının sonraki sürümleri beklendiği gibi çalışmayabilir.
Örnek veri ekleme
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Tablolar arasında veri kopyalama
Tablolar arasında veri kopyalama (hedef tablo var)
//1) Create destination table
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books_copy(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
//2) Read from one table
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
//3) Save to destination table
readBooksDF.write
.cassandraFormat("books_copy", "books_ks", "")
.save()
//4) Validate copy to destination table
sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books_copy", "keyspace" -> "books_ks"))
.load
.show
Tablolar arasında veri kopyalama (hedef tablo yok)
import com.datastax.spark.connector._
//1) Read from source table
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
//2) Creates an empty table in the keyspace based off of source table
val newBooksDF = readBooksDF
newBooksDF.createCassandraTable(
"books_ks",
"books_new",
partitionKeyColumns = Some(Seq("book_id"))
//clusteringKeyColumns = Some(Seq("some column"))
)
//3) Saves the data from the source table into the newly created table
newBooksDF.write
.cassandraFormat("books_new", "books_ks","")
.mode(SaveMode.Append)
.save()
//4) Validate table creation and data load
sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books_new", "keyspace" -> "books_ks"))
.load
.show
Çıkış-
+-------+------------------+--------------------+----------+-------------+
|book_id| book_author| book_name|book_price|book_pub_year|
+-------+------------------+--------------------+----------+-------------+
| b00300|Arthur Conan Doyle|The hounds of Bas...| 12.25| 1901|
| b00001|Arthur Conan Doyle| A study in scarlet| 11.33| 1887|
| b00023|Arthur Conan Doyle| A sign of four| 22.45| 1890|
| b00501|Arthur Conan Doyle|The memoirs of Sh...| 14.22| 1893|
| b01001|Arthur Conan Doyle|The adventures of...| 19.83| 1892|
+-------+------------------+--------------------+----------+-------------+
import com.datastax.spark.connector._
readBooksDF: org.apache.spark.sql.DataFrame = [book_id: string, book_author: string ... 3 more fields]
newBooksDF: org.apache.spark.sql.DataFrame = [book_id: string, book_author: string ... 3 more fields]
Sonraki adımlar
- Java uygulaması kullanarak Cassandra hesabı, veritabanı ve tablo için API oluşturmaya başlayın.
- Java uygulaması kullanarak Cassandra için API tablosuna örnek veriler yükleyin.
- Java uygulaması kullanarak Cassandra hesabı için API'den veri sorgulama.