Partage via


Lire les données de tables Azure Cosmos DB for Apache Cassandra à l’aide de Spark

S’APPLIQUE À : Cassandra

Cet article explique comment lire des données stockées dans Azure Cosmos DB for Apache Cassandra à partir de Spark.

API pour la configuration Cassandra

Définissez la configuration spark ci-dessous dans votre cluster de notebooks. Cette activité s’effectue une seule fois.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Notes

Si vous utilisez Spark 3.x, il n’est pas nécessaire d’installer l’assistance Cosmos DB ni la fabrique de connexion. Par ailleurs, utilisez remoteConnectionsPerExecutor plutôt que connections_per_executor_max pour le connecteur Spark 3 (cf. ci-dessus).

Avertissement

Les exemples Spark 3 présentés dans cet article ont été testés avec la version 3.2.1 de Spark et le connecteur Spark Cassandra correspondant, com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Les versions ultérieures de Spark et/ou du connecteur Cassandra peuvent ne pas fonctionner comme prévu.

API Dataframe

Lire une table avec la commande session.read.format

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

val readBooksDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load

readBooksDF.explain
readBooksDF.show

Lire une table avec la commande spark.read.cassandraFormat

val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()

Lire des colonnes spécifiques d’une table

val readBooksDF = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_author", "book_pub_year")

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

Appliquer des filtres

Vous pouvez effectuer un pushdown de prédicats vers la base de données pour optimiser encore les requêtes Spark. Un prédicat est une condition sur une requête, généralement dans la clause WHERE, qui retourne true ou false. Un pushdown de prédicat filtre les données dans la requête de base de données, réduisant ainsi le nombre d’entrées récupérées à partir de la base de données et améliorant les performances des requêtes. Par défaut, l’API Spark DataSet effectue automatiquement un pushdown des clauses WHERE valides vers la base de données.

val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

La section Cassandra Filters du plan physique comprend le filtre pushed down.

partitions

API pour le jeu de donnée distribué résilient

Lire une table

val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)

Lire des colonnes spécifiques d’une table

val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)

Vues SQL

Créer une vue temporaire à partir d’un dataframe

spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load.createOrReplaceTempView("books_vw")

Exécuter des requêtes sur la vue

select * from books_vw where book_pub_year > 1891

Étapes suivantes

Voici des articles supplémentaires sur l’utilisation d’Azure Cosmos DB for Apache Cassandra à partir de Spark :