Adatok létrehozása/beszúrása az Azure Cosmos DB-be az Apache Cassandra számára a Sparkból

A KÖVETKEZŐKRE VONATKOZIK: Cassandra

Ez a cikk bemutatja, hogyan szúrhat be mintaadatokat egy táblába az Apache Cassandra azure Cosmos DB-ben a Sparkból.

Api a Cassandra konfigurációhoz

Állítsa be az alábbi Spark-konfigurációt a jegyzetfüzetfürtben. Ez egy egyszeri tevékenység.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Megjegyzés

Ha Spark 3.x-et használ, nem kell telepítenie az Azure Cosmos DB segítőt és kapcsolat-előállítót. A Spark 3-összekötő helyett connections_per_executor_max is érdemes használnia remoteConnectionsPerExecutor (lásd fent).

Figyelmeztetés

A cikkben bemutatott Spark 3-mintákat a Spark 3.2.1-es verziójával és a megfelelő Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 verzióval teszteltük. Előfordulhat, hogy a Spark és/vagy a Cassandra-összekötő későbbi verziói nem a várt módon működnek.

Dataframe API

Adatkeret létrehozása mintaadatokkal

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a dataframe containing five records
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")

//Review schema
booksDF.printSchema

//Print
booksDF.show

Megjegyzés

A "Létrehozás, ha nem létezik" funkció egy sorszinten még nem támogatott.

Adatmegőrzés az Azure Cosmos DB-ben az Apache Cassandra-hoz

Az adatok mentésekor beállíthatja az élettartamra és a konzisztenciaházirendre vonatkozó beállításokat is az alábbi példában látható módon:

//Persist
booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Ellenőrzés a cqlsh-ban

use books_ks;
select * from books;

Rugalmas elosztott adatbázis (RDD) API

RDD létrehozása mintaadatokkal

//Drop and re-create table to delete records created in the previous section 
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

//Create RDD
val booksRDD = sc.parallelize(Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))

//Review
booksRDD.take(2).foreach(println)

Megjegyzés

A létrehozás, ha nem létezik funkció még nem támogatott.

Adatmegőrzés az Azure Cosmos DB-ben az Apache Cassandra-hoz

Amikor adatokat ment a Cassandra API-ba, beállíthatja az élettartamra és a konzisztenciaházirendre vonatkozó beállításokat is az alábbi példában látható módon:

import com.datastax.spark.connector.writer._
import com.datastax.oss.driver.api.core.ConsistencyLevel

//Persist
booksRDD.saveToCassandra("books_ks", "books", SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),writeConf = WriteConf(ttl = TTLOption.constant(900000),consistencyLevel = ConsistencyLevel.ALL))

Ellenőrzés a cqlsh-ban

use books_ks;
select * from books;

Következő lépések

Miután adatokat szúrt be az Azure Cosmos DB for Apache Cassandra táblába, folytassa az alábbi cikkekkel az Apache Cassandra Azure Cosmos DB-ben tárolt adatokon végzett egyéb műveletek végrehajtásához: