Apache Spark kullanarak Azure Cosmos DB'den ayrılmış bir SQL havuzuna veri kopyalama

Azure Cosmos DB için Azure Synapse Bağlantısı, kullanıcıların Azure Cosmos DB'deki işletimsel veriler üzerinde gerçek zamanlıya yakın analizler çalıştırmasına olanak tanır. Ancak bazı verilerin veri ambarı kullanıcılarına hizmet vermek için toplanması ve zenginleştirilmesi gereken zamanlar vardır. Azure Synapse Bağlantı verilerini düzenleme ve dışarı aktarma işlemi, not defterindeki yalnızca birkaç hücreyle yapılabilir.

Önkoşullar

Adımlar

Bu öğreticide analiz deposuna bağlanarak işlem deposunu etkilemeyeceksiniz (herhangi bir İstek Birimi kullanmaz). Aşağıdaki adımları uygulayacağız:

  1. Azure Cosmos DB HTAP kapsayıcısını Spark veri çerçevesine okuma
  2. Sonuçları yeni bir veri çerçevesinde toplama
  3. Verileri ayrılmış bir SQL havuzuna alma

Spark'ta SQL'e Adım 1

Veriler

Bu örnekte RetailSales adlı bir HTAP kapsayıcısı kullanıyoruz. ConnectedData adlı bağlı hizmetin bir parçasıdır ve aşağıdaki şemaya sahiptir:

  • _rid: dize (null atanabilir = true)
  • _ts: long (null atanabilir = true)
  • logQuantity: double (null atanabilir = true)
  • productCode: dize (null atanabilir = true)
  • quantity: long (null atanabilir = true)
  • price: long (nullable = true)
  • id: dize (null atanabilir = true)
  • advertising: long (nullable = true)
  • storeId: long (null atanabilir = true)
  • weekStarting: long (nullable = true)
  • _etag: dize (null atanabilir = true)

Raporlama amacıyla satışları (miktar, gelir (fiyat x miktar) productCode ve weekStarting'e göre toplayacağız. Son olarak, bu verileri adlı dbo.productsalesayrılmış bir SQL havuzu tablosuna aktaracağız.

Spark Not Defteri yapılandırma

Ana dil olarak Scala'nın Spark (Scala) olduğu bir Spark not defteri oluşturun. Oturum için not defterinin varsayılan ayarını kullanırız.

Spark'ta verileri okuma

Spark içeren Azure Cosmos DB HTAP kapsayıcısını ilk hücredeki bir veri çerçevesine okuyun.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Sonuçları yeni bir veri çerçevesinde toplama

İkinci hücrede, yeni veri çerçevesi için gereken dönüştürmeyi ve toplamaları ayrılmış bir SQL havuzu veritabanına yüklemeden önce çalıştırırız.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Sonuçları ayrılmış bir SQL havuzuna yükleme

Üçüncü hücrede verileri ayrılmış bir SQL havuzuna yükleriz. İş tamamlandıktan sonra silinecek geçici bir dış tablo, dış veri kaynağı ve dış dosya biçimi otomatik olarak oluşturulur.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

SQL ile sonuçları sorgulama

Aşağıdaki SQL betiği gibi basit bir SQL sorgusu kullanarak sonucu sorgulayabilirsiniz:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

Sorgunuz grafik modunda aşağıdaki sonuçları sunar: Spark'ı SQL'e Taşıma 2. Adım

Sonraki adımlar