Delen via


Gegevens lezen uit Azure Cosmos DB voor Apache Cassandra-tabellen met behulp van Spark

VAN TOEPASSING OP: Cassandra

In dit artikel wordt beschreven hoe u gegevens leest die zijn opgeslagen in Azure Cosmos DB voor Apache Cassandra vanuit Spark.

API voor Cassandra-configuratie

Stel de onderstaande Spark-configuratie in uw notebookcluster in. Het is één keer activiteit.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Notitie

Als u Spark 3.x gebruikt, hoeft u de Azure Cosmos DB-helper en verbindingsfactory niet te installeren. U moet ook gebruiken remoteConnectionsPerExecutor in plaats van connections_per_executor_max voor de Spark 3-connector (zie hierboven).

Waarschuwing

De Spark 3-voorbeelden die in dit artikel worden weergegeven, zijn getest met Spark-versie 3.2.1 en de bijbehorende Cassandra Spark-connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Latere versies van Spark en/of de Cassandra-connector werken mogelijk niet zoals verwacht.

DataFrame-API

Tabel lezen met de opdracht session.read.format

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

val readBooksDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load

readBooksDF.explain
readBooksDF.show

Tabel lezen met spark.read.cassandraFormat

val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()

Specifieke kolommen in tabel lezen

val readBooksDF = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_author", "book_pub_year")

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

Filters toepassen

U kunt predicaten naar de database pushen om betere geoptimaliseerde Spark-query's mogelijk te maken. Een predicaat is een voorwaarde voor een query die waar of onwaar retourneert, meestal in de WHERE-component. Met een predicaatpush worden de gegevens in de databasequery gefilterd, waardoor het aantal vermeldingen dat uit de database is opgehaald, wordt verminderd en de queryprestaties worden verbeterd. Standaard pusht de Spark Dataset-API automatisch geldige WHERE-componenten naar de database.

val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

De Cassandra Filters sectie van het fysieke plan bevat het gepushte filter.

partities

RDD-API

Tabel lezen

val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)

Specifieke kolommen in tabel lezen

val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)

SQL-weergaven

Een tijdelijke weergave maken vanuit een dataframe

spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load.createOrReplaceTempView("books_vw")

Query's uitvoeren op de weergave

select * from books_vw where book_pub_year > 1891

Volgende stappen

Hier volgen aanvullende artikelen over het werken met Azure Cosmos DB voor Apache Cassandra vanuit Spark: