Gegevens lezen uit Azure Cosmos DB voor Apache Cassandra-tabellen met behulp van Spark
VAN TOEPASSING OP: Cassandra
In dit artikel wordt beschreven hoe u gegevens leest die zijn opgeslagen in Azure Cosmos DB voor Apache Cassandra vanuit Spark.
API voor Cassandra-configuratie
Stel de onderstaande Spark-configuratie in uw notebookcluster in. Het is één keer activiteit.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Notitie
Als u Spark 3.x gebruikt, hoeft u de Azure Cosmos DB-helper en verbindingsfactory niet te installeren. U moet ook gebruiken remoteConnectionsPerExecutor
in plaats van connections_per_executor_max
voor de Spark 3-connector (zie hierboven).
Waarschuwing
De Spark 3-voorbeelden die in dit artikel worden weergegeven, zijn getest met Spark-versie 3.2.1 en de bijbehorende Cassandra Spark-connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Latere versies van Spark en/of de Cassandra-connector werken mogelijk niet zoals verwacht.
DataFrame-API
Tabel lezen met de opdracht session.read.format
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
readBooksDF.explain
readBooksDF.show
Tabel lezen met spark.read.cassandraFormat
val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()
Specifieke kolommen in tabel lezen
val readBooksDF = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_author", "book_pub_year")
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
Filters toepassen
U kunt predicaten naar de database pushen om betere geoptimaliseerde Spark-query's mogelijk te maken. Een predicaat is een voorwaarde voor een query die waar of onwaar retourneert, meestal in de WHERE-component. Met een predicaatpush worden de gegevens in de databasequery gefilterd, waardoor het aantal vermeldingen dat uit de database is opgehaald, wordt verminderd en de queryprestaties worden verbeterd. Standaard pusht de Spark Dataset-API automatisch geldige WHERE-componenten naar de database.
val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
De Cassandra Filters
sectie van het fysieke plan bevat het gepushte filter.
RDD-API
Tabel lezen
val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)
Specifieke kolommen in tabel lezen
val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)
SQL-weergaven
Een tijdelijke weergave maken vanuit een dataframe
spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load.createOrReplaceTempView("books_vw")
Query's uitvoeren op de weergave
select * from books_vw where book_pub_year > 1891
Volgende stappen
Hier volgen aanvullende artikelen over het werken met Azure Cosmos DB voor Apache Cassandra vanuit Spark: