Udostępnij za pośrednictwem


Kopiowanie danych z usługi Azure Cosmos DB do dedykowanej puli SQL przy użyciu platformy Apache Spark

Azure Synapse Link dla usługi Azure Cosmos DB umożliwia użytkownikom uruchamianie analizy niemal w czasie rzeczywistym na danych operacyjnych w usłudze Azure Cosmos DB. Jednak czasami niektóre dane muszą być agregowane i wzbogacone, aby obsługiwać użytkowników magazynu danych. Można wykonać curating and export Azure Synapse Link data with just a kilka komórek w notesie.

Wymagania wstępne

Kroki

W tym samouczku połączysz się z magazynem analitycznym, aby nie mieć wpływu na magazyn transakcyjny (nie będzie korzystać z żadnych jednostek żądań). Wykonamy następujące kroki:

  1. Odczytywanie kontenera HTAP usługi Azure Cosmos DB w ramce danych Platformy Spark
  2. Agregowanie wyników w nowej ramce danych
  3. Pozyskiwanie danych do dedykowanej puli SQL

Spark to SQL Steps 1

Dane

W tym przykładzie używamy kontenera HTAP o nazwie RetailSales. Jest to część połączonej usługi o nazwie ConnectedData i ma następujący schemat:

  • _rid: ciąg (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: podwójne (nullable = true)
  • productCode: ciąg (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: ciąg (nullable = true)
  • reklama: długa (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: ciąg (nullable = true)

Zagregujemy sprzedaż (ilość, przychód (cena x ilość) według productCode i weekStarting na potrzeby raportowania . Na koniec wyeksportujemy te dane do dedykowanej tabeli puli SQL o nazwie dbo.productsales.

Konfigurowanie notesu platformy Spark

Utwórz notes Spark ze scala jako platformą Spark (Scala) jako językiem głównym. Używamy domyślnego ustawienia notesu dla sesji.

Odczytywanie danych na platformie Spark

Przeczytaj kontener HTAP usługi Azure Cosmos DB z platformą Spark w ramce danych w pierwszej komórce.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Agregowanie wyników w nowej ramce danych

W drugiej komórce uruchamiamy transformację i agregujemy potrzebne dla nowej ramki danych przed załadowaniem jej do dedykowanej bazy danych puli SQL.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Ładowanie wyników do dedykowanej puli SQL

W trzeciej komórce załadujemy dane do dedykowanej puli SQL. Spowoduje to automatyczne utworzenie tymczasowej tabeli zewnętrznej, zewnętrznego źródła danych i formatu pliku zewnętrznego, który zostanie usunięty po zakończeniu zadania.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Wykonywanie zapytań względem wyników przy użyciu języka SQL

Możesz wykonać zapytanie dotyczące wyniku przy użyciu prostego zapytania SQL, takiego jak następujący skrypt SQL:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

Zapytanie przedstawi następujące wyniki w trybie wykresu: Spark to SQL Steps 2

Następne kroki