共用方式為


將一對少關聯式資料移轉至 Azure Cosmos DB for NoSQL 帳戶

適用於:NoSQL

若要從關聯式資料庫移轉至 Azure Cosmos DB for NoSQL,可能有必要對資料模型進行變更以獲得最佳化。

其中一個常見的轉換是在一個 JSON 文件內內嵌相關的子項目,藉此反正規化資料。 在這裡,我們將使用 Azure Data Factory 或 Azure Databricks 來查看一些選項。 如需 Azure Cosmos DB 資料模型的詳細資訊,請參閱 Azure Cosmos DB 中的資料模型

範例案例

假設我們的 SQL 資料庫有 Orders 和 OrderDetails 這兩個資料表。

顯示 SQL 資料庫中 Orders 和 OrderDetails 數據表的螢幕快照。

在移轉期間,我們想要將此一對多的關聯性合併到一個 JSON 文件。 若要建立單一文件,請使用 FOR JSON 來建立 T-SQL 查詢:

SELECT
  o.OrderID,
  o.OrderDate,
  o.FirstName,
  o.LastName,
  o.Address,
  o.City,
  o.State,
  o.PostalCode,
  o.Country,
  o.Phone,
  o.Total,
  (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;

此查詢的結果將會包括來自 Orders 資料表的資料:

產生各種訂單詳細數據的查詢螢幕快照。

在理想的情況下,您會想要使用單一 Azure Data Factory (ADF) 複製活動,以查詢 SQL 資料作為來源,並將輸出直接寫入 Azure Cosmos DB 接收作為正確的 JSON 物件。 目前,無法在一個複製活動中執行所需的 JSON 轉換。 如果我們嘗試將以上查詢的結果複製到 Azure Cosmos DB for NoSQL 容器,則會看到 OrderDetails 欄位是文件的字串屬性,而不是預期的 JSON 陣列。

我們可以透過下列其中一種方式解決目前的限制:

  • 使用 Azure Data Factory 搭配兩個複製活動
    1. 讓 SQL 中的 JSON 格式資料到達中繼 Blob 儲存位置中的文字檔
    2. 從 JSON 文字檔載入資料至 Azure Cosmos DB 中的容器。
  • 使用 Azure Databricks 從 SQL 讀取並寫入 Azure Cosmos DB - 我們會在這裡提供兩個選項。

讓我們更詳細地查看這些方法:

Azure Data Factory

雖然我們無法在目的地 Azure Cosmos DB 文件中將 OrderDetails 內嵌為 JSON 陣列,但可以使用兩個不同的複製活動來解決該問題。

複製活動 #1:SqlJsonToBlobText

針對來源資料,我們會使用 SQL 查詢,利用 SQL Server OPENJSON 和 FOR JSON PATH 功能,將結果集取得為單一資料行且每個資料列有一個 JSON 物件 (代表訂單):

SELECT [value] FROM OPENJSON(
  (SELECT
    id = o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
   FROM Orders o FOR JSON PATH)
)

ADF 複製作業中預覽值的螢幕快照。

針對 SqlJsonToBlobText 複製活動的接收器,我們選擇 [分隔符號文字],並將其指向 Azure Blob 儲存體中的特定資料夾。 此接收器包括動態產生的唯一檔案名稱 (例如,@concat(pipeline().RunId,'.json'))。 因為我們的文字檔不是真正的「分隔」,而且不想要使用逗號來將其剖析成個別的資料行。 我們也想要保留雙引號 (")、將 [資料行分隔符號] 設定為 Tab ("\t") 或另一個不會出現在資料中的字元,並將 [引號字元] 設定為 [無引號字元]。

醒目提示數據行分隔符和引號字元設定的螢幕快照。

複製活動 #2:BlobJsonToCosmos

接下來,我們會修改 ADF 管線,方法是新增第二個複製活動,以在 Azure Blob 儲存體中尋找第一個活動所建立的文字檔。 系統會將其處理為 "JSON" 來源,以針對在文字檔案中找到的每個 JSON 資料列,以單一文件形式插入 Azure Cosmos DB 接收器。

醒目提示 JSON 來源檔案和 [檔案路徑] 欄位的螢幕快照。

此外,我們也可以在管線中新增「刪除」活動,以便在每次執行之前,先刪除 /Orders/ 資料夾中剩餘的所有舊檔案。 ADF 管線現在看起來像這樣:

醒目提示 [刪除] 活動的螢幕快照。

觸發先前所提及的管線之後,我們會看到在中繼 Azure Blob 儲存體位置中所建立的檔案,而且一個資料列包含一個 JSON 物件:

顯示包含 JSON 物件的已建立檔案的螢幕快照。

我們也會看到具有適當內嵌 OrderDetails 的 Orders 文件插入我們的 Azure Cosmos DB 集合:

顯示訂單詳細數據作為 Azure Cosmos DB 檔案的螢幕快照

Azure Databricks

我們也可以在 Azure Databricks 中使用 Spark,將資料從我們的 SQL Database 來源複製到 Azure Cosmos DB 目的地,而不需要在 Azure Blob 儲存體中建立中繼文字/JSON 檔案。

注意

為了清楚和簡單起見,程式碼片段包括明確內嵌的虛擬資料庫密碼,但您最好應該使用 Azure Databricks 祕密。

首先,我們會建立並將必要的 SQL 連接器Azure Cosmos DB 連接器程式庫連結至我們的 Azure Databricks 叢集。 重新啟動叢集,以確定已載入程式庫。

此螢幕快照顯示建立必要 SQL 連接器和 Azure Cosmos DB 連接器連結庫的位置,並將其連結至我們的 Azure Databricks 叢集。

接下來,我們會針對 Scala 和 Python 呈現兩個範例。

Scala

在這裡,我們會取得 SQL 查詢的結果,其將 "FOR JSON" 輸出轉換成資料框架:

// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
  "url"          -> "xxxx.database.windows.net",
  "databaseName" -> "xxxx",
  "queryCustom"  -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user"         -> "xxxx", 
  "password"     -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)

顯示 DataFrame 中 SQL 查詢輸出的螢幕快照。

接下來,我們會連線到 Azure Cosmos DB 資料庫與集合:

// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.com:443/",
  // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)

最後,我們會定義結構描述,並使用 from_json 來套用 DataFrame,之後才將其儲存至 CosmosDB 集合。

// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
    StructField("OrderDetailId",IntegerType,true),
    StructField("ProductId",IntegerType,true),
    StructField("UnitPrice",DoubleType,true),
    StructField("Quantity",IntegerType,true)
  )))
val ordersWithSchema = orders.select(
  col("OrderId").cast("string").as("id"),
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)

醒目提示儲存至 Azure Cosmos DB 集合的適當陣列螢幕快照。

Python

另一個方法是,您可能需要在 Spark 中執行 JSON 轉換 (如果來源資料庫不支援 FOR JSON 或類似的作業)。 或者,您可以針對大型資料集使用平行作業。 我們在此提供 PySpark 範例。 從在第一個資料格中設定來源和目標資料庫連接開始:

import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
 
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
 
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
 
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
    "Endpoint": "Endpoint",
    "Masterkey": "Masterkey",
    "Database": "OrdersDB",
    "Collection": "Orders",
    "Upsert": "true"
}

然後,我們會查詢來源資料庫 (在此情況下為 SQL Server) 以取得訂單和訂單詳細資料記錄,並將結果放入 Spark 資料框架中。 我們也會建立包含所有訂單識別碼的清單,以及用於平行作業的執行緒集區:

import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
 
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
 
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
 
#get all OrderId values to pass to map function 
orderids = orders.select('OrderId').collect()
 
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)

然後,建立將訂單寫入目標 API for NoSQL 集合的函式。 此函式將篩選指定訂單識別碼的所有訂單詳細資料,並將其轉換為 JSON 陣列。 然後,將 JSON 文件寫入至該訂單的目標 API for NoSQL 容器:

def writeOrder(orderid):
  #filter the order on current value passed from map function
  order = orders.filter(orders['OrderId'] == orderid[0])
  
  #set id to be a uuid
  order = order.withColumn("id", lit(str(uuid.uuid1())))
  
  #add details field to order dataframe
  order = order.withColumn("details", lit(''))
  
  #filter order details dataframe to get details we want to merge into the order document
  orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
  
  #convert dataframe to pandas
  orderpandas = order.toPandas()
  
  #convert the order dataframe to json and remove enclosing brackets
  orderjson = orderpandas.to_json(orient='records', force_ascii=False)
  orderjson = orderjson[1:-1] 
  
  #convert orderjson to a dictionaory so we can set the details element with order details later
  orderjsondata = json.loads(orderjson)
  
  
  #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
  if (orderdetailsgroup.count() !=0):
    #convert orderdetailsgroup to pandas dataframe to work better with json
    orderdetailsgroup = orderdetailsgroup.toPandas()
    
    #convert orderdetailsgroup to json string
    jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
    
    #convert jsonstring to dictionary to ensure correct encoding and no corrupt records
    jsonstring = json.loads(jsonstring)
    
    #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order 
    orderjsondata['details'] = jsonstring
 
  #convert dictionary to json
  orderjsondata = json.dumps(orderjsondata)
 
  #read the json into spark dataframe
  df = spark.read.json(sc.parallelize([orderjsondata]))
  
  #write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
  #https://learn.microsoft.com/azure/cosmos-db/spark-connector
  df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

最後,我們會在執行緒集區上使用 map 函數來呼叫 Python writeOrder 函數以平行執行,並傳入我們稍早建立的訂單識別碼清單:

#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)

在任一方法中,最終我們都應該會在 Azure Cosmos DB 集合的每個 Order 文件內取得正確儲存的內嵌 OrderDetails:

移轉後產生的數據的螢幕快照。

下一步