DDL-műveletek az Apache Cassandra azure Cosmos DB-ben a Sparkból

A KÖVETKEZŐKRE VONATKOZIK: Cassandra

Ez a cikk az Apache Cassandra Azure Cosmos DB-hez készült Kulcstér- és tábla DDL-műveleteit ismerteti a Sparkból.

Spark-környezet

A Cassandra API-hoz készült összekötő megköveteli, hogy a Cassandra-kapcsolat részleteit inicializálni kell a Spark-környezet részeként. Amikor elindít egy jegyzetfüzetet, a Spark-környezet már inicializálva van, és nem ajánlott leállítani és újrainicializálni. Az egyik megoldás a Cassandra-példányok api-konfigurációjának fürtszintű hozzáadása a fürt spark-konfigurációjában. Fürtenként egyszeri tevékenység. Adja hozzá a következő kódot a Spark-konfigurációhoz szóközzel elválasztott kulcsértékpárként:

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

Megjegyzés

Ha Spark 3.x-et használ, nem kell telepítenie az Azure Cosmos DB segítőt és kapcsolat-előállítót. A Spark 3-összekötő helyett connections_per_executor_max is érdemes használnia remoteConnectionsPerExecutor (lásd fent).

Figyelmeztetés

A cikkben bemutatott Spark 3-mintákat a Spark 3.2.1-es verziójával és a megfelelő Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1-es verziójával teszteltük. Előfordulhat, hogy a Spark és/vagy a Cassandra-összekötő későbbi verziói nem a várt módon működnek.

Keyspace DDL-műveletek

Kulcstér létrehozása

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

Ellenőrzés a cqlsh-ban

Futtassa a következő parancsot a cqlsh-ban, és látnia kell a korábban létrehozott kulcsteret.

DESCRIBE keyspaces;

Kulcstér elvetése

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

Ellenőrzés a cqlsh-ban

DESCRIBE keyspaces;

Tábla DDL-műveletei

Megfontolások:

  • Az átviteli sebesség a tábla szintjén rendelhető hozzá a tábla létrehozása utasítással.
  • Egy partíciókulcs 20 GB adatot tárolhat.
  • Egy rekord legfeljebb 2 MB adatot tárolhat.
  • Egy partíciókulcs-tartomány több partíciókulcsot is tárolhat.

Tábla létrehozása

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

Ellenőrzés a cqlsh-ban

Futtassa a következő parancsot a cqlsh-ban, és a "books" nevű táblának kell megjelennie:

USE books_ks;
DESCRIBE books;

A kiosztott átviteli sebesség és az alapértelmezett TTL-értékek nem jelennek meg az előző parancs kimenetében, ezeket az értékeket a portálról szerezheti be.

Táblázat módosítása

A következő értékeket az alter table paranccsal módosíthatja:

  • kiosztott átviteli sebesség
  • élettartam-érték
    Az oszlopmódosítások jelenleg nem támogatottak.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

Tábla elvetése

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

Ellenőrzés a cqlsh-ban

Futtassa a következő parancsot a cqlsh-ban, és látnia kell, hogy a "könyvek" tábla már nem érhető el:

USE books_ks;
DESCRIBE tables;

Következő lépések

A kulcstér és a tábla létrehozása után folytassa a CRUD-műveletekre vonatkozó alábbi cikkekkel és egyebekvel: