Sdílet prostřednictvím


Migrace relačních dat 1:1 do účtu Azure Cosmos DB for NoSQL

PLATÍ PRO: NoSQL

Aby bylo možné migrovat z relační databáze do Azure Cosmos DB for NoSQL, může být nutné provést změny datového modelu pro optimalizaci.

Jednou z běžných transformací je denormalizace dat vložením souvisejících dílčích položek do jednoho dokumentu JSON. Tady se podíváme na několik možností použití Azure Data Factory nebo Azure Databricks. Další informace o modelování dat pro službu Azure Cosmos DB najdete v tématu Modelování dat ve službě Azure Cosmos DB.

Ukázkový scénář

Předpokládejme, že máme v databázi SQL následující dvě tabulky, Orders a OrderDetails.

Snímek obrazovky znázorňující tabulky Orders a OrderDetails v databázi SQL

Během migrace chceme tento vztah 1:0 zkombinovat do jednoho dokumentu JSON. Pokud chcete vytvořit jeden dokument, vytvořte dotaz T-SQL pomocí FOR JSON:

SELECT
  o.OrderID,
  o.OrderDate,
  o.FirstName,
  o.LastName,
  o.Address,
  o.City,
  o.State,
  o.PostalCode,
  o.Country,
  o.Phone,
  o.Total,
  (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;

Výsledky tohoto dotazu by zahrnovaly data z tabulky Orders :

Snímek obrazovky s dotazem, který má za následek podrobnosti o různých objednávkách

V ideálním případě chcete použít jednu aktivitu kopírování azure Data Factory (ADF) k dotazování dat SQL jako zdroje a zápis výstupu přímo do jímky Azure Cosmos DB jako správných objektů JSON. V současné době není možné provést potřebnou transformaci JSON v jedné aktivitě kopírování. Pokud se pokusíme zkopírovat výsledky výše uvedeného dotazu do kontejneru Azure Cosmos DB for NoSQL, zobrazí se pole OrderDetails jako řetězcová vlastnost našeho dokumentu místo očekávaného pole JSON.

Toto aktuální omezení můžeme obejít jedním z následujících způsobů:

  • Použití služby Azure Data Factory se dvěma aktivitami kopírování:
    1. Získání dat ve formátu JSON z SQL do textového souboru v umístění zprostředkujícího úložiště objektů blob
    2. Načtěte data z textového souboru JSON do kontejneru ve službě Azure Cosmos DB.
  • Azure Databricks slouží ke čtení z SQL a zápisu do služby Azure Cosmos DB – tady jsou dvě možnosti.

Pojďme se podrobněji podívat na tyto přístupy:

Azure Data Factory

I když do cílového dokumentu Azure Cosmos DB nemůžeme vložit OrderDetails jako pole JSON, můžeme problém obejít pomocí dvou samostatných aktivit kopírování.

Aktivita kopírování č. 1: SqlJsonToBlobText

Pro zdrojová data pomocí dotazu SQL získáme sadu výsledků jako jeden sloupec s jedním objektem JSON (představujícím objednávku) na řádek pomocí funkcí SQL Server OPENJSON a FOR JSON PATH:

SELECT [value] FROM OPENJSON(
  (SELECT
    id = o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
   FROM Orders o FOR JSON PATH)
)

Snímek obrazovky s hodnotami náhledu v operaci kopírování ADF

U jímky SqlJsonToBlobText aktivity kopírování zvolíme text s oddělovači a nasměrujeme ho na konkrétní složku ve službě Azure Blob Storage. Tato jímka obsahuje dynamicky vygenerovaný jedinečný název souboru (například @concat(pipeline().RunId,'.json'). Vzhledem k tomu, že textový soubor není ve skutečnosti "oddělený" a nechceme, aby se parsoval do samostatných sloupců pomocí čárek. Chceme také zachovat dvojité uvozovky ("), nastavit "Oddělovač sloupců" na tabulátor ("\t") nebo jiný znak, ke kterému v datech nedochází, a pak nastavit znak uvozovky na "Žádný znak uvozovky".

Snímek obrazovky, který zvýrazní nastavení oddělovače sloupců a uvozovek

Aktivita kopírování č. 2: BlobJsonToCosmos

Dále upravíme kanál ADF přidáním druhé aktivity kopírování, která hledá ve službě Azure Blob Storage textový soubor vytvořený první aktivitou. Zpracovává ho jako zdroj JSON pro vložení do jímky Azure Cosmos DB jako jeden dokument na řádek JSON nalezený v textovém souboru.

Snímek obrazovky se zvýrazněnou zdrojovým souborem JSON a poli Cesta k souboru

Volitelně také do kanálu přidáme aktivitu "Odstranit", aby před každým spuštěním odstranila všechny předchozí soubory zbývající ve složce /Orders/. Náš kanál ADF teď vypadá nějak takto:

Snímek obrazovky se zvýrazněnou aktivitou Odstranění

Po aktivaci dříve zmíněného kanálu uvidíme soubor vytvořený v našem zprostředkujícím umístění služby Azure Blob Storage obsahující jeden objekt JSON na řádek:

Snímek obrazovky znázorňující vytvořený soubor obsahující objekty JSON

Uvidíme také dokumenty Orders s správně vloženými orderDetails vloženými do naší kolekce Azure Cosmos DB:

Snímek obrazovky znázorňující podrobnosti objednávky jako součást dokumentu Azure Cosmos DB

Azure Databricks

Spark v Azure Databricks můžeme také použít ke zkopírování dat ze zdroje služby SQL Database do cíle služby Azure Cosmos DB bez vytvoření zprostředkujících textových souborů a souborů JSON ve službě Azure Blob Storage.

Poznámka:

Z důvodu srozumitelnosti a jednoduchosti fragmenty kódu obsahují fiktivní hesla databáze explicitně vložená, ale v ideálním případě byste měli používat tajné kódy Azure Databricks.

Nejprve vytvoříme a připojíme požadovaný konektor SQL a knihovny konektorů Azure Cosmos DB k našemu clusteru Azure Databricks. Restartujte cluster, aby se zajistilo načtení knihoven.

Snímek obrazovky, který ukazuje, kde vytvořit a připojit požadovaný konektor SQL a knihovny konektorů Azure Cosmos DB k našemu clusteru Azure Databricks

Dále prezentujeme dvě ukázky pro Scala a Python.

Scala

Tady získáme výsledky dotazu SQL s výstupem FOR JSON do datového rámce:

// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
  "url"          -> "xxxx.database.windows.net",
  "databaseName" -> "xxxx",
  "queryCustom"  -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user"         -> "xxxx", 
  "password"     -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)

Snímek obrazovky znázorňující výstup dotazu SQL v datovém rámci

Dále se připojíme k databázi a kolekci Azure Cosmos DB:

// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.com:443/",
  // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)

Nakonec definujeme schéma a použijeme from_json k použití datového rámce před uložením do kolekce Cosmos DB.

// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
    StructField("OrderDetailId",IntegerType,true),
    StructField("ProductId",IntegerType,true),
    StructField("UnitPrice",DoubleType,true),
    StructField("Quantity",IntegerType,true)
  )))
val ordersWithSchema = orders.select(
  col("OrderId").cast("string").as("id"),
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)

Snímek obrazovky se zvýrazněným správným polem pro uložení do kolekce Azure Cosmos DB

Python

Jako alternativní přístup možná budete muset ve Sparku spouštět transformace JSON, pokud zdrojová databáze nepodporuje FOR JSON nebo podobnou operaci. Alternativně můžete použít paralelní operace pro velkou datovou sadu. Zde prezentujeme ukázku PySpark. Začněte konfigurací připojení ke zdrojové a cílové databázi v první buňce:

import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
 
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
 
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
 
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
    "Endpoint": "Endpoint",
    "Masterkey": "Masterkey",
    "Database": "OrdersDB",
    "Collection": "Orders",
    "Upsert": "true"
}

Potom se dotazujeme na zdrojová databáze (v tomto případě SQL Server) pro záznamy objednávek i podrobností objednávek a výsledky vložíme do datových rámců Sparku. Vytvoříme také seznam obsahující všechna ID objednávek a fond vláken pro paralelní operace:

import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
 
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
 
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
 
#get all OrderId values to pass to map function 
orderids = orders.select('OrderId').collect()
 
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)

Pak vytvořte funkci pro zápis objednávek do cílové kolekce API pro NoSQL. Tato funkce vyfiltruje všechny podrobnosti objednávky pro dané ID objednávky, převede je na pole JSON a vloží pole do dokumentu JSON. Dokument JSON se pak zapíše do cílového kontejneru API pro NoSQL pro danou objednávku:

def writeOrder(orderid):
  #filter the order on current value passed from map function
  order = orders.filter(orders['OrderId'] == orderid[0])
  
  #set id to be a uuid
  order = order.withColumn("id", lit(str(uuid.uuid1())))
  
  #add details field to order dataframe
  order = order.withColumn("details", lit(''))
  
  #filter order details dataframe to get details we want to merge into the order document
  orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
  
  #convert dataframe to pandas
  orderpandas = order.toPandas()
  
  #convert the order dataframe to json and remove enclosing brackets
  orderjson = orderpandas.to_json(orient='records', force_ascii=False)
  orderjson = orderjson[1:-1] 
  
  #convert orderjson to a dictionaory so we can set the details element with order details later
  orderjsondata = json.loads(orderjson)
  
  
  #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
  if (orderdetailsgroup.count() !=0):
    #convert orderdetailsgroup to pandas dataframe to work better with json
    orderdetailsgroup = orderdetailsgroup.toPandas()
    
    #convert orderdetailsgroup to json string
    jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
    
    #convert jsonstring to dictionary to ensure correct encoding and no corrupt records
    jsonstring = json.loads(jsonstring)
    
    #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order 
    orderjsondata['details'] = jsonstring
 
  #convert dictionary to json
  orderjsondata = json.dumps(orderjsondata)
 
  #read the json into spark dataframe
  df = spark.read.json(sc.parallelize([orderjsondata]))
  
  #write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
  #https://learn.microsoft.com/azure/cosmos-db/spark-connector
  df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

Nakonec zavoláme funkci Pythonu writeOrder pomocí mapové funkce ve fondu vláken, která se provede paralelně a předá seznam ID objednávek, které jsme vytvořili dříve:

#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)

V obou přístupech bychom na konci měli správně uložit vložené OrderDetails v rámci každého dokumentu objednávky v kolekci Azure Cosmos DB:

Snímek obrazovky s výslednými daty po migraci

Další kroky