Apache Spark kullanarak Azure Cosmos DB'den ayrılmış bir SQL havuzuna veri kopyalama
Azure Cosmos DB için Azure Synapse Bağlantısı, kullanıcıların Azure Cosmos DB'deki işletimsel veriler üzerinde gerçek zamanlıya yakın analizler çalıştırmasına olanak tanır. Ancak bazı verilerin veri ambarı kullanıcılarına hizmet vermek için toplanması ve zenginleştirilmesi gereken zamanlar vardır. Azure Synapse Bağlantı verilerini düzenleme ve dışarı aktarma işlemi, not defterindeki yalnızca birkaç hücreyle yapılabilir.
Önkoşullar
- Synapse çalışma alanını şu şekilde sağlayın :
- Veri içeren bir HTAP kapsayıcısı ile Azure Cosmos DB hesabı sağlama
- Azure Cosmos DB HTAP kapsayıcısını çalışma alanına bağlama
- Spark'tan ayrılmış bir SQL havuzuna veri aktarmak için doğru kuruluma sahip olun
Adımlar
Bu öğreticide analiz deposuna bağlanarak işlem deposunu etkilemeyeceksiniz (herhangi bir İstek Birimi kullanmaz). Aşağıdaki adımları uygulayacağız:
- Azure Cosmos DB HTAP kapsayıcısını Spark veri çerçevesine okuma
- Sonuçları yeni bir veri çerçevesinde toplama
- Verileri ayrılmış bir SQL havuzuna alma
Veriler
Bu örnekte RetailSales adlı bir HTAP kapsayıcısı kullanıyoruz. ConnectedData adlı bağlı hizmetin bir parçasıdır ve aşağıdaki şemaya sahiptir:
- _rid: dize (null atanabilir = true)
- _ts: long (null atanabilir = true)
- logQuantity: double (null atanabilir = true)
- productCode: dize (null atanabilir = true)
- quantity: long (null atanabilir = true)
- price: long (nullable = true)
- id: dize (null atanabilir = true)
- advertising: long (nullable = true)
- storeId: long (null atanabilir = true)
- weekStarting: long (nullable = true)
- _etag: dize (null atanabilir = true)
Raporlama amacıyla satışları (miktar, gelir (fiyat x miktar) productCode ve weekStarting'e göre toplayacağız. Son olarak, bu verileri adlı dbo.productsales
ayrılmış bir SQL havuzu tablosuna aktaracağız.
Spark Not Defteri yapılandırma
Ana dil olarak Scala'nın Spark (Scala) olduğu bir Spark not defteri oluşturun. Oturum için not defterinin varsayılan ayarını kullanırız.
Spark'ta verileri okuma
Spark içeren Azure Cosmos DB HTAP kapsayıcısını ilk hücredeki bir veri çerçevesine okuyun.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
Sonuçları yeni bir veri çerçevesinde toplama
İkinci hücrede, yeni veri çerçevesi için gereken dönüştürmeyi ve toplamaları ayrılmış bir SQL havuzu veritabanına yüklemeden önce çalıştırırız.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
Sonuçları ayrılmış bir SQL havuzuna yükleme
Üçüncü hücrede verileri ayrılmış bir SQL havuzuna yükleriz. İş tamamlandıktan sonra silinecek geçici bir dış tablo, dış veri kaynağı ve dış dosya biçimi otomatik olarak oluşturulur.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
SQL ile sonuçları sorgulama
Aşağıdaki SQL betiği gibi basit bir SQL sorgusu kullanarak sonucu sorgulayabilirsiniz:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
Sorgunuz grafik modunda şu sonuçları sunar:
Sonraki adımlar
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin