Spark'tan Apache Cassandra için Azure Cosmos DB'deki DDL işlemleri
ŞUNLAR IÇIN GEÇERLIDIR: Cassandra
Bu makalede Spark'tan Apache Cassandra için Azure Cosmos DB'ye yönelik anahtar alanı ve tablo DDL işlemleri ayrıntılı olarak açıklanmaktadır.
Spark bağlamı
Cassandra için API bağlayıcısı, Spark bağlamının bir parçası olarak Cassandra bağlantı ayrıntılarının başlatılmasını gerektirir. Bir not defterini başlattığınızda spark bağlamı zaten başlatılır ve durdurmanız ve yeniden başlatmanız önerilmez. Bir çözüm, Cassandra örneği yapılandırmasını küme düzeyinde küme spark yapılandırmasına eklemektir. Küme başına tek seferlik bir etkinliktir. Spark yapılandırmasına boşlukla ayrılmış anahtar değer çifti olarak aşağıdaki kodu ekleyin:
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Cassandra ile ilgili yapılandırma için API
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
Not
Spark 3.x kullanıyorsanız Azure Cosmos DB yardımcısını ve bağlantı fabrikasını yüklemeniz gerekmez. Spark 3 bağlayıcısı connections_per_executor_max
için yerine de kullanmanız remoteConnectionsPerExecutor
gerekir (yukarıya bakın).
Uyarı
Bu makalede gösterilen Spark 3 örnekleri Spark sürüm 3.2.1 ve buna karşılık gelen Cassandra Spark Bağlayıcısı com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1 ile test edilmiştir. Spark ve/veya Cassandra bağlayıcısının sonraki sürümleri beklendiği gibi çalışmayabilir.
Keyspace DDL işlemleri
Anahtar alanı oluşturma
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
cqlsh'de doğrulama
cqlsh'de aşağıdaki komutu çalıştırın; daha önce oluşturduğunuz anahtar alanı görmeniz gerekir.
DESCRIBE keyspaces;
Anahtar alanı bırakma
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
cqlsh'de doğrulama
DESCRIBE keyspaces;
Tablo DDL işlemleri
Önemli noktalar:
- Aktarım hızı, create table deyimi kullanılarak tablo düzeyinde atanabilir.
- Bir bölüm anahtarı 20 GB veri depolayabilir.
- Bir kayıt en fazla 2 MB veri depolayabilir.
- Bir bölüm anahtarı aralığı birden çok bölüm anahtarını depolayabilir.
Tablo oluştur
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
cqlsh'de doğrulama
cqlsh'de aşağıdaki komutu çalıştırdığınızda "books:
USE books_ks;
DESCRIBE books;
Sağlanan aktarım hızı ve varsayılan TTL değerleri önceki komutun çıkışında gösterilmez, bu değerleri portaldan alabilirsiniz.
Tabloyu değiştirme
Alter table komutunu kullanarak aşağıdaki değerleri değiştirebilirsiniz:
- sağlanan aktarım hızı
- yaşam süresi değeri
Sütun değişiklikleri şu anda desteklenmiyor.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
Tabloyu bırakma
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
cqlsh'de doğrulama
cqlsh'de aşağıdaki komutu çalıştırın ve "books" tablosunun artık kullanılamadığını görmeniz gerekir:
USE books_ks;
DESCRIBE tables;
Sonraki adımlar
Anahtar alanını ve tabloyu oluşturduktan sonra CRUD işlemleri ve daha fazlası için aşağıdaki makalelere geçin: