Gegevens maken/invoegen in Azure Cosmos DB voor Apache Cassandra vanuit Spark
VAN TOEPASSING OP: Cassandra
In dit artikel wordt beschreven hoe u voorbeeldgegevens invoegt in een tabel in Azure Cosmos DB voor Apache Cassandra vanuit Spark.
API voor Cassandra-configuratie
Stel de onderstaande Spark-configuratie in uw notebookcluster in. Het is één keer activiteit.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Notitie
Als u Spark 3.x gebruikt, hoeft u de Azure Cosmos DB-helper en verbindingsfactory niet te installeren. U moet ook gebruiken remoteConnectionsPerExecutor
in plaats van connections_per_executor_max
voor de Spark 3-connector (zie hierboven).
Waarschuwing
De Spark 3-voorbeelden die in dit artikel worden weergegeven, zijn getest met Spark-versie 3.2.1 en de bijbehorende Cassandra Spark-connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Latere versies van Spark en/of de Cassandra-connector werken mogelijk niet zoals verwacht.
DataFrame-API
Een Dataframe maken met voorbeeldgegevens
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a dataframe containing five records
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")
//Review schema
booksDF.printSchema
//Print
booksDF.show
Notitie
De functionaliteit 'Maken als deze niet bestaat' wordt nog niet ondersteund op rijniveau.
Behouden naar Azure Cosmos DB voor Apache Cassandra
Wanneer u gegevens opslaat, kunt u ook time-to-live- en consistentiebeleidsinstellingen instellen, zoals wordt weergegeven in het volgende voorbeeld:
//Persist
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Valideren in cqlsh
use books_ks;
select * from books;
RDD-API (Resilient Distributed Database)
Een RDD maken met voorbeeldgegevens
//Drop and re-create table to delete records created in the previous section
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
//Create RDD
val booksRDD = sc.parallelize(Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))
//Review
booksRDD.take(2).foreach(println)
Notitie
Maken als er nog geen functionaliteit bestaat, wordt nog niet ondersteund.
Behouden naar Azure Cosmos DB voor Apache Cassandra
Wanneer u gegevens opslaat in DE API voor Cassandra, kunt u ook time-to-live- en consistentiebeleidsinstellingen instellen, zoals wordt weergegeven in het volgende voorbeeld:
import com.datastax.spark.connector.writer._
import com.datastax.oss.driver.api.core.ConsistencyLevel
//Persist
booksRDD.saveToCassandra("books_ks", "books", SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),writeConf = WriteConf(ttl = TTLOption.constant(900000),consistencyLevel = ConsistencyLevel.ALL))
Valideren in cqlsh
use books_ks;
select * from books;
Volgende stappen
Nadat u gegevens hebt ingevoegd in de Azure Cosmos DB voor Apache Cassandra-tabel, gaat u verder met de volgende artikelen om andere bewerkingen uit te voeren op de gegevens die zijn opgeslagen in Azure Cosmos DB voor Apache Cassandra: