Bagikan melalui


Menyambungkan ke Azure Cosmos DB untuk Apache Cassandra dari Spark

BERLAKU UNTUK: Cassandra

Artikel ini adalah salah satu dari serangkaian artikel tentang integrasi Azure Cosmos DB untuk Apache Cassandra dari Spark. Artikel ini mencakup konektivitas, operasi Bahasa Definisi Data (DDL), operasi Bahasa Manipulasi Data (DML) dasar, dan integrasi Azure Cosmos DB untuk Apache Cassandra tingkat lanjut dari Spark.

Prasyarat

Dependensi untuk konektivitas

  • Konektor Spark untuk Cassandra: Konektor Spark digunakan untuk terhubung ke Azure Cosmos DB untuk Apache Cassandra. Identifikasi dan gunakan versi konektor yang terletak di pusat Maven yang kompatibel dengan versi Spark dan Scala dari lingkungan Spark Anda. Sebaiknya lingkungan yang mendukung Spark 3.2.1 atau yang lebih tinggi, dan konektor spark yang tersedia di koordinat maven com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Jika menggunakan Spark 2.x, kami merekomendasikan lingkungan dengan Spark versi 2.4.5, menggunakan konektor spark pada koordinat maven com.datastax.spark:spark-cassandra-connector_2.11:2.4.3.

  • Pustaka pembantu Azure Cosmos DB untuk API untuk Cassandra: Jika Anda menggunakan versi Spark 2.x, maka selain konektor Spark, Anda memerlukan pustaka lain yang disebut azure-cosmos-cassandra-spark-helper dengan koordinat com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 maven dari Azure Cosmos DB untuk menangani pembatasan tarif. Pustaka ini berisi pabrik koneksi kustom dan kelas kebijakan percobaan kembali.

    Kebijakan percobaan kembali di Microsoft Azure Cosmos DB dikonfigurasi untuk menangani pengecualian kode status HTTP 429("Tingkat Permintaan Besar"). Azure Cosmos DB for Apache Cassandra menerjemahkan pengecualian ini menjadi kesalahan yang kelebihan beban pada protokol asli Cassandra, dan Anda dapat mencoba kembali dengan back-off. Karena Microsoft Azure Cosmos DB menggunakan model throughput yang di provisikan, tingkat permintaan yang membatasi pengecualian terjadi ketika tingkat masuk / keluar meningkat. Kebijakan percobaan kembali melindungi pekerjaan spark Anda terhadap lonjakan data yang sesaat melebihi throughput yang dialokasikan untuk kontainer Anda. Jika menggunakan konektor Spark 3.x, tidak perlu menerapkan pustaka ini.

    Catatan

    Kebijakan percobaan kembali dapat melindungi pekerjaan spark Anda terhadap lonjakan sesaat saja. Jika Anda belum mengonfigurasi cukup RU yang diperlukan untuk menjalankan beban kerja Anda, maka kebijakan percobaan kembali tidak berlaku dan kelas kebijakan percobaan kembali menemukan kembali pengecualian.

  • Detail koneksi akun Azure Cosmos DB: Api Azure Anda untuk nama akun Cassandra, titik akhir akun, dan kunci.

Mengoptimalkan konfigurasi throughput konektor Spark

Di bagian berikutnya, tercantum semua parameter yang relevan untuk mengontrol throughput menggunakan Spark Connector untuk Cassandra. Untuk mengoptimalkan parameter guna memaksimalkan throughput untuk pekerjaan spark, konfigurasi spark.cassandra.output.concurrent.writes, spark.cassandra.concurrent.reads, dan spark.cassandra.input.reads_per_sec perlu dikonfigurasi dengan benar, untuk menghindari terlalu banyak pembatasan dan back-off (yang dapat menyebabkan throughput yang lebih rendah).

Nilai optimal konfigurasi ini tergantung pada 4 faktor:

  • Jumlah throughput (Unit Permintaan) yang dikonfigurasi untuk tabel tempat data diserap.
  • Jumlah pekerja di kluster Spark Anda.
  • Jumlah eksekutor yang dikonfigurasi untuk pekerjaan spark Anda (yang dapat dikontrol menggunakan spark.cassandra.connection.connections_per_executor_max atau spark.cassandra.connection.remoteConnectionsPerExecutor tergantung pada versi Spark)
  • Latensi rata-rata setiap permintaan ke Azure Cosmos DB, jika Anda dikolokasikan di Pusat Data yang sama. Asumsikan nilai ini menjadi 10 md untuk tulis dan 3 md untuk baca.

Misalnya, jika kita memiliki lima pekerja dan nilai spark.cassandra.output.concurrent.writes = 1, dan nilai spark.cassandra.connection.remoteConnectionsPerExecutor = 1, maka kita memiliki lima pekerja yang secara bersamaan menulis ke dalam meja, masing-masing dengan satu utas. Jika dibutuhkan 10 md untuk melakukan satu tulis, kita dapat mengirim 100 permintaan (1000 milidetik dibagi 10) per detik, per alur. Dengan lima pekerja, jumlahnya akan menjadi 500 tulis per detik. Dengan biaya rata-rata lima unit permintaan (RU) per tulis, tabel target akan membutuhkan minimal 2500 unit permintaan yang tersedia (5 RU x 500 tulis per detik).

Meningkatkan jumlah eksekutor dapat meningkatkan jumlah alur dalam pekerjaan tertentu, yang dapat meningkatkan throughput. Namun, dampak pasti dari ini dapat bervariasi tergantung pada pekerjaan, sementara mengontrol throughput dengan jumlah pekerja lebih deterministik. Anda juga dapat menentukan biaya pasti dari permintaan tertentu dengan membuat profil untuk mendapatkan biaya Unit Permintaan (RU). Ini akan membantu Anda menjadi lebih akurat saat memprovisikan throughput untuk tabel atau keyspace Anda. Lihat artikel kami di sini untuk memahami cara mendapatkan biaya unit permintaan pada tingkat per permintaan.

Penskalaan throughput dalam database

Konektor Cassandra Spark akan memenuhi throughput di Azure Cosmos DB dengan sangat efisien. Akibatnya, bahkan dengan percobaan ulang yang efektif, Anda perlu memastikan Anda memiliki throughput (RU) yang cukup yang tersedia pada tingkat tabel atau keyspace untuk mencegah kesalahan terkait pembatasan kecepatan. Pengaturan minimum 400 RU dalam tabel atau keyspace tertentu tidak akan cukup. Bahkan pada pengaturan konfigurasi throughput minimum, konektor Spark dapat menulis pada tingkat yang sesuai dengan sekitar 6000 unit permintaan atau lebih.

Jika pengaturan RU yang diperlukan untuk pergerakan data menggunakan Spark lebih tinggi dari apa yang diperlukan untuk beban kerja keadaan stabil Anda, Anda dapat dengan mudah meningkatkan throughput naik dan turun secara sistematis di Azure Cosmos DB untuk memenuhi kebutuhan beban kerja Anda untuk jangka waktu tertentu. Baca artikel kami tentang skala elastis di API untuk Cassandra untuk memahami berbagai opsi untuk penskalaan secara terprogram dan dinamis.

Catatan

Panduan di atas mengasumsikan distribusi data yang cukup seragam. Jika Anda memiliki kecondongan yang signifikan dalam data (yaitu sejumlah besar baca/tulis ke nilai kunci partisi yang sama), Anda mungkin masih mengalami penyempitan, meskipun Anda memiliki sejumlah besar unit permintaan yang diprovisikan dalam tabel Anda. Unit permintaan dibagi rata di antara partisi fisik, dan kecondongan data berat dapat menyebabkan penyempitan permintaan ke satu partisi.

Parameter konfigurasi throughput konektor Spark

Tabel berikut mencantumkan parameter konfigurasi throughput khusus Azure Cosmos DB untuk Apache Cassandra yang disediakan oleh konektor. Untuk daftar rinci semua parameter konfigurasi, lihat halaman referensi konfigurasi repositori Spark Cassandra Connector GitHub.

Nama Properti Nilai default Keterangan
spark.cassandra.output.batch.size.rows 1 Jumlah baris per batch tunggal. Atur parameter ini ke 1. Parameter ini digunakan untuk mencapai throughput yang lebih tinggi untuk beban kerja yang berat.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Tidak Jumlah maksimum koneksi per node per eksekutor. 10*n setara dengan 10 koneksi per node dalam klaster n-node Cassandra. Jadi, jika Anda memerlukan lima koneksi per node per eksekutor untuk lima node kluster Cassandra, maka Anda harus mengatur konfigurasi ini menjadi 25. Ubah nilai ini berdasarkan tingkat paralelisme atau jumlah pelaksana tempat pekerjaan spark Anda dikonfigurasi.
spark.cassandra.output.concurrent.writes 100 Mendefinisikan jumlah tulisan paralel yang dapat terjadi per pelaksana. Karena Anda mengatur "batch.size.rows" ke 1, pastikan untuk menskalakan nilai ini sesuai. Ubah nilai ini berdasarkan tingkat paralelisme atau throughput yang ingin Anda capai untuk beban kerja Anda.
spark.cassandra.concurrent.reads 512 Menentukan jumlah pembacaan paralel yang dapat terjadi per pelaksana. Ubah nilai ini berdasarkan tingkat paralelisme atau throughput yang ingin Anda capai untuk beban kerja Anda
spark.cassandra.output.throughput_mb_per_sec Tidak Menentukan total throughput tulis per pelaksana. Parameter ini dapat digunakan sebagai batas atas untuk throughput pekerjaan spark Anda, dan mendasarkannya pada throughput yang disediakan dari kontainer Azure Cosmos DB Anda.
spark.cassandra.input.reads_per_sec Tidak Menentukan total throughput baca per pelaksana. Parameter ini dapat digunakan sebagai batas atas untuk throughput pekerjaan spark Anda, dan mendasarkannya pada throughput yang disediakan dari kontainer Azure Cosmos DB Anda.
spark.cassandra.output.batch.grouping.buffer.size 1000 Menentukan jumlah batch per tugas spark tunggal yang dapat disimpan dalam memori sebelum mengirim ke API untuk Cassandra
spark.cassandra.connection.keep_alive_ms 60000 Menentukan periode waktu hingga koneksi yang tidak digunakan tersedia.

Sesuaikan throughput dan tingkat paralelisme parameter ini berdasarkan beban kerja yang Anda harapkan untuk pekerjaan spark Anda, dan throughput yang telah Anda provisikan untuk akun Azure Cosmos DB Anda.

Menyambungkan ke Azure Cosmos DB untuk Apache Cassandra dari Spark

cqlsh

Perintah berikut merinci cara menyambungkan ke Azure Cosmos DB untuk Apache Cassandra dari cqlsh. Ini berguna untuk validasi saat Anda menjalankan sampel di Spark.
Dari Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

Artikel di bawah ini membahas provisi kluster Azure Databricks, konfigurasi kluster untuk menyambungkan ke Azure Cosmos DB for Apache Cassandra, dan beberapa contoh notebook yang mencakup operasi DDL, operasi DML, dan banyak lagi.
Bekerja dengan Azure Cosmos DB untuk Apache Cassandra dari Azure Databricks

2. Azure HDInsight-Spark

Artikel di bawah ini membahas layanan HDinsight-Spark, provisi, konfigurasi kluster untuk menyambungkan ke Azure Cosmos DB untuk Apache Cassandra, dan beberapa contoh notebook yang mencakup operasi DDL, operasi DML, dan banyak lagi.
Bekerja dengan Azure Cosmos DB untuk Apache Cassandra dari Azure HDInsight-Spark

3. Lingkungan Spark secara umum

Meskipun bagian di atas khusus untuk layanan PaaS berbasis Azure Spark, bagian ini mencakup lingkungan Spark umum apa pun. Dependensi konektor, impor, dan konfigurasi sesi Spark dirinci di bawah ini. Bagian "Langkah selanjutnya" mencakup sampel kode untuk operasi DDL, operasi DML, dan lainnya.

Dependensi konektor:

  1. Tambahkan koordinat maven untuk mendapatkan konektor Cassandra untuk Spark
  2. Menambahkan koordinat maven untuk pustaka pembantu Azure Cosmos DB untuk API untuk Cassandra

Impor:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Konfigurasi sesi Spark:

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000 

Langkah berikutnya

Artikel berikut menunjukkan integrasi Spark dengan Azure Cosmos DB untuk Apache Cassandra.