Aracılığıyla paylaş


Öğretici: Spark kullanarak NoSQL için Azure Cosmos DB'ye bağlanma

UYGULANANLAR: NoSQL

Bu öğreticide, NoSQL için Azure Cosmos DB hesabından veri okumak veya yazmak için Azure Cosmos DB Spark bağlayıcısını kullanacaksınız. Bu öğreticide, Spark'tan NoSQL için API ile tümleştirmeyi göstermek için Azure Databricks ve jupyter not defteri kullanılır. Bu öğretici, Python ve Scala'ya odaklanır ancak Spark tarafından desteklenen herhangi bir dili veya arabirimi kullanabilirsiniz.

Bu öğreticide aşağıdakilerin nasıl yapılacağını öğreneceksiniz:

  • Spark ve Jupyter not defteri kullanarak NoSQL hesabı için BIR API'ye bağlanın.
  • Veritabanı ve kapsayıcı kaynakları oluşturma.
  • Kapsayıcıya veri alma.
  • Kapsayıcıdaki verileri sorgulama.
  • Kapsayıcıdaki öğelerde yaygın işlemler gerçekleştirme.

Önkoşullar

Spark ve Jupyter kullanarak bağlanma

NoSQL için Azure Cosmos DB hesabınıza bağlanmak üzere Apache Spark 3.4.x kullanmaya hazır bir işlem kümesi oluşturmak için mevcut Azure Databricks çalışma alanınızı kullanın.

  1. Azure Databricks çalışma alanınızı açın.

  2. Çalışma alanı arabiriminde yeni bir küme oluşturun. Kümeyi en az şu ayarlarla yapılandırın:

    Sürüm Değer
    Çalışma zamanı sürümü 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Maven Central'danGrup Kimliği ile Maven paketlerini aramak için çalışma alanı arabirimini com.azure.cosmos.sparkkullanın. Paketi özellikle Spark 3.4 için kümeye ön ekli bir Yapıt Kimliği ile azure-cosmos-spark_3-4 yükleyin.

  4. Son olarak yeni bir not defteri oluşturun.

    İpucu

    Varsayılan olarak, not defteri son oluşturulan kümeye eklenir.

  5. Not defteri içinde, NoSQL hesap uç noktası, veritabanı adı ve kapsayıcı adı için çevrimiçi işlem işleme (OLTP) yapılandırma ayarlarını yapın.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Veritabanı ve kapsayıcı oluşturma

Veritabanları ve kapsayıcılar gibi hesap kaynaklarını yönetmek için Katalog API'sini kullanın. Ardından OLTP kullanarak kapsayıcı kaynaklarındaki verileri yönetebilirsiniz.

  1. Spark kullanarak NoSQL kaynaklarının API'sini yönetmek için Katalog API'sini yapılandırın.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. kullanarak CREATE DATABASE IF NOT EXISTSadlı cosmicworks yeni bir veritabanı oluşturun.

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. kullanarak CREATE TABLE IF NOT EXISTSadlı products yeni bir kapsayıcı oluşturun. Bölüm anahtarı yolunu /category olarak ayarladığınızdan ve saniye başına maksimum istek birimi (RU) aktarım hızıyla otomatik ölçeklendirme aktarım hızını etkinleştirdiğinizden 1000 emin olun.

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Hiyerarşik bölüm anahtarı yapılandırması kullanarak adlı employees başka bir kapsayıcı oluşturun. Bölüm anahtarı yolları kümesi olarak , /departmentve /team kullanın/organization. Bu belirli siparişi izleyin. Ayrıca aktarım hızını el ile ru miktarına 400 ayarlayın.

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. NoSQL hesabı için API'nizde veritabanınızın ve kapsayıcılarınızın oluşturulduğunu doğrulamak için not defteri hücrelerini çalıştırın.

Verileri alma

Örnek bir veri kümesi oluşturun. Ardından OLTP kullanarak bu verileri NoSQL kapsayıcısı için API'ye alın.

  1. Örnek bir veri kümesi oluşturun.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Hedef kapsayıcıya örnek veriler eklemek için ve daha önce kaydedilmiş OLTP yapılandırmasını kullanın spark.createDataFrame .

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Verileri sorgulama

Veriler üzerinde yaygın sorgular gerçekleştirmek için OLTP verilerini bir veri çerçevesine yükleyin. Verileri filtrelemek veya sorgulamak için çeşitli söz dizimleri kullanabilirsiniz.

  1. OLTP verilerini bir veri çerçevesi nesnesine yüklemek için kullanın spark.read . Bu öğreticinin önceki bölümlerinde kullandığınız yapılandırmayı kullanın. Ayrıca Spark bağlayıcısının var olan öğeleri örnekleme yoluyla şemayı çıkarmasına izin vermek için olarak ayarlayın spark.cosmos.read.inferSchema.enabledtrue .

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. kullanarak veri çerçevesinde yüklenen verilerin şemasını işleyerek printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Veri satırlarını sütunun değerinden quantity20küçük olduğu yerlerde işle. Bu sorguyu where gerçekleştirmek için ve show işlevlerini kullanın.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Sütunun olduğu clearance ilk veri satırını işler true. Bu sorguyu filter gerçekleştirmek için işlevini kullanın.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Filtre veya kesme olmadan beş veri satırı işleyin. show İşlenen satırların görünümünü ve sayısını özelleştirmek için işlevini kullanın.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Bu ham NoSQL sorgu dizesini kullanarak verilerinizi sorgula: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Yaygın işlemler gerçekleştirme

Spark'ta NoSQL verileri için API ile çalışırken kısmi güncelleştirmeler gerçekleştirebilir veya verilerle ham JSON olarak çalışabilirsiniz.

  1. Öğenin kısmi güncelleştirmesini gerçekleştirmek için:

    1. Mevcut config yapılandırma değişkenini kopyalayın ve yeni kopyadaki özellikleri değiştirin. Özellikle, yazma stratejisini olarak ItemPatchyapılandırın. Ardından toplu desteği devre dışı bırakın. Sütunları ve eşlenmiş işlemleri ayarlayın. Son olarak, varsayılan işlem türünü olarak Setayarlayın.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Bu düzeltme eki işleminin bir parçası olarak hedeflemek istediğiniz öğe bölüm anahtarı ve benzersiz tanımlayıcı için değişkenler oluşturun.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Hedef öğeyi belirtmek ve değiştirilmesi gereken alanları belirtmek için bir dizi düzeltme eki nesnesi oluşturun.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Düzeltme eki nesneleri kümesini kullanarak bir veri çerçevesi oluşturun. Düzeltme eki işlemini gerçekleştirmek için kullanın write .

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Düzeltme eki işleminin sonuçlarını gözden geçirmek için bir sorgu çalıştırın. Öğe artık başka bir değişiklik olmadan adlandırılmalıdır Yamba New Surfboard .

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Ham JSON verileriyle çalışmak için:

    1. Mevcut config yapılandırma değişkenini kopyalayın ve yeni kopyadaki özellikleri değiştirin. Özellikle hedef kapsayıcıyı olarak employeesdeğiştirin. Ardından sütunu/alanı ham JSON verilerini kullanacak şekilde yapılandırın contacts .

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Kapsayıcıya almak için bir çalışan kümesi oluşturun.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Bir veri çerçevesi oluşturun ve çalışan verilerini almak için kullanın write .

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. kullanarak veri çerçevesindeki verileri işleyerek show. Sütunun çıkışta contacts ham JSON olduğunu gözlemleyin.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Sonraki adım