Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu öğreticide, NoSQL için Azure Cosmos DB hesabından veri okumak veya yazmak için Azure Cosmos DB Spark bağlayıcısını kullanacaksınız. Bu öğreticide, Spark'tan NoSQL için API ile tümleştirmeyi göstermek için Azure Databricks ve jupyter not defteri kullanılır. Bu öğretici, Python ve Scala'ya odaklanır ancak Spark tarafından desteklenen herhangi bir dili veya arabirimi kullanabilirsiniz.
Bu eğitimde şunları öğreniyorsunuz:
- Spark ve Jupyter not defteri kullanarak NoSQL hesabı için BIR API'ye bağlanın.
- Veritabanı ve kapsayıcı kaynakları oluşturma.
- Kapsayıcıya veri alma.
- Kapsayıcıdaki verileri sorgulama.
- Kapsayıcıdaki öğelerde yaygın işlemler gerçekleştirme.
Prerequisites
- NoSQL hesabı için mevcut bir Azure Cosmos DB.
- Mevcut bir Azure aboneliğiniz varsa yeni bir hesap oluşturun.
- Mevcut bir Azure Databricks çalışma alanı.
Spark ve Jupyter kullanarak bağlanma
NoSQL için Azure Cosmos DB hesabınıza bağlanmak üzere Apache Spark 3.4.x kullanmaya hazır bir işlem kümesi oluşturmak için mevcut Azure Databricks çalışma alanınızı kullanın.
Azure Databricks çalışma alanınızı açın.
Çalışma alanı arabiriminde yeni bir küme oluşturun. Kümeyi en az şu ayarlarla yapılandırın:
Version Value Çalıştırma zamanı sürümü 13.3 LTS (Scala 2.12, Spark 3.4.1) Maven Central'danGrup Kimliği ile Maven paketlerini aramak için çalışma alanı arabirimini
com.azure.cosmos.sparkkullanın. Spark 3.4 için özel olarak, bir Yapıt Kimliği ileazure-cosmos-spark_3-4ön ekiyle kümeye paketi yükleyin.Son olarak yeni bir not defteri oluşturun.
Tip
Varsayılan olarak, not defteri son oluşturulan kümeye eklenir.
Not defteri içinde, NoSQL hesap uç noktası, veritabanı adı ve kapsayıcı adı için çevrimiçi işlem işleme (OLTP) yapılandırma ayarlarını yapın.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Bir veritabanı ve kapsayıcı oluşturun
Veritabanları ve kapsayıcılar gibi hesap kaynaklarını yönetmek için Katalog API'sini kullanın. Ardından OLTP kullanarak kapsayıcı kaynaklarındaki verileri yönetebilirsiniz.
Spark kullanarak NoSQL kaynaklarının API'sini yönetmek için Katalog API'sini yapılandırın.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))kullanarak
cosmicworksadlıCREATE DATABASE IF NOT EXISTSyeni bir veritabanı oluşturun.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")Yeni bir
productsadlı kapsayıcı oluşturmak içinCREATE TABLE IF NOT EXISTSkullanın. Bölüm anahtarı yolunu/categoryolarak ayarladığınızdan ve otomatik ölçeklendirme aktarım hızını saniye başına1000istek birimi (RU) olacak şekilde maksimum aktarım hızını etkinleştirdiğinizden emin olun.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))Hiyerarşik bölüm anahtarı yapılandırması kullanarak adlı
employeesbaşka bir kapsayıcı oluşturun. Bölümleme anahtar yolu seti olarak/organization,/departmentve/teamkullanın. O belirli sırayı izleyin. Ayrıca aktarım hızını400RU manüel bir sayıya ayarlayın.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))NoSQL hesabı için API'nizde veritabanınızın ve kapsayıcılarınızın oluşturulduğunu doğrulamak için not defteri hücrelerini çalıştırın.
Veri içe aktarma
Örnek bir veri kümesi oluşturun. Ardından OLTP kullanarak bu verileri NoSQL kapsayıcısı için API'ye alın.
Örnek bir veri kümesi oluşturun.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )Örnek veriler eklemek için
spark.createDataFrameve daha önce kaydedilmiş OLTP yapılandırmasını kullanarak hedef kapsayıcıya ekleyin.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Sorgu verileri
Veriler üzerinde yaygın sorgular gerçekleştirmek için OLTP verilerini bir veri çerçevesine yükleyin. Verileri filtrelemek veya sorgulamak için çeşitli söz dizimleri kullanabilirsiniz.
OLTP verilerini bir veri çerçevesi nesnesine yüklemek için kullanın
spark.read. Bu öğreticinin önceki bölümlerinde kullandığınız yapılandırmayı kullanın. Ayrıca, Spark bağlayıcısının var olan öğeleri örnekleme yoluyla şemayı çıkarmasına izin vermek içinspark.cosmos.read.inferSchema.enabledöğesinitrueolarak ayarlayın.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()printSchemakullanarak veri çerçevesinde yüklenen verilerin şemasını görüntüleyin.# Render schema df.printSchema()// Render schema df.printSchema()quantitysütununun değeri20'den küçük olan veri satırlarını işleyin. Bu sorguyu gerçekleştirmek içinwhereveshowişlevlerini kullanın.# Render filtered data df.where("quantity < 20") \ .show()// Render filtered data df.where("quantity < 20") .show()Sütunun olduğu
clearanceilk veri satırını işlertrue. Bu sorguyufiltergerçekleştirmek için işlevini kullanın.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)Filtre veya kesme olmadan beş veri satırı işleyin.
showİşlenen satırların görünümünü ve sayısını özelleştirmek için işlevini kullanın.# Render five rows of unfiltered and untruncated data df.show(5, False)// Render five rows of unfiltered and untruncated data df.show(5, false)Bu ham NoSQL sorgu dizesini kullanarak verilerinizi sorgula:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Yaygın işlemler gerçekleştirme
Spark'ta NoSQL verileri için API ile çalışırken kısmi güncelleştirmeler gerçekleştirebilir veya verilerle ham JSON olarak çalışabilirsiniz.
Öğenin kısmi güncelleştirmesini gerçekleştirmek için:
Mevcut
configyapılandırma değişkenini kopyalayın ve yeni kopyadaki özellikleri değiştirin. Yazma stratejisini özellikleItemPatcholarak yapılandırın. Ardından toplu desteği devre dışı bırakın. Sütunları ve eşlenmiş işlemleri ayarlayın. Son olarak, varsayılan işlem türünü olarakSetayarlayın.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )Bu düzeltme eki işleminin bir parçası olarak hedeflemek istediğiniz öğe bölüm anahtarı ve benzersiz tanımlayıcı için değişkenler oluşturun.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"Hedef öğeyi belirtmek ve değiştirilmesi gereken alanları belirtmek için bir dizi düzeltme eki nesnesi oluşturun.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )Düzeltme eki nesneleri kümesini kullanarak bir veri çerçevesi oluşturun. Düzeltme eki işlemini gerçekleştirmek için kullanın
write.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()Düzeltme eki işleminin sonuçlarını gözden geçirmek için bir sorgu çalıştırın. Öğe artık başka bir değişiklik olmadan adlandırılmalıdır
Yamba New Surfboard.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Ham JSON verileriyle çalışmak için:
Mevcut
configyapılandırma değişkenini kopyalayın ve yeni kopyadaki özellikleri değiştirin. Özellikle hedef kapsayıcıyı olarakemployeesdeğiştirin. Ardından sütunu/alanı ham JSON verilerini kullanacak şekilde yapılandırıncontacts.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )Kapsayıcıya aktarmak için bir grup çalışan oluşturun.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )Bir veri çerçevesi oluşturun ve çalışan verilerini almak için kullanın
write.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()Veri çerçevesindeki verileri
showkullanarak işleyin. Çıktıdacontactssütununun ham JSON formatında olduğunu gözlemleyin.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
İlgili içerik
- Apache Spark
- Azure Cosmos DB Katalog API'si
- Yapılandırma parametrelerine referans
- Azure Cosmos DB Spark bağlayıcı örnekleri
- Spark 2.4'ten Spark 3'e geçiş.*
- Kullanım dışı sürümler:
- Artık Azure Databricks, Azure Synapse veya Azure HDInsight'ta desteklenen Spark 3.1 veya 3.2 çalışma zamanları olmadığından Spark 3.1 ve 3.2 için Azure Cosmos DB Spark Bağlayıcısı kullanım dışı bırakılmıştır.
- Spark 3.1'den güncelleştirme için Geçiş Kılavuzu
- Spark 3.2'den güncelleştirmeye geçiş kılavuzu
- Sürüm uyumluluğu:
- Sürüm notları:
- İndirme bağlantıları: