Megosztás a következőn keresztül:


Egy-a-néhány relációs adat migrálása egy Azure Cosmos DB for NoSQL-fiókba

A KÖVETKEZŐRE VONATKOZIK: NoSQL

A relációs adatbázisból az Azure Cosmos DB for NoSQL-be való migráláshoz szükség lehet az adatmodell optimalizálására.

Az egyik gyakori átalakítás az adatok denormalizálása a kapcsolódó részhalmazok beágyazásával egy JSON-dokumentumba. Az alábbiakban bemutatunk néhány lehetőséget erre az Azure Data Factory vagy az Azure Databricks használatával. Az Azure Cosmos DB adatmodellezésével kapcsolatos további információkért tekintse meg az Azure Cosmos DB adatmodellezését.

Példaforgatókönyv

Tegyük fel, hogy az SQL-adatbázisunkban az alábbi két tábla található, az Orders és az OrderDetails.

Screenshot that shows the Orders and OrderDetails tables in the SQL database.

Ezt az egy-a-néhányhoz kapcsolatot egy JSON-dokumentumba szeretnénk egyesíteni a migrálás során. Egyetlen dokumentum létrehozásához hozzon létre egy T-SQL-lekérdezést a következő használatával FOR JSON:

SELECT
  o.OrderID,
  o.OrderDate,
  o.FirstName,
  o.LastName,
  o.Address,
  o.City,
  o.State,
  o.PostalCode,
  o.Country,
  o.Phone,
  o.Total,
  (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;

A lekérdezés eredményei a Rendelések táblából származó adatokat tartalmazzák:

Screenshot of a query that results in details of various orders.

Ideális esetben egyetlen Azure Data Factory (ADF) másolási tevékenységet szeretne használni az SQL-adatok forrásként való lekérdezéséhez, és a kimenetet közvetlenül az Azure Cosmos DB fogadóba írja megfelelő JSON-objektumként. Jelenleg nem lehet elvégezni a szükséges JSON-átalakítást egy másolási tevékenységben. Ha a fenti lekérdezés eredményeit egy Azure Cosmos DB for NoSQL-tárolóba próbáljuk másolni, az OrderDetails mező a dokumentum sztringtulajdonságaként jelenik meg a várt JSON-tömb helyett.

Ezt a jelenlegi korlátozást az alábbi módok egyikével háríthatjuk el:

  • Az Azure Data Factory használata két másolási tevékenységgel:
    1. JSON-formátumú adatok lekérése AZ SQL-ből egy szövegfájlba egy közbenső blobtárolóban
    2. Adatok betöltése a JSON-szövegfájlból egy Tárolóba az Azure Cosmos DB-ben.
  • Az Azure Databricks használatával olvashat az SQL-ből, és írhat az Azure Cosmos DB-be – itt két lehetőséget mutatunk be.

Vizsgáljuk meg részletesebben ezeket a megközelítéseket:

Azure Data Factory

Bár az OrderDetails nem ágyazható be JSON-tömbként a cél Azure Cosmos DB-dokumentumba, két különálló másolási tevékenységgel megkerülhetjük a problémát.

Másolási tevékenység #1: SqlJsonToBlobText

A forrásadatokhoz egy SQL-lekérdezést használunk az eredményhalmaz egyetlen oszlopként való lekéréséhez, amely soronként egy JSON-objektummal (a sorrendet képviseli) az SQL Server OPENJSON és a FOR JSON PATH képességeinek használatával:

SELECT [value] FROM OPENJSON(
  (SELECT
    id = o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
   FROM Orders o FOR JSON PATH)
)

Screenshot of the preview values in the ADF copy operation.

A másolási tevékenység fogadójának SqlJsonToBlobText a "Tagolt szöveg" lehetőséget választjuk, és az Azure Blob Storage egy adott mappájára mutatunk. Ez a fogadó tartalmaz egy dinamikusan létrehozott egyedi fájlnevet (például @concat(pipeline().RunId,'.json'). Mivel a szövegfájl nem igazán "tagolt", és nem szeretnénk, hogy vesszővel külön oszlopokba legyen elemezve. A kettős idézőjeleket ("), az "Oszlopelválasztót" tabulátor ("\t") értékre szeretnénk állítani – vagy egy másik, az adatokban nem szereplő karaktert, majd az "Idézőjel" értéket "Nincs idézőjel" értékre állítani.

Screenshot that highlights the Column delimiter and Quote character settings.

Másolási tevékenység #2: BlobJsonToCosmos

Ezután úgy módosítjuk az ADF-folyamatot, hogy hozzáadjuk a második másolási tevékenységet, amely az Első tevékenység által létrehozott szövegfájlhoz az Azure Blob Storage-ban jelenik meg. "JSON" forrásként dolgozza fel, hogy beszúrja az Azure Cosmos DB fogadóba a szövegfájlban található JSON-soronként egy dokumentumként.

Screenshot that highlights the JSON source file and the File path fields.

Ha szükséges, egy "Delete" tevékenységet is hozzáadunk a folyamathoz, így minden futtatás előtt törli a /Orders/ mappában maradt összes korábbi fájlt. Az ADF-folyamat így néz ki:

Screenshot that highlights the Delete activity.

A korábban említett folyamat aktiválása után egy, soronként egy JSON-objektumot tartalmazó fájl jelenik meg a köztes Azure Blob Storage-helyen:

Screenshot that shows the created file that contains the JSON objects.

Az Azure Cosmos DB-gyűjteménybe beszúrt, megfelelően beágyazott OrderDetails rendelési dokumentumokat is láthat:

Screenshot that shows the order details as a part of the Azure Cosmos DB document

Azure Databricks

Az Azure Databricks Spark használatával az SQL Database-forrásból az Azure Cosmos DB-célhelyre másolhatjuk az adatokat anélkül, hogy létrehoznánk a köztes szöveg-/JSON-fájlokat az Azure Blob Storage-ban.

Megjegyzés:

Az egyértelműség és az egyszerűség kedvéért a kódrészletek kifejezetten beágyazott adatbázisjelszavakat tartalmaznak, de ideális esetben az Azure Databricks titkos kulcsait kell használnia.

Először létrehozzuk és csatoljuk a szükséges SQL-összekötőket és az Azure Cosmos DB-összekötők kódtárait az Azure Databricks-fürthöz. Indítsa újra a fürtöt, és győződjön meg arról, hogy a kódtárak be vannak töltve.

Screenshot that shows where to create and attach the required SQL connector and Azure Cosmos DB connector libraries to our Azure Databricks cluster.

Ezután két mintát mutatunk be a Scalához és a Pythonhoz.

Scala

Itt lekérjük az SQL-lekérdezés eredményeit a "FOR JSON" kimenettel egy DataFrame-be:

// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
  "url"          -> "xxxx.database.windows.net",
  "databaseName" -> "xxxx",
  "queryCustom"  -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user"         -> "xxxx", 
  "password"     -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)

Screenshot that shows the SQL query output in a DataFrame.

Ezután csatlakozunk az Azure Cosmos DB-adatbázishoz és -gyűjteményhez:

// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.com:443/",
  // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)

Végül meghatározzuk a sémát, és a from_json használatával alkalmazzuk a DataFrame-et, mielőtt a Cosmos DB-gyűjteménybe mentenénk.

// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
    StructField("OrderDetailId",IntegerType,true),
    StructField("ProductId",IntegerType,true),
    StructField("UnitPrice",DoubleType,true),
    StructField("Quantity",IntegerType,true)
  )))
val ordersWithSchema = orders.select(
  col("OrderId").cast("string").as("id"),
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)

Screenshot that highlights the proper array for saving to an Azure Cosmos DB collection.

Python

Alternatív megközelítésként előfordulhat, hogy JSON-átalakításokat kell végrehajtania a Sparkban, ha a forrásadatbázis nem támogatja FOR JSON vagy hasonló műveletet. Alternatív megoldásként használhat párhuzamos műveleteket egy nagy adatkészlethez. Itt bemutatunk egy PySpark-mintát. Először konfigurálja a forrás- és céladatbázis-kapcsolatokat az első cellában:

import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
 
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
 
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
 
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
    "Endpoint": "Endpoint",
    "Masterkey": "Masterkey",
    "Database": "OrdersDB",
    "Collection": "Orders",
    "Upsert": "true"
}

Ezután lekérdezzük a forrásadatbázist (ebben az esetben az SQL Servert) a rendelési és a rendelési adatok rekordjaihoz, és az eredményeket Spark-adatkeretekbe helyezi. Létrehozunk egy listát is, amely tartalmazza az összes rendelésazonosítót, valamint egy szálkészletet a párhuzamos műveletekhez:

import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
 
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
 
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
 
#get all OrderId values to pass to map function 
orderids = orders.select('OrderId').collect()
 
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)

Ezután hozzon létre egy függvényt a Rendelések a NoSQL-gyűjtemény cél API-ba való írásához. Ez a függvény szűri a megadott rendelésazonosító összes rendelési részletét, JSON-tömbté alakítja őket, és beszúrja a tömböt egy JSON-dokumentumba. A JSON-dokumentum ezután a NoSQL-tároló cél API-jába lesz beírva a következő sorrendben:

def writeOrder(orderid):
  #filter the order on current value passed from map function
  order = orders.filter(orders['OrderId'] == orderid[0])
  
  #set id to be a uuid
  order = order.withColumn("id", lit(str(uuid.uuid1())))
  
  #add details field to order dataframe
  order = order.withColumn("details", lit(''))
  
  #filter order details dataframe to get details we want to merge into the order document
  orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
  
  #convert dataframe to pandas
  orderpandas = order.toPandas()
  
  #convert the order dataframe to json and remove enclosing brackets
  orderjson = orderpandas.to_json(orient='records', force_ascii=False)
  orderjson = orderjson[1:-1] 
  
  #convert orderjson to a dictionaory so we can set the details element with order details later
  orderjsondata = json.loads(orderjson)
  
  
  #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
  if (orderdetailsgroup.count() !=0):
    #convert orderdetailsgroup to pandas dataframe to work better with json
    orderdetailsgroup = orderdetailsgroup.toPandas()
    
    #convert orderdetailsgroup to json string
    jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
    
    #convert jsonstring to dictionary to ensure correct encoding and no corrupt records
    jsonstring = json.loads(jsonstring)
    
    #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order 
    orderjsondata['details'] = jsonstring
 
  #convert dictionary to json
  orderjsondata = json.dumps(orderjsondata)
 
  #read the json into spark dataframe
  df = spark.read.json(sc.parallelize([orderjsondata]))
  
  #write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
  #https://learn.microsoft.com/azure/cosmos-db/spark-connector
  df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

Végül meghívjuk a Python-függvényt writeOrder egy térképfüggvény használatával a szálkészleten, hogy párhuzamosan hajtsuk végre a korábban létrehozott sorrendazonosítók listáját:

#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)

Mindkét megközelítésben a végén megfelelően kell mentenünk a beágyazott OrderDetail-fájlokat az Azure Cosmos DB-gyűjtemény minden rendelési dokumentumában:

Screenshot of the resulting data after migration.

További lépések