Dela via


DDL-åtgärder i Azure Cosmos DB för Apache Cassandra från Spark

GÄLLER FÖR: Kassandra

Den här artikeln beskriver nyckelrymds- och tabell-DDL-åtgärder mot Azure Cosmos DB för Apache Cassandra från Spark.

Spark-kontext

Anslutningsappen för API för Cassandra kräver att Cassandra-anslutningsinformationen initieras som en del av Spark-kontexten. När du startar en notebook-fil initieras spark-kontexten redan och det är inte lämpligt att stoppa och initiera den igen. En lösning är att lägga till API:et för Cassandra-instanskonfigurationen på klusternivå i kluster spark-konfigurationen. Det är engångsaktivitet per kluster. Lägg till följande kod i Spark-konfigurationen som ett blankstegsavgränsat nyckelvärdepar:

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

Kommentar

Om du använder Spark 3.x behöver du inte installera Azure Cosmos DB-hjälpen och anslutningsfabriken. Du bör också använda remoteConnectionsPerExecutor i stället connections_per_executor_max för för Spark 3-anslutningsappen (se ovan).

Varning

Spark 3-exemplen som visas i den här artikeln har testats med Spark version 3.2.1 och motsvarande Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1. Senare versioner av Spark och/eller Cassandra-anslutningsappen kanske inte fungerar som förväntat.

DDL-åtgärder för nyckelområde

Skapa ett nyckelområde

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

Verifiera i cqlsh

Kör följande kommando i cqlsh så bör du se nyckelområdet som du skapade tidigare.

DESCRIBE keyspaces;

Ta bort ett nyckelområde

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

Verifiera i cqlsh

DESCRIBE keyspaces;

Tabell-DDL-åtgärder

Överväganden:

  • Dataflödet kan tilldelas på tabellnivå med hjälp av instruktionen skapa tabell.
  • En partitionsnyckel kan lagra 20 GB data.
  • En post kan lagra högst 2 MB data.
  • Ett partitionsnyckelintervall kan lagra flera partitionsnycklar.

Skapa en tabell

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

Verifiera i cqlsh

Kör följande kommando i cqlsh och du bör se tabellen med namnet "books:

USE books_ks;
DESCRIBE books;

Etablerat dataflöde och standardvärden för TTL visas inte i utdata från föregående kommando. Du kan hämta dessa värden från portalen.

Ändra tabell

Du kan ändra följande värden med hjälp av kommandot alter table:

  • etablerat dataflöde
  • time-to-live-värde
    Kolumnändringar stöds för närvarande inte.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

Ta bort tabell

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

Verifiera i cqlsh

Kör följande kommando i cqlsh och du bör se att tabellen "böcker" inte längre är tillgänglig:

USE books_ks;
DESCRIBE tables;

Nästa steg

När du har skapat nyckelområdet och tabellen fortsätter du till följande artiklar för CRUD-åtgärder med mera: