Bagikan melalui


Tutorial: Menyambungkan ke Azure Cosmos DB for NoSQL dengan menggunakan Spark

BERLAKU UNTUK: NoSQL

Dalam tutorial ini, Anda menggunakan konektor Azure Cosmos DB Spark untuk membaca atau menulis data dari akun Azure Cosmos DB for NoSQL. Tutorial ini menggunakan Azure Databricks dan notebook Jupyter untuk mengilustrasikan cara berintegrasi dengan API untuk NoSQL dari Spark. Tutorial ini berfokus pada Python dan Scala, meskipun Anda dapat menggunakan bahasa atau antarmuka apa pun yang didukung oleh Spark.

Dalam tutorial ini, Anda akan mempelajari cara:

  • Sambungkan ke API untuk akun NoSQL dengan menggunakan Spark dan notebook Jupyter.
  • Membuat sumber daya database dan kontainer.
  • Menyerap data ke kontainer.
  • Mengkueri data dalam kontainer.
  • Lakukan operasi umum pada item dalam kontainer.

Prasyarat

  • Akun Azure Cosmos DB for NoSQL yang sudah ada.
  • Ruang kerja Azure Databricks yang sudah ada.

Menyambungkan dengan menggunakan Spark dan Jupyter

Gunakan ruang kerja Azure Databricks yang ada untuk membuat kluster komputasi yang siap menggunakan Apache Spark 3.4.x untuk menyambungkan ke akun Azure Cosmos DB for NoSQL Anda.

  1. Buka ruang kerja Azure Databricks Anda.

  2. Di antarmuka ruang kerja, buat kluster baru. Konfigurasikan kluster dengan pengaturan ini, minimal:

    Versi Nilai
    Versi runtime 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Gunakan antarmuka ruang kerja untuk mencari paket Maven dari Maven Central dengan ID com.azure.cosmos.sparkGrup . Instal paket khusus untuk Spark 3.4 dengan ID Artefak yang diawali dengan azure-cosmos-spark_3-4 ke kluster.

  4. Terakhir, buat buku catatan baru.

    Tip

    Secara default, buku catatan dilampirkan ke kluster yang baru dibuat.

  5. Dalam buku catatan, atur pengaturan konfigurasi pemrosesan transaksi online (OLTP) untuk titik akhir akun NoSQL, nama database, dan nama kontainer.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Membuat database dan kontainer

Gunakan API Katalog untuk mengelola sumber daya akun seperti database dan kontainer. Kemudian, Anda dapat menggunakan OLTP untuk mengelola data dalam sumber daya kontainer.

  1. Konfigurasikan API Katalog untuk mengelola API untuk sumber daya NoSQL dengan menggunakan Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Buat database baru bernama cosmicworks dengan menggunakan CREATE DATABASE IF NOT EXISTS.

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Buat kontainer baru bernama products dengan menggunakan CREATE TABLE IF NOT EXISTS. Pastikan Anda mengatur jalur kunci partisi ke /category dan mengaktifkan throughput skala otomatis dengan throughput 1000 maksimum unit permintaan (RU) per detik.

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Buat kontainer lain bernama employees dengan menggunakan konfigurasi kunci partisi hierarkis. Gunakan /organization, /department, dan /team sebagai kumpulan jalur kunci partisi. Ikuti urutan tertentu tersebut. Selain itu, atur throughput ke sejumlah 400 RU manual.

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Jalankan sel buku catatan untuk memvalidasi bahwa database dan kontainer Anda dibuat dalam API Anda untuk akun NoSQL.

Menyerap data

Membuat himpunan data sampel. Kemudian gunakan OLTP untuk menyerap data tersebut ke API untuk kontainer NoSQL.

  1. Membuat himpunan data sampel.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Gunakan spark.createDataFrame dan konfigurasi OLTP yang disimpan sebelumnya untuk menambahkan data sampel ke kontainer target.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Mengkueri data

Muat data OLTP ke dalam bingkai data untuk melakukan kueri umum pada data. Anda bisa menggunakan berbagai sintaks untuk memfilter atau mengkueri data.

  1. Gunakan spark.read untuk memuat data OLTP ke dalam objek bingkai data. Gunakan konfigurasi yang sama dengan yang Anda gunakan sebelumnya dalam tutorial ini. Selain itu, atur spark.cosmos.read.inferSchema.enabled ke true untuk memungkinkan konektor Spark menyimpulkan skema dengan mengambil sampel item yang ada.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Render skema data yang dimuat dalam bingkai data dengan menggunakan printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Merender baris data di mana quantity kolom kurang dari 20. where Gunakan fungsi dan show untuk melakukan kueri ini.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Render baris data pertama di mana clearance kolom adalah true. filter Gunakan fungsi untuk melakukan kueri ini.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Render lima baris data tanpa filter atau pemotongan. show Gunakan fungsi untuk menyesuaikan tampilan dan jumlah baris yang dirender.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Kueri data Anda dengan menggunakan string kueri NoSQL mentah ini: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Melakukan operasi umum

Saat Anda bekerja dengan API untuk data NoSQL di Spark, Anda dapat melakukan pembaruan parsial atau bekerja dengan data sebagai JSON mentah.

  1. Untuk melakukan pembaruan parsial item:

    1. Salin variabel konfigurasi yang config ada dan ubah properti di salinan baru. Secara khusus, konfigurasikan strategi tulis ke ItemPatch. Kemudian nonaktifkan dukungan massal. Atur kolom dan operasi yang dipetakan. Terakhir, atur jenis operasi default ke Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Buat variabel untuk kunci partisi item dan pengidentifikasi unik yang ingin Anda targetkan sebagai bagian dari operasi patch ini.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Buat sekumpulan objek patch untuk menentukan item target dan tentukan bidang yang harus dimodifikasi.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Buat bingkai data dengan menggunakan sekumpulan objek patch. Gunakan write untuk melakukan operasi patch.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Jalankan kueri untuk meninjau hasil operasi patch. Item sekarang harus diberi nama Yamba New Surfboard tanpa perubahan lain.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Untuk bekerja dengan data JSON mentah:

    1. Salin variabel konfigurasi yang config ada dan ubah properti di salinan baru. Secara khusus, ubah kontainer target menjadi employees. Kemudian konfigurasikan contacts kolom/bidang untuk menggunakan data JSON mentah.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Buat sekumpulan karyawan untuk diserap ke dalam kontainer.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Buat bingkai data dan gunakan write untuk menyerap data karyawan.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Render data dari bingkai data dengan menggunakan show. Amati bahwa contacts kolom adalah JSON mentah dalam output.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Langkah selanjutnya