Bagikan melalui


Memigrasikan data relasional satu ke beberapa ke akun Azure Cosmos DB for NoSQL

BERLAKU UNTUK: NoSQL

Untuk bermigrasi dari database relasional ke Azure Cosmos DB untuk NoSQL, perlu untuk membuat perubahan pada model data untuk pengoptimalan.

Salah satu transformasi umum adalah denormalisasi data dengan menyematkan subitem terkait dalam satu dokumen JSON. Di sini kita melihat beberapa opsi untuk ini menggunakan Azure Data Factory atau Azure Databricks. Untuk informasi selengkapnya tentang pemodelan data untuk Azure Cosmos DB, lihat pemodelan data di Azure Cosmos DB.

Contoh Skenario

Asumsikan kita memiliki dua tabel berikut dalam basis data SQL, Pesanan, dan OrderDetails kami.

Cuplikan layar yang memperlihatkan tabel Pesanan dan OrderDetails di database SQL..

Kami ingin menggabungkan satu hingga beberapa hubungan ini ke dalam satu dokumen JSON selama migrasi. Untuk membuat satu dokumen, buat kueri T-SQL menggunakan FOR JSON:

SELECT
  o.OrderID,
  o.OrderDate,
  o.FirstName,
  o.LastName,
  o.Address,
  o.City,
  o.State,
  o.PostalCode,
  o.Country,
  o.Phone,
  o.Total,
  (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;

Hasil kueri ini akan menyertakan data dari tabel Pesanan :

Cuplikan layar kueri yang menghasilkan detail berbagai pesanan.

Idealnya, Anda ingin menggunakan satu aktivitas salinan Azure Data Factory (ADF) untuk mengkueri data SQL sebagai sumber dan menulis output langsung ke Azure Cosmos DB tenggelam sebagai objek JSON yang tepat. Saat ini, tidak dimungkinkan untuk melakukan transformasi JSON yang diperlukan dalam satu aktivitas salin. Jika kita mencoba menyalin hasil kueri di atas ke dalam kontainer Azure Cosmos DB for NoSQL, kita melihat bidang OrderDetails sebagai properti string dokumen kita, bukan array JSON yang diharapkan.

Kita dapat mengatasi batasan saat ini dengan salah satu cara berikut:

  • Gunakan Azure Data Factory dengan dua aktivitas salin:
    1. Mendapatkan data berformat JSON dari SQL ke file teks di lokasi penyimpanan blob perantara
    2. Muat data dari file teks JSON ke kontainer di Azure Cosmos DB.
  • Gunakan Azure Databricks untuk membaca dari SQL dan menulis ke Azure Cosmos DB - kami menyajikan dua opsi di sini.

Mari kita lihat pendekatan ini secara lebih rinci:

Azure Data Factory

Meskipun kami tidak dapat menyematkan OrderDetails sebagai array JSON di dokumen Azure Cosmos DB tujuan, kita dapat mengatasi masalah ini dengan menggunakan dua Aktivitas Salin terpisah.

Salin Aktivitas #1: SqlJsonToBlobText

Untuk data sumber, kami menggunakan kueri SQL untuk mendapatkan kumpulan hasil sebagai kolom tunggal dengan satu objek JSON (mewakili Pesanan) per baris menggunakan kemampuan SQL Server OPENJSON dan FOR JSON PATH:

SELECT [value] FROM OPENJSON(
  (SELECT
    id = o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
   FROM Orders o FOR JSON PATH)
)

Cuplikan layar nilai pratinjau dalam operasi penyalinan ADF.

Untuk sink SqlJsonToBlobText aktivitas salin, kami memilih "Teks Berbatas" dan mengarahkannya ke folder tertentu di Azure Blob Storage. Sink ini mencakup nama file unik yang dihasilkan secara dinamis (misalnya, @concat(pipeline().RunId,'.json'). Karena file teks kami tidak benar-benar "dibatasi" dan kami tidak ingin file teks diuraikan ke dalam kolom terpisah menggunakan koma. Kami juga ingin mempertahankan tanda kutip ganda ("), atur "Pemisah kolom" ke Tab ("\t") - atau karakter lain yang tidak terjadi dalam data, lalu atur "Karakter kutipan" ke "Tidak ada karakter kutipan".

Cuplikan layar yang menyoroti pembatas kolom dan pengaturan karakter Quote.

Salin Aktivitas #2: BlobJsonToCosmos

Selanjutnya, kami memodifikasi alur ADF kami dengan menambahkan Aktivitas Salin kedua yang terlihat di Azure Blob Storage untuk file teks yang dibuat oleh aktivitas pertama. Ini memprosesnya sebagai sumber "JSON" untuk disisipkan ke sink Azure Cosmos DB sebagai satu dokumen per baris JSON yang ditemukan dalam file teks.

Cuplikan layar yang menyoroti file sumber JSON dan bidang Jalur file..

Secara opsional, kami juga menambahkan aktivitas "Hapus" ke pipeline sehingga menghapus semua file sebelumnya yang tersisa di folder /Orders/ sebelum setiap run. Saluran pipa ADF kami sekarang terlihat seperti ini:

Cuplikan layar yang menyoroti aktivitas Hapus.

Setelah memicu alur yang disebutkan sebelumnya, kami melihat file yang dibuat di lokasi Azure Blob Storage perantara kami yang berisi satu objek JSON per baris:

Cuplikan layar yang memperlihatkan file yang dibuat yang berisi objek JSON.

Kami juga melihat Dokumen pesanan dengan OrderDetails yang disematkan dengan benar yang dimasukkan ke dalam koleksi Azure Cosmos DB kami:

Cuplikan layar yang memperlihatkan detail pesanan sebagai bagian dari dokumen Azure Cosmos DB

Azure Databricks

Kami juga dapat menggunakan Spark di Azure Databricks untuk menyalin data dari sumber Database SQL kami ke tujuan Azure Cosmos DB tanpa membuat teks perantara / file JSON di Penyimpanan Azure Blob.

Catatan

Untuk kejelasan dan kesederhanaan, cuplikan kode menyertakan kata sandi database dummy secara eksplisit sebaris, tetapi Idealnya Anda harus menggunakan rahasia Azure Databricks.

Pertama, kami membuat dan melampirkan konektor SQL yang diperlukan dan pustaka konektor Azure Cosmos DB ke klaster Azure Databricks kami. Mulai ulang kluster untuk memastikan pustaka dimuat.

Tangkapan layar yang menunjukkan tempat membuat dan memasang konektor SQL yang diperlukan dan pustaka konektor Azure Cosmos DB ke klaster Azure Databricks kami.

Selanjutnya, kami menyajikan dua sampel, untuk Scala dan Python.

Scala

Di sini, kita mendapatkan hasil kueri SQL dengan output "FOR JSON" ke dalam DataFrame:

// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
  "url"          -> "xxxx.database.windows.net",
  "databaseName" -> "xxxx",
  "queryCustom"  -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user"         -> "xxxx", 
  "password"     -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)

Cuplikan layar yang memperlihatkan output kueri SQL dalam DataFrame.

Selanjutnya, kami terhubung ke database dan koleksi Azure Cosmos DB kami:

// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.com:443/",
  // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)

Terakhir, kami menentukan skema kami dan menggunakan from_json untuk menerapkan DataFrame sebelum menyimpannya ke koleksi Cosmos DB.

// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
    StructField("OrderDetailId",IntegerType,true),
    StructField("ProductId",IntegerType,true),
    StructField("UnitPrice",DoubleType,true),
    StructField("Quantity",IntegerType,true)
  )))
val ordersWithSchema = orders.select(
  col("OrderId").cast("string").as("id"),
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)

Cuplikan layar yang menyoroti array yang tepat untuk disimpan ke koleksi Azure Cosmos DB.

Python

Sebagai pendekatan alternatif, Anda mungkin perlu menjalankan transformasi JSON di Spark jika database sumber tidak mendukung FOR JSON atau operasi serupa. Atau, Anda dapat menggunakan operasi paralel untuk himpunan data besar. Di sini kami menyajikan sampel PySpark. Mulai dengan mengonfigurasi koneksi basis data sumber dan target di sel pertama:

import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
 
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
 
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
 
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
    "Endpoint": "Endpoint",
    "Masterkey": "Masterkey",
    "Database": "OrdersDB",
    "Collection": "Orders",
    "Upsert": "true"
}

Kemudian, kami mengkueri Database sumber (dalam hal ini SQL Server) untuk catatan detail pesanan dan pesanan, memasukkan hasilnya ke dalam Spark Dataframes. Kami juga membuat daftar yang berisi semua ID pesanan, dan kumpulan Utas untuk operasi paralel:

import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
 
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
 
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
 
#get all OrderId values to pass to map function 
orderids = orders.select('OrderId').collect()
 
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)

Kemudian, buat fungsi untuk menulis Pesanan ke api target untuk koleksi NoSQL. Fungsi ini memfilter semua detail pesanan untuk ID pesanan yang diberikan, mengonversinya menjadi array JSON, dan menyisipkan array ke dalam dokumen JSON. Dokumen JSON kemudian ditulis ke dalam API target untuk kontainer NoSQL untuk urutan tersebut:

def writeOrder(orderid):
  #filter the order on current value passed from map function
  order = orders.filter(orders['OrderId'] == orderid[0])
  
  #set id to be a uuid
  order = order.withColumn("id", lit(str(uuid.uuid1())))
  
  #add details field to order dataframe
  order = order.withColumn("details", lit(''))
  
  #filter order details dataframe to get details we want to merge into the order document
  orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
  
  #convert dataframe to pandas
  orderpandas = order.toPandas()
  
  #convert the order dataframe to json and remove enclosing brackets
  orderjson = orderpandas.to_json(orient='records', force_ascii=False)
  orderjson = orderjson[1:-1] 
  
  #convert orderjson to a dictionaory so we can set the details element with order details later
  orderjsondata = json.loads(orderjson)
  
  
  #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
  if (orderdetailsgroup.count() !=0):
    #convert orderdetailsgroup to pandas dataframe to work better with json
    orderdetailsgroup = orderdetailsgroup.toPandas()
    
    #convert orderdetailsgroup to json string
    jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
    
    #convert jsonstring to dictionary to ensure correct encoding and no corrupt records
    jsonstring = json.loads(jsonstring)
    
    #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order 
    orderjsondata['details'] = jsonstring
 
  #convert dictionary to json
  orderjsondata = json.dumps(orderjsondata)
 
  #read the json into spark dataframe
  df = spark.read.json(sc.parallelize([orderjsondata]))
  
  #write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
  #https://learn.microsoft.com/azure/cosmos-db/spark-connector
  df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

Terakhir, kita memanggil fungsi Python writeOrder menggunakan fungsi peta pada kumpulan utas, untuk mengeksekusi secara paralel, meneruskan daftar ID pesanan yang kita buat sebelumnya:

#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)

Dalam kedua pendekatan, pada akhirnya, kita harus menyimpan OrderDetails yang disematkan dengan benar dalam setiap dokumen Pesanan dalam koleksi Azure Cosmos DB:

Cuplikan layar data yang dihasilkan setelah migrasi.

Langkah berikutnya