Partage via


Opérations DDL dans Azure Cosmos DB pour Apache Cassandra à partir de Spark

S’APPLIQUE À : Cassandra

Cet article décrit en détail les opérations DDL d’espace de clés et de table dans Azure Cosmos DB for Apache Cassandra à partir de Spark.

Contexte Spark

Le connecteur pour l’API pour Cassandra nécessite que les informations de la connexion Cassandra soient initialisées dans le cadre du contexte spark. Quand vous lancez un notebook, le contexte Spark est déjà initialisé. Il n’est pas recommandé de l’arrêter et de le réinitialiser. Une solution consiste à ajouter l’instance de l’API pour Cassandra à un niveau de cluster, dans la configuration du cluster spark. Cette opération s’effectue une seule fois par cluster. Ajoutez à la configuration Spark le code suivant, où chaque paire clé-valeur utilise un espace en guise de séparation :

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

Notes

Si vous utilisez Spark 3.x, il n’est pas nécessaire d’installer l’assistance Cosmos DB ni la fabrique de connexion. Par ailleurs, utilisez remoteConnectionsPerExecutor plutôt que connections_per_executor_max pour le connecteur Spark 3 (cf. ci-dessus).

Avertissement

Les exemples Spark 3 présentés dans cet article ont été testés avec la version 3.2.1 de Spark et le connecteur Spark Cassandra correspondant, com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1. Les versions ultérieures de Spark et/ou du connecteur Cassandra peuvent ne pas fonctionner comme prévu.

Opérations DDL d’espace de clés

Créer un espace de clés

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

Valider dans cqlsh

Exécutez la commande suivante dans cqlsh ; vous devriez voir l’espace de clés créé précédemment.

DESCRIBE keyspaces;

Supprimer un espace de clés

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

Valider dans cqlsh

DESCRIBE keyspaces;

Opérations DDL de table

Considérations :

  • Le débit peut être assigné au niveau de la table à l’aide de l’instruction create table.
  • Une clé de partition peut stocker 20 Go de données.
  • Un enregistrement peut stocker un maximum de 2 Mo de données.
  • Une plage de clés de partition peut stocker plusieurs clés de partition.

Créer une table

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

Valider dans cqlsh

Exécutez la commande suivante dans cqlsh ; vous devriez voir la table nommée «books» :

USE books_ks;
DESCRIBE books;

Les valeurs de durée de vie par défaut et de débit provisionnées ne figurent pas dans la sortie de la commande précédente ; vous pouvez les obtenir à partir du portail.

Modifier une table

Vous pouvez modifier les valeurs suivantes à l’aide de la commande alter table :

  • Débit provisionné
  • Durée de vie
    Les modifications de colonne ne sont pas prises en charge.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

Supprimer une table

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

Valider dans cqlsh

Exécutez la commande suivante dans cqlsh ; vous devriez voir que la table « books » n’est plus disponible :

USE books_ks;
DESCRIBE tables;

Étapes suivantes

Après avoir créé l’espace de clés et la table, passez aux articles suivants, qui traitent, entre autres, des opérations CRUD :