Memigrasikan data relasional satu ke beberapa ke akun Azure Cosmos DB for NoSQL
BERLAKU UNTUK: NoSQL
Untuk bermigrasi dari database relasional ke Azure Cosmos DB untuk NoSQL, perlu untuk membuat perubahan pada model data untuk pengoptimalan.
Salah satu transformasi umum adalah denormalisasi data dengan menyematkan subitem terkait dalam satu dokumen JSON. Di sini kita melihat beberapa opsi untuk ini menggunakan Azure Data Factory atau Azure Databricks. Untuk informasi selengkapnya tentang pemodelan data untuk Azure Cosmos DB, lihat pemodelan data di Azure Cosmos DB.
Contoh Skenario
Asumsikan kita memiliki dua tabel berikut dalam basis data SQL, Pesanan, dan OrderDetails kami.
Kami ingin menggabungkan satu hingga beberapa hubungan ini ke dalam satu dokumen JSON selama migrasi. Untuk membuat satu dokumen, buat kueri T-SQL menggunakan FOR JSON
:
SELECT
o.OrderID,
o.OrderDate,
o.FirstName,
o.LastName,
o.Address,
o.City,
o.State,
o.PostalCode,
o.Country,
o.Phone,
o.Total,
(select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;
Hasil kueri ini akan menyertakan data dari tabel Pesanan :
Idealnya, Anda ingin menggunakan satu aktivitas salinan Azure Data Factory (ADF) untuk mengkueri data SQL sebagai sumber dan menulis output langsung ke Azure Cosmos DB tenggelam sebagai objek JSON yang tepat. Saat ini, tidak dimungkinkan untuk melakukan transformasi JSON yang diperlukan dalam satu aktivitas salin. Jika kita mencoba menyalin hasil kueri di atas ke dalam kontainer Azure Cosmos DB for NoSQL, kita melihat bidang OrderDetails sebagai properti string dokumen kita, bukan array JSON yang diharapkan.
Kita dapat mengatasi batasan saat ini dengan salah satu cara berikut:
- Gunakan Azure Data Factory dengan dua aktivitas salin:
- Mendapatkan data berformat JSON dari SQL ke file teks di lokasi penyimpanan blob perantara
- Muat data dari file teks JSON ke kontainer di Azure Cosmos DB.
- Gunakan Azure Databricks untuk membaca dari SQL dan menulis ke Azure Cosmos DB - kami menyajikan dua opsi di sini.
Mari kita lihat pendekatan ini secara lebih rinci:
Azure Data Factory
Meskipun kami tidak dapat menyematkan OrderDetails sebagai array JSON di dokumen Azure Cosmos DB tujuan, kita dapat mengatasi masalah ini dengan menggunakan dua Aktivitas Salin terpisah.
Salin Aktivitas #1: SqlJsonToBlobText
Untuk data sumber, kami menggunakan kueri SQL untuk mendapatkan kumpulan hasil sebagai kolom tunggal dengan satu objek JSON (mewakili Pesanan) per baris menggunakan kemampuan SQL Server OPENJSON dan FOR JSON PATH:
SELECT [value] FROM OPENJSON(
(SELECT
id = o.OrderID,
o.OrderDate,
o.FirstName,
o.LastName,
o.Address,
o.City,
o.State,
o.PostalCode,
o.Country,
o.Phone,
o.Total,
(select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o FOR JSON PATH)
)
Untuk sink SqlJsonToBlobText
aktivitas salin, kami memilih "Teks Berbatas" dan mengarahkannya ke folder tertentu di Azure Blob Storage. Sink ini mencakup nama file unik yang dihasilkan secara dinamis (misalnya, @concat(pipeline().RunId,'.json')
.
Karena file teks kami tidak benar-benar "dibatasi" dan kami tidak ingin file teks diuraikan ke dalam kolom terpisah menggunakan koma. Kami juga ingin mempertahankan tanda kutip ganda ("), atur "Pemisah kolom" ke Tab ("\t") - atau karakter lain yang tidak terjadi dalam data, lalu atur "Karakter kutipan" ke "Tidak ada karakter kutipan".
Salin Aktivitas #2: BlobJsonToCosmos
Selanjutnya, kami memodifikasi alur ADF kami dengan menambahkan Aktivitas Salin kedua yang terlihat di Azure Blob Storage untuk file teks yang dibuat oleh aktivitas pertama. Ini memprosesnya sebagai sumber "JSON" untuk disisipkan ke sink Azure Cosmos DB sebagai satu dokumen per baris JSON yang ditemukan dalam file teks.
Secara opsional, kami juga menambahkan aktivitas "Hapus" ke pipeline sehingga menghapus semua file sebelumnya yang tersisa di folder /Orders/ sebelum setiap run. Saluran pipa ADF kami sekarang terlihat seperti ini:
Setelah memicu alur yang disebutkan sebelumnya, kami melihat file yang dibuat di lokasi Azure Blob Storage perantara kami yang berisi satu objek JSON per baris:
Kami juga melihat Dokumen pesanan dengan OrderDetails yang disematkan dengan benar yang dimasukkan ke dalam koleksi Azure Cosmos DB kami:
Azure Databricks
Kami juga dapat menggunakan Spark di Azure Databricks untuk menyalin data dari sumber Database SQL kami ke tujuan Azure Cosmos DB tanpa membuat teks perantara / file JSON di Penyimpanan Azure Blob.
Catatan
Untuk kejelasan dan kesederhanaan, cuplikan kode menyertakan kata sandi database dummy secara eksplisit sebaris, tetapi Idealnya Anda harus menggunakan rahasia Azure Databricks.
Pertama, kami membuat dan melampirkan konektor SQL yang diperlukan dan pustaka konektor Azure Cosmos DB ke klaster Azure Databricks kami. Mulai ulang kluster untuk memastikan pustaka dimuat.
Selanjutnya, kami menyajikan dua sampel, untuk Scala dan Python.
Scala
Di sini, kita mendapatkan hasil kueri SQL dengan output "FOR JSON" ke dalam DataFrame:
// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
"url" -> "xxxx.database.windows.net",
"databaseName" -> "xxxx",
"queryCustom" -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
"user" -> "xxxx",
"password" -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)
Selanjutnya, kami terhubung ke database dan koleksi Azure Cosmos DB kami:
// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
"Endpoint" -> "https://xxxx.documents.azure.com:443/",
// NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
"Masterkey" -> "xxxx",
"Database" -> "StoreDatabase",
"Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)
Terakhir, kami menentukan skema kami dan menggunakan from_json untuk menerapkan DataFrame sebelum menyimpannya ke koleksi Cosmos DB.
// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
StructField("OrderDetailId",IntegerType,true),
StructField("ProductId",IntegerType,true),
StructField("UnitPrice",DoubleType,true),
StructField("Quantity",IntegerType,true)
)))
val ordersWithSchema = orders.select(
col("OrderId").cast("string").as("id"),
col("OrderDate").cast("string"),
col("FirstName").cast("string"),
col("LastName").cast("string"),
col("Address").cast("string"),
col("City").cast("string"),
col("State").cast("string"),
col("PostalCode").cast("string"),
col("Country").cast("string"),
col("Phone").cast("string"),
col("Total").cast("double"),
from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)
Python
Sebagai pendekatan alternatif, Anda mungkin perlu menjalankan transformasi JSON di Spark jika database sumber tidak mendukung FOR JSON
atau operasi serupa. Atau, Anda dapat menggunakan operasi paralel untuk himpunan data besar. Di sini kami menyajikan sampel PySpark. Mulai dengan mengonfigurasi koneksi basis data sumber dan target di sel pertama:
import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
connectionProperties = {
"user" : jdbcUsername,
"password" : jdbcPassword,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
"Endpoint": "Endpoint",
"Masterkey": "Masterkey",
"Database": "OrdersDB",
"Collection": "Orders",
"Upsert": "true"
}
Kemudian, kami mengkueri Database sumber (dalam hal ini SQL Server) untuk catatan detail pesanan dan pesanan, memasukkan hasilnya ke dalam Spark Dataframes. Kami juga membuat daftar yang berisi semua ID pesanan, dan kumpulan Utas untuk operasi paralel:
import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
#get all OrderId values to pass to map function
orderids = orders.select('OrderId').collect()
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)
Kemudian, buat fungsi untuk menulis Pesanan ke api target untuk koleksi NoSQL. Fungsi ini memfilter semua detail pesanan untuk ID pesanan yang diberikan, mengonversinya menjadi array JSON, dan menyisipkan array ke dalam dokumen JSON. Dokumen JSON kemudian ditulis ke dalam API target untuk kontainer NoSQL untuk urutan tersebut:
def writeOrder(orderid):
#filter the order on current value passed from map function
order = orders.filter(orders['OrderId'] == orderid[0])
#set id to be a uuid
order = order.withColumn("id", lit(str(uuid.uuid1())))
#add details field to order dataframe
order = order.withColumn("details", lit(''))
#filter order details dataframe to get details we want to merge into the order document
orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
#convert dataframe to pandas
orderpandas = order.toPandas()
#convert the order dataframe to json and remove enclosing brackets
orderjson = orderpandas.to_json(orient='records', force_ascii=False)
orderjson = orderjson[1:-1]
#convert orderjson to a dictionaory so we can set the details element with order details later
orderjsondata = json.loads(orderjson)
#convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
if (orderdetailsgroup.count() !=0):
#convert orderdetailsgroup to pandas dataframe to work better with json
orderdetailsgroup = orderdetailsgroup.toPandas()
#convert orderdetailsgroup to json string
jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
#convert jsonstring to dictionary to ensure correct encoding and no corrupt records
jsonstring = json.loads(jsonstring)
#set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order
orderjsondata['details'] = jsonstring
#convert dictionary to json
orderjsondata = json.dumps(orderjsondata)
#read the json into spark dataframe
df = spark.read.json(sc.parallelize([orderjsondata]))
#write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
#https://learn.microsoft.com/azure/cosmos-db/spark-connector
df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()
Terakhir, kita memanggil fungsi Python writeOrder
menggunakan fungsi peta pada kumpulan utas, untuk mengeksekusi secara paralel, meneruskan daftar ID pesanan yang kita buat sebelumnya:
#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)
Dalam kedua pendekatan, pada akhirnya, kita harus menyimpan OrderDetails yang disematkan dengan benar dalam setiap dokumen Pesanan dalam koleksi Azure Cosmos DB:
Langkah berikutnya
- Pelajari tentang pemodelan data dalam Azure Cosmos DB
- Pelajari cara memodelkan dan mempartisi data di Azure Cosmos DB