Aracılığıyla paylaş


Spark'tan Apache Cassandra için Azure Cosmos DB'ye veri oluşturma/ekleme

ŞUNLAR IÇIN GEÇERLIDIR: Cassandra

Bu makalede, Spark'tan Apache Cassandra için Azure Cosmos DB'de bir tabloya örnek verilerin nasıl eklediği açıklanır.

Cassandra yapılandırması için API

Not defteri kümenizde spark yapılandırmasını aşağıda ayarlayın. Tek seferlik bir etkinlik.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Not

Spark 3.x kullanıyorsanız Azure Cosmos DB yardımcısını ve bağlantı fabrikasını yüklemeniz gerekmez. Spark 3 bağlayıcısı connections_per_executor_max için yerine de kullanmanız remoteConnectionsPerExecutor gerekir (yukarıya bakın).

Uyarı

Bu makalede gösterilen Spark 3 örnekleri Spark sürüm 3.2.1 ve buna karşılık gelen Cassandra Spark Bağlayıcısı com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 ile test edilmiştir. Spark ve/veya Cassandra bağlayıcısının sonraki sürümleri beklendiği gibi çalışmayabilir.

Dataframe API'si

Örnek verilerle Veri Çerçevesi oluşturma

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a dataframe containing five records
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")

//Review schema
booksDF.printSchema

//Print
booksDF.show

Not

Satır düzeyinde "Yoksa oluştur" işlevi henüz desteklenmiyor.

Apache Cassandra için Azure Cosmos DB'de kalıcı olma

Verileri kaydederken, aşağıdaki örnekte gösterildiği gibi yaşam süresi ve tutarlılık ilkesi ayarlarını da ayarlayabilirsiniz:

//Persist
booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

cqlsh'de doğrulama

use books_ks;
select * from books;

Dayanıklı Dağıtılmış Veritabanı (RDD) API'si

Örnek verilerle RDD oluşturma

//Drop and re-create table to delete records created in the previous section 
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

//Create RDD
val booksRDD = sc.parallelize(Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))

//Review
booksRDD.take(2).foreach(println)

Not

Yoksa oluşturma işlevi henüz desteklenmiyor.

Apache Cassandra için Azure Cosmos DB'de kalıcı olma

Cassandra için API'ye veri kaydederken, aşağıdaki örnekte gösterildiği gibi yaşam süresi ve tutarlılık ilkesi ayarlarını da ayarlayabilirsiniz:

import com.datastax.spark.connector.writer._
import com.datastax.oss.driver.api.core.ConsistencyLevel

//Persist
booksRDD.saveToCassandra("books_ks", "books", SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),writeConf = WriteConf(ttl = TTLOption.constant(900000),consistencyLevel = ConsistencyLevel.ALL))

cqlsh'de doğrulama

use books_ks;
select * from books;

Sonraki adımlar

Apache Cassandra için Azure Cosmos DB tablosuna veri ekledikten sonra, Apache Cassandra için Azure Cosmos DB'de depolanan veriler üzerinde başka işlemler gerçekleştirmek üzere aşağıdaki makalelere geçin: