將一對少關聯式資料移轉至 Azure Cosmos DB for NoSQL 帳戶

適用於:NoSQL

若要從關聯式資料庫移轉至 Azure Cosmos DB for NoSQL,可能有必要對資料模型進行變更以獲得最佳化。

其中一個常見的轉換是在一個 JSON 文件內內嵌相關的子項目,藉此反正規化資料。 在這裡,我們將使用 Azure Data Factory 或 Azure Databricks 來查看一些選項。 如需 Azure Cosmos DB 資料模型的詳細資訊,請參閱 Azure Cosmos DB 中的資料模型

範例案例

假設我們的 SQL 資料庫有 Orders 和 OrderDetails 這兩個資料表。

Screenshot that shows the Orders and OrderDetails tables in the SQL database.

在移轉期間,我們想要將此一對多的關聯性合併到一個 JSON 文件。 若要建立單一文件,請使用 FOR JSON 來建立 T-SQL 查詢:

SELECT
  o.OrderID,
  o.OrderDate,
  o.FirstName,
  o.LastName,
  o.Address,
  o.City,
  o.State,
  o.PostalCode,
  o.Country,
  o.Phone,
  o.Total,
  (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;

此查詢的結果將會包括來自 Orders 資料表的資料:

Screenshot of a query that results in details of various orders.

在理想的情況下,您會想要使用單一 Azure Data Factory (ADF) 複製活動,以查詢 SQL 資料作為來源,並將輸出直接寫入 Azure Cosmos DB 接收作為正確的 JSON 物件。 目前,無法在一個複製活動中執行所需的 JSON 轉換。 如果我們嘗試將以上查詢的結果複製到 Azure Cosmos DB for NoSQL 容器,則會看到 OrderDetails 欄位是文件的字串屬性,而不是預期的 JSON 陣列。

我們可以透過下列其中一種方式解決目前的限制:

  • 使用 Azure Data Factory 搭配兩個複製活動
    1. 讓 SQL 中的 JSON 格式資料到達中繼 Blob 儲存位置中的文字檔
    2. 從 JSON 文字檔載入資料至 Azure Cosmos DB 中的容器。
  • 使用 Azure Databricks 從 SQL 讀取並寫入 Azure Cosmos DB - 我們會在這裡提供兩個選項。

讓我們更詳細地查看這些方法:

Azure Data Factory

雖然我們無法在目的地 Azure Cosmos DB 文件中將 OrderDetails 內嵌為 JSON 陣列,但可以使用兩個不同的複製活動來解決該問題。

複製活動 #1:SqlJsonToBlobText

針對來源資料,我們會使用 SQL 查詢,利用 SQL Server OPENJSON 和 FOR JSON PATH 功能,將結果集取得為單一資料行且每個資料列有一個 JSON 物件 (代表訂單):

SELECT [value] FROM OPENJSON(
  (SELECT
    id = o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
   FROM Orders o FOR JSON PATH)
)

Screenshot of the preview values in the ADF copy operation.

針對 SqlJsonToBlobText 複製活動的接收器,我們選擇 [分隔符號文字],並將其指向 Azure Blob 儲存體中的特定資料夾。 此接收器包括動態產生的唯一檔案名稱 (例如,@concat(pipeline().RunId,'.json'))。 因為我們的文字檔不是真正的「分隔」,而且不想要使用逗號來將其剖析成個別的資料行。 我們也想要保留雙引號 (")、將 [資料行分隔符號] 設定為 Tab ("\t") 或另一個不會出現在資料中的字元,並將 [引號字元] 設定為 [無引號字元]。

Screenshot that highlights the Column delimiter and Quote character settings.

複製活動 #2:BlobJsonToCosmos

接下來,我們會修改 ADF 管線,方法是新增第二個複製活動,以在 Azure Blob 儲存體中尋找第一個活動所建立的文字檔。 系統會將其處理為 "JSON" 來源,以針對在文字檔案中找到的每個 JSON 資料列,以單一文件形式插入 Azure Cosmos DB 接收器。

Screenshot that highlights the JSON source file and the File path fields.

此外,我們也可以在管線中新增「刪除」活動,以便在每次執行之前,先刪除 /Orders/ 資料夾中剩餘的所有舊檔案。 ADF 管線現在看起來像這樣:

Screenshot that highlights the Delete activity.

觸發先前所提及的管線之後,我們會看到在中繼 Azure Blob 儲存體位置中所建立的檔案,而且一個資料列包含一個 JSON 物件:

Screenshot that shows the created file that contains the JSON objects.

我們也會看到具有適當內嵌 OrderDetails 的 Orders 文件插入我們的 Azure Cosmos DB 集合:

Screenshot that shows the order details as a part of the Azure Cosmos DB document

Azure Databricks

我們也可以在 Azure Databricks 中使用 Spark,將資料從我們的 SQL Database 來源複製到 Azure Cosmos DB 目的地,而不需要在 Azure Blob 儲存體中建立中繼文字/JSON 檔案。

注意

為了清楚和簡單起見,程式碼片段包括明確內嵌的虛擬資料庫密碼,但您最好應該使用 Azure Databricks 祕密。

首先,我們會建立並將必要的 SQL 連接器Azure Cosmos DB 連接器程式庫連結至我們的 Azure Databricks 叢集。 重新啟動叢集,以確定已載入程式庫。

Screenshot that shows where to create and attach the required SQL connector and Azure Cosmos DB connector libraries to our Azure Databricks cluster.

接下來,我們會針對 Scala 和 Python 呈現兩個範例。

Scala

在這裡,我們會取得 SQL 查詢的結果,其將 "FOR JSON" 輸出轉換成資料框架:

// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
  "url"          -> "xxxx.database.windows.net",
  "databaseName" -> "xxxx",
  "queryCustom"  -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user"         -> "xxxx", 
  "password"     -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)

Screenshot that shows the SQL query output in a DataFrame.

接下來,我們會連線到 Azure Cosmos DB 資料庫與集合:

// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.com:443/",
  // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)

最後,我們會定義結構描述,並使用 from_json 來套用 DataFrame,之後才將其儲存至 CosmosDB 集合。

// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
    StructField("OrderDetailId",IntegerType,true),
    StructField("ProductId",IntegerType,true),
    StructField("UnitPrice",DoubleType,true),
    StructField("Quantity",IntegerType,true)
  )))
val ordersWithSchema = orders.select(
  col("OrderId").cast("string").as("id"),
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)

Screenshot that highlights the proper array for saving to an Azure Cosmos DB collection.

Python

另一個方法是,您可能需要在 Spark 中執行 JSON 轉換 (如果來源資料庫不支援 FOR JSON 或類似的作業)。 或者,您可以針對大型資料集使用平行作業。 我們在此提供 PySpark 範例。 從在第一個資料格中設定來源和目標資料庫連接開始:

import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
 
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
 
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
 
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
    "Endpoint": "Endpoint",
    "Masterkey": "Masterkey",
    "Database": "OrdersDB",
    "Collection": "Orders",
    "Upsert": "true"
}

然後,我們會查詢來源資料庫 (在此情況下為 SQL Server) 以取得訂單和訂單詳細資料記錄,並將結果放入 Spark 資料框架中。 我們也會建立包含所有訂單識別碼的清單,以及用於平行作業的執行緒集區:

import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
 
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
 
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
 
#get all OrderId values to pass to map function 
orderids = orders.select('OrderId').collect()
 
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)

然後,建立將訂單寫入目標 API for NoSQL 集合的函式。 此函式將篩選指定訂單識別碼的所有訂單詳細資料,並將其轉換為 JSON 陣列。 然後,將 JSON 文件寫入至該訂單的目標 API for NoSQL 容器:

def writeOrder(orderid):
  #filter the order on current value passed from map function
  order = orders.filter(orders['OrderId'] == orderid[0])
  
  #set id to be a uuid
  order = order.withColumn("id", lit(str(uuid.uuid1())))
  
  #add details field to order dataframe
  order = order.withColumn("details", lit(''))
  
  #filter order details dataframe to get details we want to merge into the order document
  orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
  
  #convert dataframe to pandas
  orderpandas = order.toPandas()
  
  #convert the order dataframe to json and remove enclosing brackets
  orderjson = orderpandas.to_json(orient='records', force_ascii=False)
  orderjson = orderjson[1:-1] 
  
  #convert orderjson to a dictionaory so we can set the details element with order details later
  orderjsondata = json.loads(orderjson)
  
  
  #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
  if (orderdetailsgroup.count() !=0):
    #convert orderdetailsgroup to pandas dataframe to work better with json
    orderdetailsgroup = orderdetailsgroup.toPandas()
    
    #convert orderdetailsgroup to json string
    jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
    
    #convert jsonstring to dictionary to ensure correct encoding and no corrupt records
    jsonstring = json.loads(jsonstring)
    
    #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order 
    orderjsondata['details'] = jsonstring
 
  #convert dictionary to json
  orderjsondata = json.dumps(orderjsondata)
 
  #read the json into spark dataframe
  df = spark.read.json(sc.parallelize([orderjsondata]))
  
  #write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
  #https://learn.microsoft.com/azure/cosmos-db/spark-connector
  df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

最後,我們會在執行緒集區上使用 map 函數來呼叫 Python writeOrder 函數以平行執行,並傳入我們稍早建立的訂單識別碼清單:

#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)

在任一方法中,最終我們都應該會在 Azure Cosmos DB 集合的每個 Order 文件內取得正確儲存的內嵌 OrderDetails:

Screenshot of the resulting data after migration.

下一步