Tutorial: Menyambungkan ke Azure Cosmos DB for NoSQL dengan menggunakan Spark
BERLAKU UNTUK: NoSQL
Dalam tutorial ini, Anda menggunakan konektor Azure Cosmos DB Spark untuk membaca atau menulis data dari akun Azure Cosmos DB for NoSQL. Tutorial ini menggunakan Azure Databricks dan notebook Jupyter untuk mengilustrasikan cara berintegrasi dengan API untuk NoSQL dari Spark. Tutorial ini berfokus pada Python dan Scala, meskipun Anda dapat menggunakan bahasa atau antarmuka apa pun yang didukung oleh Spark.
Dalam tutorial ini, Anda akan mempelajari cara:
- Sambungkan ke API untuk akun NoSQL dengan menggunakan Spark dan notebook Jupyter.
- Membuat sumber daya database dan kontainer.
- Menyerap data ke kontainer.
- Mengkueri data dalam kontainer.
- Lakukan operasi umum pada item dalam kontainer.
Prasyarat
- Akun Azure Cosmos DB for NoSQL yang sudah ada.
- Jika Anda memiliki langganan Azure yang sudah ada, buat akun baru.
- Tidak ada langganan Azure? Anda dapat mencoba Azure Cosmos DB gratis tanpa kartu kredit yang diperlukan.
- Ruang kerja Azure Databricks yang sudah ada.
Menyambungkan dengan menggunakan Spark dan Jupyter
Gunakan ruang kerja Azure Databricks yang ada untuk membuat kluster komputasi yang siap menggunakan Apache Spark 3.4.x untuk menyambungkan ke akun Azure Cosmos DB for NoSQL Anda.
Buka ruang kerja Azure Databricks Anda.
Di antarmuka ruang kerja, buat kluster baru. Konfigurasikan kluster dengan pengaturan ini, minimal:
Versi Nilai Versi runtime 13.3 LTS (Scala 2.12, Spark 3.4.1) Gunakan antarmuka ruang kerja untuk mencari paket Maven dari Maven Central dengan ID
com.azure.cosmos.spark
Grup . Instal paket khusus untuk Spark 3.4 dengan ID Artefak yang diawali denganazure-cosmos-spark_3-4
ke kluster.Terakhir, buat buku catatan baru.
Tip
Secara default, buku catatan dilampirkan ke kluster yang baru dibuat.
Dalam buku catatan, atur pengaturan konfigurasi pemrosesan transaksi online (OLTP) untuk titik akhir akun NoSQL, nama database, dan nama kontainer.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Membuat database dan kontainer
Gunakan API Katalog untuk mengelola sumber daya akun seperti database dan kontainer. Kemudian, Anda dapat menggunakan OLTP untuk mengelola data dalam sumber daya kontainer.
Konfigurasikan API Katalog untuk mengelola API untuk sumber daya NoSQL dengan menggunakan Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Buat database baru bernama
cosmicworks
dengan menggunakanCREATE DATABASE IF NOT EXISTS
.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Buat kontainer baru bernama
products
dengan menggunakanCREATE TABLE IF NOT EXISTS
. Pastikan Anda mengatur jalur kunci partisi ke/category
dan mengaktifkan throughput skala otomatis dengan throughput1000
maksimum unit permintaan (RU) per detik.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Buat kontainer lain bernama
employees
dengan menggunakan konfigurasi kunci partisi hierarkis. Gunakan/organization
,/department
, dan/team
sebagai kumpulan jalur kunci partisi. Ikuti urutan tertentu tersebut. Selain itu, atur throughput ke sejumlah400
RU manual.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Jalankan sel buku catatan untuk memvalidasi bahwa database dan kontainer Anda dibuat dalam API Anda untuk akun NoSQL.
Menyerap data
Membuat himpunan data sampel. Kemudian gunakan OLTP untuk menyerap data tersebut ke API untuk kontainer NoSQL.
Membuat himpunan data sampel.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Gunakan
spark.createDataFrame
dan konfigurasi OLTP yang disimpan sebelumnya untuk menambahkan data sampel ke kontainer target.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Mengkueri data
Muat data OLTP ke dalam bingkai data untuk melakukan kueri umum pada data. Anda bisa menggunakan berbagai sintaks untuk memfilter atau mengkueri data.
Gunakan
spark.read
untuk memuat data OLTP ke dalam objek bingkai data. Gunakan konfigurasi yang sama dengan yang Anda gunakan sebelumnya dalam tutorial ini. Selain itu, aturspark.cosmos.read.inferSchema.enabled
ketrue
untuk memungkinkan konektor Spark menyimpulkan skema dengan mengambil sampel item yang ada.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Render skema data yang dimuat dalam bingkai data dengan menggunakan
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Merender baris data di mana
quantity
kolom kurang dari20
.where
Gunakan fungsi danshow
untuk melakukan kueri ini.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Render baris data pertama di mana
clearance
kolom adalahtrue
.filter
Gunakan fungsi untuk melakukan kueri ini.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Render lima baris data tanpa filter atau pemotongan.
show
Gunakan fungsi untuk menyesuaikan tampilan dan jumlah baris yang dirender.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Kueri data Anda dengan menggunakan string kueri NoSQL mentah ini:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Melakukan operasi umum
Saat Anda bekerja dengan API untuk data NoSQL di Spark, Anda dapat melakukan pembaruan parsial atau bekerja dengan data sebagai JSON mentah.
Untuk melakukan pembaruan parsial item:
Salin variabel konfigurasi yang
config
ada dan ubah properti di salinan baru. Secara khusus, konfigurasikan strategi tulis keItemPatch
. Kemudian nonaktifkan dukungan massal. Atur kolom dan operasi yang dipetakan. Terakhir, atur jenis operasi default keSet
.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Buat variabel untuk kunci partisi item dan pengidentifikasi unik yang ingin Anda targetkan sebagai bagian dari operasi patch ini.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Buat sekumpulan objek patch untuk menentukan item target dan tentukan bidang yang harus dimodifikasi.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Buat bingkai data dengan menggunakan sekumpulan objek patch. Gunakan
write
untuk melakukan operasi patch.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Jalankan kueri untuk meninjau hasil operasi patch. Item sekarang harus diberi nama
Yamba New Surfboard
tanpa perubahan lain.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Untuk bekerja dengan data JSON mentah:
Salin variabel konfigurasi yang
config
ada dan ubah properti di salinan baru. Secara khusus, ubah kontainer target menjadiemployees
. Kemudian konfigurasikancontacts
kolom/bidang untuk menggunakan data JSON mentah.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Buat sekumpulan karyawan untuk diserap ke dalam kontainer.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Buat bingkai data dan gunakan
write
untuk menyerap data karyawan.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Render data dari bingkai data dengan menggunakan
show
. Amati bahwacontacts
kolom adalah JSON mentah dalam output.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Konten terkait
- Apache Spark
- API Katalog Azure Cosmos DB
- Referensi parameter konfigurasi
- Sampel Konektor Spark Azure Cosmos DB
- Migrasi dari Spark 2.4 ke Spark 3.*
- Kompatibilitas versi:
- Catatan rilis:
- Tautan unduhan: