Sdílet prostřednictvím


Operace DDL ve službě Azure Cosmos DB pro Apache Cassandra ze Sparku

PLATÍ PRO: Cassandra

Tento článek podrobně popisuje operace keyspace a DDL tabulky se službou Azure Cosmos DB for Apache Cassandra ze Sparku.

Kontext Sparku

Konektor pro rozhraní API pro Cassandra vyžaduje, aby se podrobnosti o připojení Cassandra inicializovaly jako součást kontextu Sparku. Při spuštění poznámkového bloku se kontext Sparku už inicializuje a nedoporučuje se ho zastavit a znovu inicializovat. Jedním z řešení je přidání rozhraní API pro konfiguraci instance Cassandra na úrovni clusteru v konfiguraci Sparku clusteru. Jedná se o jednorázovou aktivitu na cluster. Do konfigurace Sparku přidejte následující kód jako dvojici hodnot klíčů oddělených mezerami:

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

Poznámka:

Pokud používáte Spark 3.x, nemusíte instalovat pomocné rutiny a objekt pro vytváření připojení služby Azure Cosmos DB. Místo konektoru Spark 3 byste také měli použít remoteConnectionsPerExecutorconnections_per_executor_max (viz výše).

Upozorňující

Ukázky Sparku 3 uvedené v tomto článku byly testovány se Sparkem verze 3.2.1 a odpovídajícím konektorem Cassandra Spark com.datastax.spark :spark-cassandra-connector-assembly_2.12:3.2.1. Novější verze Sparku nebo konektoru Cassandra nemusí fungovat podle očekávání.

Operace DDL služby Keyspace

Vytvoření prostoru klíčů

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

Ověření v cqlsh

V cqlsh spusťte následující příkaz a měli byste vidět prostor klíčů, který jste vytvořili dříve.

DESCRIBE keyspaces;

Vyřazení prostoru klíčů

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

Ověření v cqlsh

DESCRIBE keyspaces;

Operace DDL tabulky

Aspekty:

  • Propustnost lze přiřadit na úrovni tabulky pomocí příkazu create table.
  • Jeden klíč oddílu může ukládat 20 GB dat.
  • Jeden záznam může obsahovat maximálně 2 MB dat.
  • Jeden rozsah klíčů oddílu může ukládat více klíčů oddílu.

Vytvoření tabulky

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

Ověření v cqlsh

V cqlsh spusťte následující příkaz a měli byste vidět tabulku s názvem books:

USE books_ks;
DESCRIBE books;

Zřízená propustnost a výchozí hodnoty TTL se ve výstupu předchozího příkazu nezobrazují. Tyto hodnoty můžete získat z portálu.

Alter table

Následující hodnoty můžete změnit pomocí příkazu alter table:

  • zřízená propustnost
  • Hodnota time-to-live
    Změny sloupců se v současné době nepodporují.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

Drop table

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

Ověření v cqlsh

V cqlsh spusťte následující příkaz a měli byste vidět, že tabulka "books" už není dostupná:

USE books_ks;
DESCRIBE tables;

Další kroky

Po vytvoření prostoru klíčů a tabulky přejděte k následujícím článkům pro operace CRUD a další: