Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
GILT FÜR: Cassandra
In diesem Artikel werden DDL-Vorgänge im Keyspace und in Tabellen für Azure Cosmos DB for Apache Cassandra von Spark beschrieben.
Spark-Kontext
Der Connector für die API für Cassandra erfordert, dass die Details der Cassandra-Verbindung als Teil des Spark-Kontexts initialisiert werden. Beim Starten eines Notebooks ist der Spark-Kontext bereits initialisiert, und es ist nicht ratsam, ihn zu beenden und erneut zu initialisieren. Eine Lösung ist, die Konfiguration der API-für-Cassandra-Instanz auf Clusterebene hinzuzufügen, in der Cluster-Spark-Konfiguration. Dies ist eine einmalige Aktivität pro Cluster. Fügen Sie der Spark-Konfiguration den folgenden Code als ein durch Leerzeichen getrenntes Schlüssel-Wert-Paar hinzu:
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Konfiguration im Zusammenhang mit der API für Cassandra
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
Hinweis
Wenn Sie Spark 3 verwenden, müssen Sie die Hilfs- und Verbindungsfactory von Azure Cosmos DB nicht installieren. Sie sollten auch remoteConnectionsPerExecutor
anstelle von connections_per_executor_max
für den Spark 3-Connector verwenden (siehe oben).
Warnung
Die in diesem Artikel gezeigten Spark 3-Beispiele wurden mit Spark Version 3.2.1 und dem entsprechenden Cassandra Spark-Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1 getestet. Höhere Versionen von Spark und/oder dem Cassandra-Connector funktionieren möglicherweise nicht wie erwartet.
Keyspace-DDL-Vorgänge
Erstellen eines Keyspace
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
Überprüfen in cqlsh
Führen Sie den folgenden Befehl in cqlsh aus. Daraufhin sollte der Keyspace, den Sie zuvor erstellt haben, angezeigt werden.
DESCRIBE keyspaces;
Verwerfen eines Keyspace
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
Überprüfen in cqlsh
DESCRIBE keyspaces;
Tabellen-DDL-Vorgänge
Überlegungen:
- Durchsatz kann mithilfe der Anweisung „create table“ auf Tabellenebene zugewiesen werden.
- In einem Partitionsschlüssel können 20 GB Daten gespeichert werden.
- In einem Datensatz können bis zu 2 MB Daten gespeichert werden.
- In einem Partitionsschlüsselbereich können mehrere Partitionsschlüssel gespeichert werden.
Erstellen einer Tabelle
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
Überprüfen in cqlsh
Führen Sie den folgenden Befehl in cqlsh aus. Daraufhin sollte die Tabelle „books“ angezeigt werden:
USE books_ks;
DESCRIBE books;
Der bereitgestellte Durchsatz und die Standard-TTL-Werte werden in der Ausgabe des vorherigen Befehls nicht angezeigt. Sie können diese Werte im Portal abrufen.
ALTER TABLE
Sie können die folgenden Werte mit dem Befehl ALTER TABLE ändern:
- Bereitgestellter Durchsatz
- Wert der Gültigkeitsdauer
Spaltenänderungen werden derzeit nicht unterstützt.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
Löschen einer Tabelle
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
Überprüfen in cqlsh
Führen Sie den folgenden Befehl in cqlsh aus. Sie sollten sehen, dass die Tabelle „books“ nicht mehr verfügbar ist:
USE books_ks;
DESCRIBE tables;
Nächste Schritte
Fahren Sie nach dem Erstellen von Keyspace und Tabelle mit den folgenden Artikeln für CRUD-Vorgänge und Ähnliches fort: