Delen via


DDL-bewerkingen in Azure Cosmos DB voor Apache Cassandra vanuit Spark

VAN TOEPASSING OP: Cassandra

In dit artikel worden keyspace- en tabel-DDL-bewerkingen beschreven voor Azure Cosmos DB voor Apache Cassandra vanuit Spark.

Spark-context

Voor de connector voor API voor Cassandra moeten de Cassandra-verbindingsgegevens worden geïnitialiseerd als onderdeel van de Spark-context. Wanneer u een notebook start, wordt de Spark-context al geïnitialiseerd en is het niet raadzaam deze te stoppen en opnieuw te initialiseren. Een oplossing is het toevoegen van de API voor cassandra-exemplaarconfiguratie op clusterniveau, in de spark-configuratie van het cluster. Dit is eenmalige activiteit per cluster. Voeg de volgende code toe aan de Spark-configuratie als een door spaties gescheiden sleutelwaardepaar:

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

Notitie

Als u Spark 3.x gebruikt, hoeft u de Azure Cosmos DB-helper en verbindingsfactory niet te installeren. U moet ook gebruiken remoteConnectionsPerExecutor in plaats van connections_per_executor_max voor de Spark 3-connector (zie hierboven).

Waarschuwing

De Spark 3-voorbeelden die in dit artikel worden weergegeven, zijn getest met Spark-versie 3.2.1 en de bijbehorende Cassandra Spark-connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1. Latere versies van Spark en/of de Cassandra-connector werken mogelijk niet zoals verwacht.

Keyspace DDL-bewerkingen

Een keyspace maken

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

Valideren in cqlsh

Voer de volgende opdracht uit in cqlsh en u ziet nu de keyspace die u eerder hebt gemaakt.

DESCRIBE keyspaces;

Een keyspace verwijderen

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

Valideren in cqlsh

DESCRIBE keyspaces;

Tabel-DDL-bewerkingen

Overwegingen:

  • Doorvoer kan worden toegewezen op tabelniveau met behulp van de instructie tabel maken.
  • Eén partitiesleutel kan 20 GB aan gegevens opslaan.
  • Eén record kan maximaal 2 MB aan gegevens opslaan.
  • Eén partitiesleutelbereik kan meerdere partitiesleutels opslaan.

Een tabel maken

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

Valideren in cqlsh

Voer de volgende opdracht uit in cqlsh en u ziet de tabel met de naam boeken:

USE books_ks;
DESCRIBE books;

Ingerichte doorvoer en standaard TTL-waarden worden niet weergegeven in de uitvoer van de vorige opdracht. U kunt deze waarden ophalen via de portal.

Tabel wijzigen

U kunt de volgende waarden wijzigen met behulp van de opdracht Tabel wijzigen:

  • ingerichte doorvoer
  • time-to-live-waarde
    Kolomwijzigingen worden momenteel niet ondersteund.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

Tabel neerzetten

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

Valideren in cqlsh

Voer de volgende opdracht uit in cqlsh en u ziet dat de tabel Boeken niet meer beschikbaar is:

USE books_ks;
DESCRIBE tables;

Volgende stappen

Nadat u de keyspace en de tabel hebt gemaakt, gaat u verder met de volgende artikelen voor CRUD-bewerkingen en meer: