Aracılığıyla paylaş


Spark'tan Apache Cassandra için Azure Cosmos DB'ye bağlanma

ŞUNLAR IÇIN GEÇERLIDIR: Cassandra

Bu makale, Spark'tan Apache Cassandra tümleştirmesi için Azure Cosmos DB ile ilgili bir dizi makaleden biridir. Makalelerde bağlantı, Veri Tanımı Dili (DDL) işlemleri, temel Veri İşleme Dili (DML) işlemleri ve Spark'tan Apache Cassandra için gelişmiş Azure Cosmos DB tümleştirmesi ele alınıyor.

Önkoşullar

Bağlantı bağımlılıkları

  • Cassandra için Spark bağlayıcısı: Spark bağlayıcısı, Apache Cassandra için Azure Cosmos DB'ye bağlanmak için kullanılır. Maven central'da bulunan ve Spark ortamınızın Spark ve Scala sürümleriyle uyumlu bağlayıcı sürümünü belirleyin ve kullanın. Spark 3.2.1 veya üzerini destekleyen bir ortam ve spark bağlayıcısının maven koordinatlarında com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0kullanılabilir olması önerilir. Spark 2.x kullanıyorsanız, spark bağlayıcısı maven koordinatlarında com.datastax.spark:spark-cassandra-connector_2.11:2.4.3kullanılarak Spark sürüm 2.4.5 ile bir ortam önerilir.

  • Cassandra için API için Azure Cosmos DB yardımcı kitaplığı: Spark 2.x sürümü kullanıyorsanız Spark bağlayıcısına ek olarak hız sınırlamasını işlemek için Azure Cosmos DB'den maven koordinatları com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 içeren azure-cosmos-cassandra-spark-helper adlı başka bir kitaplığa ihtiyacınız vardır. Bu kitaplık özel bağlantı fabrikası ve yeniden deneme ilkesi sınıfları içerir.

    Azure Cosmos DB'deki yeniden deneme ilkesi, HTTP durum kodu 429("İstek Oranı Büyük") özel durumlarını işleyecek şekilde yapılandırılmıştır. Apache Cassandra için Azure Cosmos DB, bu özel durumları Cassandra yerel protokolünde aşırı yüklenmiş hatalara çevirir ve geri çevirmeleri yeniden deneyebilirsiniz. Azure Cosmos DB sağlanan aktarım hızı modelini kullandığından, giriş/çıkış oranları arttığında istek hızı sınırlama özel durumları oluşur. Yeniden deneme ilkesi spark işlerinizi kapsayıcınız için ayrılan aktarım hızını kısa süre aşan veri ani artışlarına karşı korur. Spark 3.x bağlayıcısını kullanıyorsanız, bu kitaplığı uygulamak gerekli değildir.

    Not

    Yeniden deneme ilkesi spark işlerinizi yalnızca anlık ani artışlara karşı koruyabilir. İş yükünüzü çalıştırmak için gereken RU'ları yapılandırmadıysanız, yeniden deneme ilkesi geçerli değildir ve yeniden deneme ilkesi sınıfı özel durumu yeniden oluşturur.

  • Azure Cosmos DB hesabı bağlantı ayrıntıları: Cassandra için Azure API'nizin hesap adı, hesap uç noktası ve anahtarı.

Spark bağlayıcısı aktarım hızı yapılandırmasını iyileştirme

Sonraki bölümde Cassandra için Spark Bağlayıcısı'nı kullanarak aktarım hızını denetlemeye yönelik tüm ilgili parametreler listelenmiştir. Spark işlerinde aktarım hızını en üst düzeye çıkarmak üzere parametreleri iyileştirmek için, spark.cassandra.output.concurrent.writesspark.cassandra.concurrent.readsçok fazla azaltma ve spark.cassandra.input.reads_per_sec geri alma (bu da daha düşük aktarım hızına neden olabilir) önlemek için , ve yapılandırmalarının doğru yapılandırılması gerekir.

Bu yapılandırmaların en uygun değeri dört faktöre bağlıdır:

  • Verilerin alınmakta olduğu tablo için yapılandırılan aktarım hızı (İstek Birimleri) miktarı.
  • Spark kümenizdeki çalışan sayısı.
  • Spark işiniz için yapılandırılan yürütücü sayısı (Spark sürümü kullanılarak spark.cassandra.connection.connections_per_executor_max veya spark.cassandra.connection.remoteConnectionsPerExecutor buna bağlı olarak denetlenebilir)
  • Aynı Veri Merkezi'nde birlikte konumlandırıldıysanız Azure Cosmos DB'ye yapılan her isteğin ortalama gecikme süresi. Bu değerin yazmalar için 10 ms ve okumalar için 3 ms olduğunu varsayalım.

Örneğin, beş çalışanımız ve = 1 değerimiz spark.cassandra.output.concurrent.writesve = 1 değerimiz spark.cassandra.connection.remoteConnectionsPerExecutor varsa, her biri bir iş parçacığıyla aynı anda tabloya yazan beş çalışan var demektir. Tek bir yazma işlemi 10 ms sürerse, iş parçacığı başına saniyede 100 istek (1000 milisaniye 10'a bölündü) gönderebiliriz. Beş işçiyle bu, saniyede 500 yazma olur. Yazma başına ortalama beş istek birimi (RU) maliyetinde, hedef tablo için sağlanan en az 2500 istek birimi (saniyede 5 RU x 500 yazma) gerekir.

Yürütücü sayısını artırmak, belirli bir işteki iş parçacığı sayısını artırabilir ve bu da aktarım hızını artırabilir. Ancak, bunun tam etkisi işe bağlı olarak değişken olabilir, ancak çalışan sayısıyla aktarım hızını denetlemek daha belirleyicidir. İstek Birimi (RU) ücretini almak için profil oluşturarak belirli bir isteğin tam maliyetini de belirleyebilirsiniz. Bu, tablonuz veya anahtar alanınız için aktarım hızı sağlarken daha doğru olmanıza yardımcı olur. İstek düzeyine göre istek birimi ücretlerinin nasıl alınıyor olduğunu anlamak için buradaki makalemize göz atın.

Veritabanında aktarım hızını ölçeklendirme

Cassandra Spark bağlayıcısı, Azure Cosmos DB'deki aktarım hızını verimli bir şekilde doygunluğa sunar. Sonuç olarak, etkili yeniden denemelerde bile, ilgili hataların hız sınırlamasını önlemek için tabloda veya anahtar alanı düzeyinde yeterli aktarım hızı (RU) sağlandığından emin olmanız gerekir. Belirli bir tablo veya anahtar alanında en az 400 RU ayarı yeterli olmayacaktır. Spark bağlayıcısı, en düşük aktarım hızı yapılandırma ayarlarında bile yaklaşık 6000 istek birimine veya daha fazlasına karşılık gelen bir hızda yazabilir.

Spark kullanarak veri taşıma için gereken RU ayarı kararlı durum iş yükünüz için gerekenden daha yüksekse, belirli bir süre için iş yükünüzün gereksinimlerini karşılamak için Azure Cosmos DB'de aktarım hızını sistematik olarak artırıp azaltabilirsiniz. Program aracılığıyla ve dinamik olarak ölçeklendirmeye yönelik farklı seçenekleri anlamak için Cassandra için API'de esnek ölçeklendirme makalemizi okuyun.

Not

Yukarıdaki kılavuzda verilerin makul bir şekilde eşit bir şekilde dağıtılması varsayılır. Verilerde önemli bir dengesizlik varsa (yani, aynı bölüm anahtarı değerine göre çok fazla sayıda okuma/yazma işlemi varsa), tablonuzda çok sayıda istek birimi sağlanmış olsa bile yine de performans sorunlarıyla karşılaşabilirsiniz. İstek birimleri fiziksel bölümler arasında eşit olarak bölünür ve ağır veri dengesizliği tek bir bölüme yönelik isteklerde performans sorununa neden olabilir.

Spark bağlayıcısı aktarım hızı yapılandırma parametreleri

Aşağıdaki tabloda, bağlayıcı tarafından sağlanan Apache Cassandra'ya özgü aktarım hızı yapılandırma parametreleri için Azure Cosmos DB listelenmektedir. Tüm yapılandırma parametrelerinin ayrıntılı listesi için Spark Cassandra Bağlayıcısı GitHub deposunun yapılandırma başvuru sayfasına bakın.

Özellik Adı Varsayılan değer Açıklama
spark.cassandra.output.batch.size.rows 1 Tek bir toplu iş başına satır sayısı. Bu parametreyi 1 olarak ayarlayın. Bu parametre, ağır iş yükleri için daha yüksek aktarım hızı elde etmek için kullanılır.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Hiçbiri Yürütücü başına düğüm başına en fazla bağlantı sayısı. 10*n, n düğümlü Cassandra kümesindeki düğüm başına 10 bağlantıya eşdeğerdir. Bu nedenle, beş düğüm cassandra kümesi için yürütücü başına düğüm başına beş bağlantı gerekiyorsa, bu yapılandırmayı 25 olarak ayarlamanız gerekir. Bu değeri paralellik derecesine veya Spark işlerinizin yapılandırıldığı yürütücü sayısına göre değiştirin.
spark.cassandra.output.concurrent.writes 100 Yürütücü başına gerçekleşebilecek paralel yazma sayısını tanımlar. "batch.size.rows" değerini 1 olarak ayarladığınızdan, bu değeri uygun şekilde ölçeklendirdiğinizden emin olun. Bu değeri, iş yükünüz için elde etmek istediğiniz paralellik derecesine veya aktarım hızına göre değiştirin.
spark.cassandra.concurrent.reads 512 Yürütücü başına gerçekleşebilecek paralel okuma sayısını tanımlar. Bu değeri, iş yükünüz için elde etmek istediğiniz paralellik derecesine veya aktarım hızına göre değiştirin
spark.cassandra.output.throughput_mb_per_sec Hiçbiri Yürütücü başına toplam yazma aktarım hızını tanımlar. Bu parametre Spark iş aktarım hızınız için üst sınır olarak kullanılabilir ve Bunu Azure Cosmos DB kapsayıcınızın sağlanan aktarım hızına göre temel alabilir.
spark.cassandra.input.reads_per_sec Hiçbiri Yürütücü başına toplam okuma aktarım hızını tanımlar. Bu parametre Spark iş aktarım hızınız için üst sınır olarak kullanılabilir ve Bunu Azure Cosmos DB kapsayıcınızın sağlanan aktarım hızına göre temel alabilir.
spark.cassandra.output.batch.grouping.buffer.size 1000 Cassandra için API'ye göndermeden önce bellekte depolanabilen tek spark görevi başına toplu iş sayısını tanımlar
spark.cassandra.connection.keep_alive_ms 60000 Kullanılmayan bağlantıların kullanılabilmesi için gereken süreyi tanımlar.

Spark işleriniz için beklediğiniz iş yüküne ve Azure Cosmos DB hesabınız için sağladığınız aktarım hızına göre bu parametrelerin aktarım hızını ve paralellik derecesini ayarlayın.

Spark'tan Apache Cassandra için Azure Cosmos DB'ye bağlanma

cqlsh

Aşağıdaki komutlar, cqlsh'den Apache Cassandra için Azure Cosmos DB'ye bağlanma işleminin ayrıntılarını içerir. Bu, Spark'taki örneklerin üzerinden geçerken doğrulama için kullanışlıdır.
Linux/Unix/Mac'ten:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

Aşağıdaki makalede Azure Databricks kümesi sağlama, Apache Cassandra için Azure Cosmos DB'ye bağlanmak için küme yapılandırması ve DDL işlemlerini, DML işlemlerini ve daha fazlasını kapsayan çeşitli örnek not defterleri ele alınıyor.
Azure Databricks'ten Apache Cassandra için Azure Cosmos DB ile çalışma

2. Azure HDInsight-Spark

Aşağıdaki makalede HDinsight-Spark hizmeti, sağlama, Apache Cassandra için Azure Cosmos DB'ye bağlanmak için küme yapılandırması ve DDL işlemleri, DML işlemleri ve daha fazlasını kapsayan çeşitli örnek not defterleri ele alınıyor.
Azure HDInsight-Spark'tan Apache Cassandra için Azure Cosmos DB ile çalışma

3. Genel olarak Spark ortamı

Yukarıdaki bölümler Azure Spark tabanlı PaaS hizmetlerine özgü olsa da, bu bölüm tüm genel Spark ortamlarını kapsar. Bağlayıcı bağımlılıkları, içeri aktarmalar ve Spark oturumu yapılandırması aşağıda ayrıntılı olarak açıklandı. "Sonraki adımlar" bölümünde DDL işlemleri, DML işlemleri ve daha fazlası için kod örnekleri ele alınıyor.

Bağlayıcı bağımlılıkları:

  1. Spark için Cassandra bağlayıcısını almak için maven koordinatlarını ekleyin
  2. Cassandra için API için Azure Cosmos DB yardımcı kitaplığının maven koordinatlarını ekleme

Ithalat:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Spark oturumu yapılandırması:

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000 

Sonraki adımlar

Aşağıdaki makalelerde Apache Cassandra için Azure Cosmos DB ile Spark tümleştirmesi gösterilmektedir.