Menyambungkan ke Azure Cosmos DB untuk Apache Cassandra dari Spark
BERLAKU UNTUK: Cassandra
Artikel ini adalah salah satu dari serangkaian artikel tentang integrasi Azure Cosmos DB untuk Apache Cassandra dari Spark. Artikel ini mencakup konektivitas, operasi Bahasa Definisi Data (DDL), operasi Bahasa Manipulasi Data (DML) dasar, dan integrasi Azure Cosmos DB untuk Apache Cassandra tingkat lanjut dari Spark.
Prasyarat
Provisikan lingkungan Spark pilihan Anda[Azure Databricks | Azure HDInsight-Spark | Lainnya].
Dependensi untuk konektivitas
Konektor Spark untuk Cassandra: Konektor Spark digunakan untuk terhubung ke Azure Cosmos DB untuk Apache Cassandra. Identifikasi dan gunakan versi konektor yang terletak di pusat Maven yang kompatibel dengan versi Spark dan Scala dari lingkungan Spark Anda. Sebaiknya lingkungan yang mendukung Spark 3.2.1 atau yang lebih tinggi, dan konektor spark yang tersedia di koordinat maven
com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0
. Jika menggunakan Spark 2.x, kami merekomendasikan lingkungan dengan Spark versi 2.4.5, menggunakan konektor spark pada koordinat mavencom.datastax.spark:spark-cassandra-connector_2.11:2.4.3
.Pustaka pembantu Azure Cosmos DB untuk API untuk Cassandra: Jika Anda menggunakan versi Spark 2.x, maka selain konektor Spark, Anda memerlukan pustaka lain yang disebut azure-cosmos-cassandra-spark-helper dengan koordinat
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0
maven dari Azure Cosmos DB untuk menangani pembatasan tarif. Pustaka ini berisi pabrik koneksi kustom dan kelas kebijakan percobaan kembali.Kebijakan percobaan kembali di Microsoft Azure Cosmos DB dikonfigurasi untuk menangani pengecualian kode status HTTP 429("Tingkat Permintaan Besar"). Azure Cosmos DB for Apache Cassandra menerjemahkan pengecualian ini menjadi kesalahan yang kelebihan beban pada protokol asli Cassandra, dan Anda dapat mencoba kembali dengan back-off. Karena Microsoft Azure Cosmos DB menggunakan model throughput yang di provisikan, tingkat permintaan yang membatasi pengecualian terjadi ketika tingkat masuk / keluar meningkat. Kebijakan percobaan kembali melindungi pekerjaan spark Anda terhadap lonjakan data yang sesaat melebihi throughput yang dialokasikan untuk kontainer Anda. Jika menggunakan konektor Spark 3.x, tidak perlu menerapkan pustaka ini.
Catatan
Kebijakan percobaan kembali dapat melindungi pekerjaan spark Anda terhadap lonjakan sesaat saja. Jika Anda belum mengonfigurasi cukup RU yang diperlukan untuk menjalankan beban kerja Anda, maka kebijakan percobaan kembali tidak berlaku dan kelas kebijakan percobaan kembali menemukan kembali pengecualian.
Detail koneksi akun Azure Cosmos DB: Api Azure Anda untuk nama akun Cassandra, titik akhir akun, dan kunci.
Mengoptimalkan konfigurasi throughput konektor Spark
Di bagian berikutnya, tercantum semua parameter yang relevan untuk mengontrol throughput menggunakan Spark Connector untuk Cassandra. Untuk mengoptimalkan parameter guna memaksimalkan throughput untuk pekerjaan spark, konfigurasi spark.cassandra.output.concurrent.writes
, spark.cassandra.concurrent.reads
, dan spark.cassandra.input.reads_per_sec
perlu dikonfigurasi dengan benar, untuk menghindari terlalu banyak pembatasan dan back-off (yang dapat menyebabkan throughput yang lebih rendah).
Nilai optimal konfigurasi ini tergantung pada 4 faktor:
- Jumlah throughput (Unit Permintaan) yang dikonfigurasi untuk tabel tempat data diserap.
- Jumlah pekerja di kluster Spark Anda.
- Jumlah eksekutor yang dikonfigurasi untuk pekerjaan spark Anda (yang dapat dikontrol menggunakan
spark.cassandra.connection.connections_per_executor_max
atauspark.cassandra.connection.remoteConnectionsPerExecutor
tergantung pada versi Spark) - Latensi rata-rata setiap permintaan ke Azure Cosmos DB, jika Anda dikolokasikan di Pusat Data yang sama. Asumsikan nilai ini menjadi 10 md untuk tulis dan 3 md untuk baca.
Misalnya, jika kita memiliki lima pekerja dan nilai spark.cassandra.output.concurrent.writes
= 1, dan nilai spark.cassandra.connection.remoteConnectionsPerExecutor
= 1, maka kita memiliki lima pekerja yang secara bersamaan menulis ke dalam meja, masing-masing dengan satu utas. Jika dibutuhkan 10 md untuk melakukan satu tulis, kita dapat mengirim 100 permintaan (1000 milidetik dibagi 10) per detik, per alur. Dengan lima pekerja, jumlahnya akan menjadi 500 tulis per detik. Dengan biaya rata-rata lima unit permintaan (RU) per tulis, tabel target akan membutuhkan minimal 2500 unit permintaan yang tersedia (5 RU x 500 tulis per detik).
Meningkatkan jumlah eksekutor dapat meningkatkan jumlah alur dalam pekerjaan tertentu, yang dapat meningkatkan throughput. Namun, dampak pasti dari ini dapat bervariasi tergantung pada pekerjaan, sementara mengontrol throughput dengan jumlah pekerja lebih deterministik. Anda juga dapat menentukan biaya pasti dari permintaan tertentu dengan membuat profil untuk mendapatkan biaya Unit Permintaan (RU). Ini akan membantu Anda menjadi lebih akurat saat memprovisikan throughput untuk tabel atau keyspace Anda. Lihat artikel kami di sini untuk memahami cara mendapatkan biaya unit permintaan pada tingkat per permintaan.
Penskalaan throughput dalam database
Konektor Cassandra Spark akan memenuhi throughput di Azure Cosmos DB dengan sangat efisien. Akibatnya, bahkan dengan percobaan ulang yang efektif, Anda perlu memastikan Anda memiliki throughput (RU) yang cukup yang tersedia pada tingkat tabel atau keyspace untuk mencegah kesalahan terkait pembatasan kecepatan. Pengaturan minimum 400 RU dalam tabel atau keyspace tertentu tidak akan cukup. Bahkan pada pengaturan konfigurasi throughput minimum, konektor Spark dapat menulis pada tingkat yang sesuai dengan sekitar 6000 unit permintaan atau lebih.
Jika pengaturan RU yang diperlukan untuk pergerakan data menggunakan Spark lebih tinggi dari apa yang diperlukan untuk beban kerja keadaan stabil Anda, Anda dapat dengan mudah meningkatkan throughput naik dan turun secara sistematis di Azure Cosmos DB untuk memenuhi kebutuhan beban kerja Anda untuk jangka waktu tertentu. Baca artikel kami tentang skala elastis di API untuk Cassandra untuk memahami berbagai opsi untuk penskalaan secara terprogram dan dinamis.
Catatan
Panduan di atas mengasumsikan distribusi data yang cukup seragam. Jika Anda memiliki kecondongan yang signifikan dalam data (yaitu sejumlah besar baca/tulis ke nilai kunci partisi yang sama), Anda mungkin masih mengalami penyempitan, meskipun Anda memiliki sejumlah besar unit permintaan yang diprovisikan dalam tabel Anda. Unit permintaan dibagi rata di antara partisi fisik, dan kecondongan data berat dapat menyebabkan penyempitan permintaan ke satu partisi.
Parameter konfigurasi throughput konektor Spark
Tabel berikut mencantumkan parameter konfigurasi throughput khusus Azure Cosmos DB untuk Apache Cassandra yang disediakan oleh konektor. Untuk daftar rinci semua parameter konfigurasi, lihat halaman referensi konfigurasi repositori Spark Cassandra Connector GitHub.
Nama Properti | Nilai default | Keterangan |
---|---|---|
spark.cassandra.output.batch.size.rows | 1 | Jumlah baris per batch tunggal. Atur parameter ini ke 1. Parameter ini digunakan untuk mencapai throughput yang lebih tinggi untuk beban kerja yang berat. |
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) | Tidak | Jumlah maksimum koneksi per node per eksekutor. 10*n setara dengan 10 koneksi per node dalam klaster n-node Cassandra. Jadi, jika Anda memerlukan lima koneksi per node per eksekutor untuk lima node kluster Cassandra, maka Anda harus mengatur konfigurasi ini menjadi 25. Ubah nilai ini berdasarkan tingkat paralelisme atau jumlah pelaksana tempat pekerjaan spark Anda dikonfigurasi. |
spark.cassandra.output.concurrent.writes | 100 | Mendefinisikan jumlah tulisan paralel yang dapat terjadi per pelaksana. Karena Anda mengatur "batch.size.rows" ke 1, pastikan untuk menskalakan nilai ini sesuai. Ubah nilai ini berdasarkan tingkat paralelisme atau throughput yang ingin Anda capai untuk beban kerja Anda. |
spark.cassandra.concurrent.reads | 512 | Menentukan jumlah pembacaan paralel yang dapat terjadi per pelaksana. Ubah nilai ini berdasarkan tingkat paralelisme atau throughput yang ingin Anda capai untuk beban kerja Anda |
spark.cassandra.output.throughput_mb_per_sec | Tidak | Menentukan total throughput tulis per pelaksana. Parameter ini dapat digunakan sebagai batas atas untuk throughput pekerjaan spark Anda, dan mendasarkannya pada throughput yang disediakan dari kontainer Azure Cosmos DB Anda. |
spark.cassandra.input.reads_per_sec | Tidak | Menentukan total throughput baca per pelaksana. Parameter ini dapat digunakan sebagai batas atas untuk throughput pekerjaan spark Anda, dan mendasarkannya pada throughput yang disediakan dari kontainer Azure Cosmos DB Anda. |
spark.cassandra.output.batch.grouping.buffer.size | 1000 | Menentukan jumlah batch per tugas spark tunggal yang dapat disimpan dalam memori sebelum mengirim ke API untuk Cassandra |
spark.cassandra.connection.keep_alive_ms | 60000 | Menentukan periode waktu hingga koneksi yang tidak digunakan tersedia. |
Sesuaikan throughput dan tingkat paralelisme parameter ini berdasarkan beban kerja yang Anda harapkan untuk pekerjaan spark Anda, dan throughput yang telah Anda provisikan untuk akun Azure Cosmos DB Anda.
Menyambungkan ke Azure Cosmos DB untuk Apache Cassandra dari Spark
cqlsh
Perintah berikut merinci cara menyambungkan ke Azure Cosmos DB untuk Apache Cassandra dari cqlsh. Ini berguna untuk validasi saat Anda menjalankan sampel di Spark.
Dari Linux/Unix/Mac:
export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl
1. Azure Databricks
Artikel di bawah ini membahas provisi kluster Azure Databricks, konfigurasi kluster untuk menyambungkan ke Azure Cosmos DB for Apache Cassandra, dan beberapa contoh notebook yang mencakup operasi DDL, operasi DML, dan banyak lagi.
Bekerja dengan Azure Cosmos DB untuk Apache Cassandra dari Azure Databricks
2. Azure HDInsight-Spark
Artikel di bawah ini membahas layanan HDinsight-Spark, provisi, konfigurasi kluster untuk menyambungkan ke Azure Cosmos DB untuk Apache Cassandra, dan beberapa contoh notebook yang mencakup operasi DDL, operasi DML, dan banyak lagi.
Bekerja dengan Azure Cosmos DB untuk Apache Cassandra dari Azure HDInsight-Spark
3. Lingkungan Spark secara umum
Meskipun bagian di atas khusus untuk layanan PaaS berbasis Azure Spark, bagian ini mencakup lingkungan Spark umum apa pun. Dependensi konektor, impor, dan konfigurasi sesi Spark dirinci di bawah ini. Bagian "Langkah selanjutnya" mencakup sampel kode untuk operasi DDL, operasi DML, dan lainnya.
Dependensi konektor:
- Tambahkan koordinat maven untuk mendapatkan konektor Cassandra untuk Spark
- Menambahkan koordinat maven untuk pustaka pembantu Azure Cosmos DB untuk API untuk Cassandra
Impor:
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
Konfigurasi sesi Spark:
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Langkah berikutnya
Artikel berikut menunjukkan integrasi Spark dengan Azure Cosmos DB untuk Apache Cassandra.