Aracılığıyla paylaş


Spark'tan Apache Cassandra için Azure Cosmos DB'deki DDL işlemleri

ŞUNLAR IÇIN GEÇERLIDIR: Cassandra

Bu makalede Spark'tan Apache Cassandra için Azure Cosmos DB'ye yönelik anahtar alanı ve tablo DDL işlemleri ayrıntılı olarak açıklanmaktadır.

Spark bağlamı

Cassandra için API bağlayıcısı, Spark bağlamının bir parçası olarak Cassandra bağlantı ayrıntılarının başlatılmasını gerektirir. Bir not defterini başlattığınızda spark bağlamı zaten başlatılır ve durdurmanız ve yeniden başlatmanız önerilmez. Bir çözüm, Cassandra örneği yapılandırmasını küme düzeyinde küme spark yapılandırmasına eklemektir. Küme başına tek seferlik bir etkinliktir. Spark yapılandırmasına boşlukla ayrılmış anahtar değer çifti olarak aşağıdaki kodu ekleyin:

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

Not

Spark 3.x kullanıyorsanız Azure Cosmos DB yardımcısını ve bağlantı fabrikasını yüklemeniz gerekmez. Spark 3 bağlayıcısı connections_per_executor_max için yerine de kullanmanız remoteConnectionsPerExecutor gerekir (yukarıya bakın).

Uyarı

Bu makalede gösterilen Spark 3 örnekleri Spark sürüm 3.2.1 ve buna karşılık gelen Cassandra Spark Bağlayıcısı com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1 ile test edilmiştir. Spark ve/veya Cassandra bağlayıcısının sonraki sürümleri beklendiği gibi çalışmayabilir.

Keyspace DDL işlemleri

Anahtar alanı oluşturma

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

cqlsh'de doğrulama

cqlsh'de aşağıdaki komutu çalıştırın; daha önce oluşturduğunuz anahtar alanı görmeniz gerekir.

DESCRIBE keyspaces;

Anahtar alanı bırakma

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

cqlsh'de doğrulama

DESCRIBE keyspaces;

Tablo DDL işlemleri

Önemli noktalar:

  • Aktarım hızı, create table deyimi kullanılarak tablo düzeyinde atanabilir.
  • Bir bölüm anahtarı 20 GB veri depolayabilir.
  • Bir kayıt en fazla 2 MB veri depolayabilir.
  • Bir bölüm anahtarı aralığı birden çok bölüm anahtarını depolayabilir.

Tablo oluştur

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

cqlsh'de doğrulama

cqlsh'de aşağıdaki komutu çalıştırdığınızda "books:

USE books_ks;
DESCRIBE books;

Sağlanan aktarım hızı ve varsayılan TTL değerleri önceki komutun çıkışında gösterilmez, bu değerleri portaldan alabilirsiniz.

Tabloyu değiştirme

Alter table komutunu kullanarak aşağıdaki değerleri değiştirebilirsiniz:

  • sağlanan aktarım hızı
  • yaşam süresi değeri
    Sütun değişiklikleri şu anda desteklenmiyor.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

Tabloyu bırakma

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

cqlsh'de doğrulama

cqlsh'de aşağıdaki komutu çalıştırın ve "books" tablosunun artık kullanılamadığını görmeniz gerekir:

USE books_ks;
DESCRIBE tables;

Sonraki adımlar

Anahtar alanını ve tabloyu oluşturduktan sonra CRUD işlemleri ve daha fazlası için aşağıdaki makalelere geçin: