Tutorial: Koneksi ke Azure Cosmos DB untuk NoSQL menggunakan Spark

BERLAKU UNTUK: NoSQL

Dalam tutorial ini, Anda menggunakan konektor Azure Cosmos DB Spark untuk membaca atau menulis data dari akun Azure Cosmos DB for NoSQL. Tutorial ini menggunakan Azure Databricks dan notebook Jupyter untuk mengilustrasikan cara berintegrasi dengan API untuk NoSQL dari Spark. Tutorial ini berfokus pada Python dan Scala meskipun Anda dapat menggunakan bahasa atau antarmuka apa pun yang didukung oleh Spark.

Dalam tutorial ini, Anda akan mempelajari cara:

  • Koneksi ke API untuk akun NoSQL menggunakan Spark dan notebook Jupyter
  • Membuat database dan sumber daya kontainer
  • Menyerap data ke kontainer
  • Mengkueri data dalam kontainer
  • Melakukan operasi umum pada item dalam kontainer

Prasyarat

  • Akun Azure Cosmos DB for NoSQL yang sudah ada.
  • Ruang kerja Azure Databricks yang sudah ada.

Koneksi menggunakan Spark dan Jupyter

Gunakan ruang kerja Azure Databricks yang ada untuk membuat kluster komputasi yang siap menggunakan Apache Spark 3.4.x untuk menyambungkan ke akun Azure Cosmos DB for NoSQL Anda.

  1. Buka ruang kerja Azure Databricks Anda.

  2. Di antarmuka ruang kerja, buat kluster baru. Konfigurasikan kluster dengan pengaturan ini, minimal:

    Nilai
    Versi runtime 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Gunakan antarmuka ruang kerja untuk mencari paket Maven dari Maven Central dengan Idcom.azure.cosmos.sparkGrup . Instal paket khusus untuk Spark 3.4 dengan Id Artefak yang diawali dengan azure-cosmos-spark_3-4 ke kluster.

  4. Terakhir, buat buku catatan baru.

    Tip

    Secara default, notebook akan dilampirkan ke kluster yang baru dibuat.

  5. Dalam notebook, atur pengaturan konfigurasi OLTP untuk titik akhir akun NoSQL, nama database, dan nama kontainer.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Membuat database serta kontainer

Gunakan API Katalog untuk mengelola sumber daya akun seperti database dan kontainer. Kemudian, Anda dapat menggunakan OLTP untuk mengelola data dalam sumber daya kontainer.

  1. Konfigurasikan API Katalog untuk mengelola API untuk sumber daya NoSQL menggunakan Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Buat database baru bernama cosmicworks menggunakan CREATE DATABASE IF NOT EXISTS.

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Buat kontainer baru bernama products menggunakan CREATE TABLE IF NOT EXISTS. Pastikan Anda mengatur jalur kunci partisi ke /category dan mengaktifkan throughput skala otomatis dengan throughput 1000 maksimum unit permintaan per detik (RU/dtk).

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Buat kontainer lain bernama employees menggunakan konfigurasi kunci partisi hierarkis dengan /organization, /department, dan /team sebagai kumpulan jalur kunci partisi dalam urutan tertentu. Selain itu, atur throughput ke jumlah 400 RU/dtk manual

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Jalankan sel buku catatan untuk memvalidasi bahwa database dan kontainer Anda dibuat dalam API Anda untuk akun NoSQL.

Menyerap data

Buat himpunan data sampel lalu gunakan OLTP untuk menyerap data tersebut ke API untuk kontainer NoSQL.

  1. Buat sampel himpunan data.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Gunakan spark.createDataFrame dan konfigurasi OLTP yang disimpan sebelumnya untuk menambahkan data sampel ke kontainer target.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Mengkueri data

Muat data OLTP ke dalam bingkai data untuk melakukan kueri umum pada data. Anda bisa menggunakan berbagai sintaks filter atau data kueri.

  1. Gunakan spark.read untuk memuat data OLTP ke dalam objek dataframe. Gunakan konfigurasi yang sama yang digunakan sebelumnya dalam tutorial ini. Selain itu, atur spark.cosmos.read.inferSchema.enabled ke true untuk memungkinkan konektor Spark menyimpulkan skema dengan mengambil sampel item yang ada.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Render skema data yang dimuat dalam dataframe menggunakan printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Merender baris data di mana quantity kolom kurang dari 20. where Gunakan fungsi dan show untuk melakukan kueri ini.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Render baris data pertama di mana clearance kolom benar. filter Gunakan fungsi untuk melakukan kueri ini.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Render lima baris data tanpa filter atau pemotongan. show Gunakan fungsi untuk menyesuaikan tampilan dan jumlah baris yang dirender.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Kueri data Anda menggunakan string kueri NoSQL mentah ini: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Melakukan operasi umum

Saat bekerja dengan API untuk data NoSQL di Spark, Anda dapat melakukan pembaruan parsial atau bekerja dengan data sebagai JSON mentah.

  1. Untuk melakukan pembaruan parsial item, lakukan langkah-langkah berikut:

    1. Salin variabel konfigurasi yang config ada dan ubah properti di salinan baru. Khusus; konfigurasikan strategi tulis ke ItemPatch, nonaktifkan dukungan massal, atur kolom dan operasi yang dipetakan, dan akhirnya atur jenis operasi default ke Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Buat variabel untuk kunci partisi item dan pengidentifikasi unik yang ingin Anda targetkan sebagai bagian dari operasi patch ini.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Buat sekumpulan objek patch untuk menentukan item target dan tentukan bidang yang harus dimodifikasi.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Buat bingkai data menggunakan sekumpulan objek patch dan gunakan write untuk melakukan operasi patch.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Jalankan kueri untuk meninjau hasil operasi patch. Item sekarang harus diberi nama Yamba New Surfboard tanpa perubahan lain.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Untuk bekerja dengan data JSON mentah, lakukan langkah-langkah berikut:

    1. Salin variabel konfigurasi yang config ada dan ubah properti di salinan baru. Khusus; ubah kontainer target menjadi employees dan konfigurasikan contacts kolom/bidang untuk menggunakan data JSON mentah.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Buat sekumpulan karyawan untuk diserap ke dalam kontainer.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Buat bingkai data dan gunakan write untuk menyerap data karyawan.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Render data dari bingkai data menggunakan show. Amati bahwa contacts kolom adalah JSON mentah dalam output.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Langkah selanjutnya