Копирование данных из Azure Cosmos DB в выделенный пул SQL с помощью Apache Spark

С помощью Azure Synapse Link для Azure Cosmos DB пользователи могут запускать аналитику почти в реальном времени по операционным данным в Azure Cosmos DB. Однако в некоторых ситуациях определенные данные необходимо агрегировать и обогащать для обслуживания пользователей хранилища данных. Для курирования и экспорта данных Azure Synapse Link достаточно всего нескольких ячеек в записной книжке.

Предварительные требования

Шаги

В этом руководстве показано, как подключиться к аналитическому хранилищу. Описанные здесь действия не влияют на хранилище транзакций (для их выполнения единицы запросов не потребляются). Мы выполним следующие действия:

  1. Считывание контейнера HTAP Cosmos DB в кадр данных Spark
  2. Агрегирование результатов в новый кадр данных
  3. Прием данных в выделенный пул SQL

Spark to SQL Steps 1

Данные

В этом примере мы используем контейнер HTAP с именем RetailSales. Это часть связанной службы с именем ConnectedData со следующей схемой:

  • _rid: string (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: string (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: string (nullable = true)
  • advertising: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: string (nullable = true)

Для отчетности мы будем агрегировать продажи (quantity, revenue (price x quantity) по productCode и weekStarting. Наконец, мы экспортируем эти данные в таблицу выделенного пула SQL с именем dbo.productsales.

Настройка записной книжки Spark

Создайте записную книжку Spark, используя в качестве основного языка с Scala на Spark (Scala). Мы используем для сеанса заданный по умолчанию параметр записной книжки.

Чтение данных в Spark

С помощью Spark считайте контейнер HTAP Cosmos DB в кадр данных в первой ячейке.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Агрегирование результатов в новый кадр данных

Во второй ячейке будут выполнятся преобразование и статистические вычисления, которые необходимо выполнить для нового кадра данных до его загрузки в базу данных выделенного пула SQL.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Загрузка результатов в выделенный пул SQL

В третьей ячейке данные загружаются в выделенный пул SQL. При этом автоматически создается временная внешняя таблица, внешний источник данных и формат внешнего файла, которые будут удалены после завершения задания.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Запрос результатов с помощью SQL

Вы можете запросить результат, используя простой SQL-запрос, например так, как в следующем скрипте SQL:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

В ответ на запрос будут представлены следующие результаты в режиме диаграммы: Spark to SQL Steps 2

Дальнейшие шаги