Operace DDL ve službě Azure Cosmos DB pro Apache Cassandra ze Sparku
PLATÍ PRO: Cassandra
Tento článek podrobně popisuje operace keyspace a DDL tabulky se službou Azure Cosmos DB for Apache Cassandra ze Sparku.
Kontext Sparku
Konektor pro rozhraní API pro Cassandra vyžaduje, aby se podrobnosti o připojení Cassandra inicializovaly jako součást kontextu Sparku. Při spuštění poznámkového bloku se kontext Sparku už inicializuje a nedoporučuje se ho zastavit a znovu inicializovat. Jedním z řešení je přidání rozhraní API pro konfiguraci instance Cassandra na úrovni clusteru v konfiguraci Sparku clusteru. Jedná se o jednorázovou aktivitu na cluster. Do konfigurace Sparku přidejte následující kód jako dvojici hodnot klíčů oddělených mezerami:
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Rozhraní API pro konfiguraci související s Cassandrou
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
Poznámka:
Pokud používáte Spark 3.x, nemusíte instalovat pomocné rutiny a objekt pro vytváření připojení služby Azure Cosmos DB. Místo konektoru Spark 3 byste také měli použít remoteConnectionsPerExecutor
connections_per_executor_max
(viz výše).
Upozorňující
Ukázky Sparku 3 uvedené v tomto článku byly testovány se Sparkem verze 3.2.1 a odpovídajícím konektorem Cassandra Spark com.datastax.spark :spark-cassandra-connector-assembly_2.12:3.2.1. Novější verze Sparku nebo konektoru Cassandra nemusí fungovat podle očekávání.
Operace DDL služby Keyspace
Vytvoření prostoru klíčů
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
Ověření v cqlsh
V cqlsh spusťte následující příkaz a měli byste vidět prostor klíčů, který jste vytvořili dříve.
DESCRIBE keyspaces;
Vyřazení prostoru klíčů
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
Ověření v cqlsh
DESCRIBE keyspaces;
Operace DDL tabulky
Aspekty:
- Propustnost lze přiřadit na úrovni tabulky pomocí příkazu create table.
- Jeden klíč oddílu může ukládat 20 GB dat.
- Jeden záznam může obsahovat maximálně 2 MB dat.
- Jeden rozsah klíčů oddílu může ukládat více klíčů oddílu.
Vytvoření tabulky
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
Ověření v cqlsh
V cqlsh spusťte následující příkaz a měli byste vidět tabulku s názvem books:
USE books_ks;
DESCRIBE books;
Zřízená propustnost a výchozí hodnoty TTL se ve výstupu předchozího příkazu nezobrazují. Tyto hodnoty můžete získat z portálu.
Alter table
Následující hodnoty můžete změnit pomocí příkazu alter table:
- zřízená propustnost
- Hodnota time-to-live
Změny sloupců se v současné době nepodporují.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
Drop table
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
Ověření v cqlsh
V cqlsh spusťte následující příkaz a měli byste vidět, že tabulka "books" už není dostupná:
USE books_ks;
DESCRIBE tables;
Další kroky
Po vytvoření prostoru klíčů a tabulky přejděte k následujícím článkům pro operace CRUD a další:
Váš názor
https://aka.ms/ContentUserFeedback.
Připravujeme: V průběhu roku 2024 budeme postupně vyřazovat problémy z GitHub coby mechanismus zpětné vazby pro obsah a nahrazovat ho novým systémem zpětné vazby. Další informace naleznete v tématu:Odeslat a zobrazit názory pro