Přenesení dat do služby Azure Cosmos DB pro Apache Cassandra ze Sparku
PLATÍ PRO: Cassandra
Tento článek popisuje, jak přenést data do služby Azure Cosmos DB pro Apache Cassandra ze Sparku.
Konfigurace rozhraní API pro Cassandra
Nastavte v clusteru poznámkových bloků následující konfiguraci Sparku. Je to jednorázová aktivita.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Poznámka:
Pokud používáte Spark 3.x, nemusíte instalovat pomocné rutiny a objekt pro vytváření připojení služby Azure Cosmos DB. Místo konektoru Spark 3 byste také měli použít remoteConnectionsPerExecutor
connections_per_executor_max
(viz výše).
Upozorňující
Ukázky Sparku 3 uvedené v tomto článku byly testovány se Sparkem verze 3.2.1 a odpovídajícím konektorem Cassandra Spark com.datastax.spark :spark-cassandra-connector-assembly_2.12:3.2.0. Novější verze Sparku nebo konektoru Cassandra nemusí fungovat podle očekávání.
Rozhraní API datového rámce
Vytvoření datového rámce
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// (1) Update: Changing author name to include prefix of "Sir"
// (2) Insert: adding a new book
val booksUpsertDF = Seq(
("b00001", "Sir Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Sir Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Sir Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Sir Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Sir Arthur Conan Doyle", "The hounds of Baskerville", 1901),
("b09999", "Sir Arthur Conan Doyle", "The return of Sherlock Holmes", 1905)
).toDF("book_id", "book_author", "book_name", "book_pub_year")
booksUpsertDF.show()
Upsert dat
// Upsert is no different from create
booksUpsertDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.save()
Aktualizace dat
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
//This runs on the driver, leverage only for one off updates
cdbConnector.withSessionDo(session => session.execute("update books_ks.books set book_price=99.33 where book_id ='b00300' and book_pub_year = 1901;"))
RDD API
Poznámka:
Upsert z rozhraní RDD API je stejný jako operace vytvoření.
Další kroky
Pokračujte k následujícím článkům a proveďte další operace s daty uloženými v tabulkách Azure Cosmos DB for Apache Cassandra: