DDL-műveletek az Apache Cassandra azure Cosmos DB-ben a Sparkból
A KÖVETKEZŐKRE VONATKOZIK: Cassandra
Ez a cikk az Apache Cassandra Azure Cosmos DB-hez készült Kulcstér- és tábla DDL-műveleteit ismerteti a Sparkból.
Spark-környezet
A Cassandra API-hoz készült összekötő megköveteli, hogy a Cassandra-kapcsolat részleteit inicializálni kell a Spark-környezet részeként. Amikor elindít egy jegyzetfüzetet, a Spark-környezet már inicializálva van, és nem ajánlott leállítani és újrainicializálni. Az egyik megoldás a Cassandra-példányok api-konfigurációjának fürtszintű hozzáadása a fürt spark-konfigurációjában. Fürtenként egyszeri tevékenység. Adja hozzá a következő kódot a Spark-konfigurációhoz szóközzel elválasztott kulcsértékpárként:
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Api a Cassandra-hoz kapcsolódó konfigurációhoz
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
Megjegyzés
Ha Spark 3.x-et használ, nem kell telepítenie az Azure Cosmos DB segítőt és kapcsolat-előállítót. A Spark 3-összekötő helyett connections_per_executor_max
is érdemes használnia remoteConnectionsPerExecutor
(lásd fent).
Figyelmeztetés
A cikkben bemutatott Spark 3-mintákat a Spark 3.2.1-es verziójával és a megfelelő Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1-es verziójával teszteltük. Előfordulhat, hogy a Spark és/vagy a Cassandra-összekötő későbbi verziói nem a várt módon működnek.
Keyspace DDL-műveletek
Kulcstér létrehozása
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
Ellenőrzés a cqlsh-ban
Futtassa a következő parancsot a cqlsh-ban, és látnia kell a korábban létrehozott kulcsteret.
DESCRIBE keyspaces;
Kulcstér elvetése
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
Ellenőrzés a cqlsh-ban
DESCRIBE keyspaces;
Tábla DDL-műveletei
Megfontolások:
- Az átviteli sebesség a tábla szintjén rendelhető hozzá a tábla létrehozása utasítással.
- Egy partíciókulcs 20 GB adatot tárolhat.
- Egy rekord legfeljebb 2 MB adatot tárolhat.
- Egy partíciókulcs-tartomány több partíciókulcsot is tárolhat.
Tábla létrehozása
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
Ellenőrzés a cqlsh-ban
Futtassa a következő parancsot a cqlsh-ban, és a "books" nevű táblának kell megjelennie:
USE books_ks;
DESCRIBE books;
A kiosztott átviteli sebesség és az alapértelmezett TTL-értékek nem jelennek meg az előző parancs kimenetében, ezeket az értékeket a portálról szerezheti be.
Táblázat módosítása
A következő értékeket az alter table paranccsal módosíthatja:
- kiosztott átviteli sebesség
- élettartam-érték
Az oszlopmódosítások jelenleg nem támogatottak.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
Tábla elvetése
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
Ellenőrzés a cqlsh-ban
Futtassa a következő parancsot a cqlsh-ban, és látnia kell, hogy a "könyvek" tábla már nem érhető el:
USE books_ks;
DESCRIBE tables;
Következő lépések
A kulcstér és a tábla létrehozása után folytassa a CRUD-műveletekre vonatkozó alábbi cikkekkel és egyebekvel: