Migrar dados relacionais de um para poucos para uma conta do Azure Cosmos DB para NoSQL

APLICA-SE A: NoSQL

Para migrar de um banco de dados relacional para o Azure Cosmos DB para NoSQL, pode ser necessário fazer alterações no modelo de dados para otimização.

Uma transformação comum é a desnormalização de dados incorporando subitens relacionados em um documento JSON. Aqui examinamos algumas opções para isso usando o Azure Data Factory ou o Azure Databricks. Para obter mais informações sobre modelagem de dados para o Azure Cosmos DB, consulte modelagem de dados no Azure Cosmos DB.

Cenário de Exemplo

Suponha que temos as duas tabelas a seguir em nosso banco de dados SQL, Orders e OrderDetails.

Screenshot that shows the Orders and OrderDetails tables in the SQL database.

Queremos combinar essa relação um-para-poucos em um documento JSON durante a migração. Para criar um único documento, crie uma consulta T-SQL usando FOR JSON:

SELECT
  o.OrderID,
  o.OrderDate,
  o.FirstName,
  o.LastName,
  o.Address,
  o.City,
  o.State,
  o.PostalCode,
  o.Country,
  o.Phone,
  o.Total,
  (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;

Os resultados dessa consulta incluiriam dados da tabela Pedidos :

Screenshot of a query that results in details of various orders.

Idealmente, você deseja usar uma única atividade de cópia do Azure Data Factory (ADF) para consultar dados SQL como a origem e gravar a saída diretamente no coletor do Azure Cosmos DB como objetos JSON adequados. Atualmente, não é possível executar a transformação JSON necessária em uma atividade de cópia. Se tentarmos copiar os resultados da consulta acima em um contêiner do Azure Cosmos DB para NoSQL, veremos o campo OrderDetails como uma propriedade de cadeia de caracteres do nosso documento, em vez da matriz JSON esperada.

Podemos contornar essa limitação atual de uma das seguintes maneiras:

  • Use o Azure Data Factory com duas atividades de cópia:
    1. Obter dados formatados em JSON do SQL para um arquivo de texto em um local de armazenamento de blob intermediário
    2. Carregue dados do arquivo de texto JSON para um contêiner no Azure Cosmos DB.
  • Use o Azure Databricks para ler do SQL e gravar no Azure Cosmos DB - apresentamos duas opções aqui.

Vejamos essas abordagens com mais detalhes:

Azure Data Factory

Embora não possamos incorporar OrderDetails como uma matriz JSON no documento de destino do Azure Cosmos DB, podemos contornar o problema usando duas Atividades de Cópia separadas.

Atividade de cópia #1: SqlJsonToBlobText

Para os dados de origem, usamos uma consulta SQL para obter o conjunto de resultados como uma única coluna com um objeto JSON (representando a Ordem) por linha usando os recursos SQL Server OPENJSON e PARA JSON PATH:

SELECT [value] FROM OPENJSON(
  (SELECT
    id = o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
   FROM Orders o FOR JSON PATH)
)

Screenshot of the preview values in the ADF copy operation.

Para o SqlJsonToBlobText coletor da atividade de cópia, escolhemos "Texto Delimitado" e apontamos para uma pasta específica no Armazenamento de Blobs do Azure. Esse coletor inclui um nome de arquivo exclusivo gerado dinamicamente (por exemplo, @concat(pipeline().RunId,'.json'). Como nosso arquivo de texto não é realmente "delimitado" e não queremos que ele seja analisado em colunas separadas usando vírgulas. Também queremos preservar as aspas duplas ("), definir "Delimitador de coluna" para uma guia ("\t") - ou outro caractere que não ocorre nos dados e, em seguida, definir "Caractere de aspas" como "Sem caractere de aspas".

Screenshot that highlights the Column delimiter and Quote character settings.

Atividade de cópia #2: BlobJsonToCosmos

Em seguida, modificamos nosso pipeline do ADF adicionando a segunda Atividade de Cópia que procura no Armazenamento de Blobs do Azure o arquivo de texto criado pela primeira atividade. Ele o processa como fonte "JSON" para inserir no coletor do Azure Cosmos DB como um documento por linha JSON encontrado no arquivo de texto.

Screenshot that highlights the JSON source file and the File path fields.

Opcionalmente, também adicionamos uma atividade "Excluir" ao pipeline para que ele exclua todos os arquivos anteriores restantes na pasta /Orders/ antes de cada execução. Nosso pipeline do ADF agora se parece com isto:

Screenshot that highlights the Delete activity.

Depois de acionarmos o pipeline mencionado anteriormente, vemos um arquivo criado em nosso local intermediário do Armazenamento de Blobs do Azure contendo um objeto JSON por linha:

Screenshot that shows the created file that contains the JSON objects.

Também vemos documentos de Pedidos com OrderDetails devidamente incorporados inseridos em nossa coleção do Azure Cosmos DB:

Screenshot that shows the order details as a part of the Azure Cosmos DB document

Azure Databricks

Também podemos usar o Spark no Azure Databricks para copiar os dados de nossa fonte do Banco de Dados SQL para o destino do Azure Cosmos DB sem criar os arquivos de texto/JSON intermediários no Armazenamento de Blobs do Azure.

Nota

Para maior clareza e simplicidade, os trechos de código incluem senhas de banco de dados fictícias explicitamente embutidas, mas você deve usar idealmente os segredos do Azure Databricks.

Primeiro, criamos e anexamos o conector SQL necessário e as bibliotecas de conectores do Azure Cosmos DB ao nosso cluster do Azure Databricks. Reinicie o cluster para garantir que as bibliotecas sejam carregadas.

Screenshot that shows where to create and attach the required SQL connector and Azure Cosmos DB connector libraries to our Azure Databricks cluster.

Em seguida, apresentamos dois exemplos, para Scala e Python.

Scala

Aqui, obtemos os resultados da consulta SQL com saída "FOR JSON" em um DataFrame:

// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
  "url"          -> "xxxx.database.windows.net",
  "databaseName" -> "xxxx",
  "queryCustom"  -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user"         -> "xxxx", 
  "password"     -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)

Screenshot that shows the SQL query output in a DataFrame.

Em seguida, conectamo-nos à nossa coleção e banco de dados do Azure Cosmos DB:

// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.com:443/",
  // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)

Finalmente, definimos nosso esquema e usamos from_json para aplicar o DataFrame antes de salvá-lo na coleção do Cosmos DB.

// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
    StructField("OrderDetailId",IntegerType,true),
    StructField("ProductId",IntegerType,true),
    StructField("UnitPrice",DoubleType,true),
    StructField("Quantity",IntegerType,true)
  )))
val ordersWithSchema = orders.select(
  col("OrderId").cast("string").as("id"),
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)

Screenshot that highlights the proper array for saving to an Azure Cosmos DB collection.

Python

Como uma abordagem alternativa, talvez seja necessário executar transformações JSON no Spark se o banco de dados de origem não oferecer suporte FOR JSON ou uma operação semelhante. Como alternativa, você pode usar operações paralelas para um grande conjunto de dados. Aqui apresentamos uma amostra do PySpark. Comece configurando as conexões do banco de dados de origem e de destino na primeira célula:

import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
 
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
 
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
 
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
    "Endpoint": "Endpoint",
    "Masterkey": "Masterkey",
    "Database": "OrdersDB",
    "Collection": "Orders",
    "Upsert": "true"
}

Em seguida, consultamos o banco de dados de origem (neste caso, o SQL Server) para obter os registros de detalhes do pedido e do pedido, colocando os resultados no Spark Dataframes. Também criamos uma lista contendo todos os IDs de ordem e um pool de threads para operações paralelas:

import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
 
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
 
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
 
#get all OrderId values to pass to map function 
orderids = orders.select('OrderId').collect()
 
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)

Em seguida, crie uma função para escrever ordens na API de destino para a coleção NoSQL. Essa função filtra todos os detalhes do pedido para o ID do pedido fornecido, converte-os em uma matriz JSON e insere a matriz em um documento JSON. O documento JSON é então gravado na API de destino para contêiner NoSQL para essa ordem:

def writeOrder(orderid):
  #filter the order on current value passed from map function
  order = orders.filter(orders['OrderId'] == orderid[0])
  
  #set id to be a uuid
  order = order.withColumn("id", lit(str(uuid.uuid1())))
  
  #add details field to order dataframe
  order = order.withColumn("details", lit(''))
  
  #filter order details dataframe to get details we want to merge into the order document
  orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
  
  #convert dataframe to pandas
  orderpandas = order.toPandas()
  
  #convert the order dataframe to json and remove enclosing brackets
  orderjson = orderpandas.to_json(orient='records', force_ascii=False)
  orderjson = orderjson[1:-1] 
  
  #convert orderjson to a dictionaory so we can set the details element with order details later
  orderjsondata = json.loads(orderjson)
  
  
  #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
  if (orderdetailsgroup.count() !=0):
    #convert orderdetailsgroup to pandas dataframe to work better with json
    orderdetailsgroup = orderdetailsgroup.toPandas()
    
    #convert orderdetailsgroup to json string
    jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
    
    #convert jsonstring to dictionary to ensure correct encoding and no corrupt records
    jsonstring = json.loads(jsonstring)
    
    #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order 
    orderjsondata['details'] = jsonstring
 
  #convert dictionary to json
  orderjsondata = json.dumps(orderjsondata)
 
  #read the json into spark dataframe
  df = spark.read.json(sc.parallelize([orderjsondata]))
  
  #write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
  #https://learn.microsoft.com/azure/cosmos-db/spark-connector
  df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

Finalmente, chamamos a função Python writeOrder usando uma função de mapa no pool de threads, para executar em paralelo, passando a lista de IDs de ordem que criamos anteriormente:

#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)

Em qualquer uma das abordagens, no final, devemos salvar corretamente OrderDetails incorporados em cada documento Order na coleção do Azure Cosmos DB:

Screenshot of the resulting data after migration.

Próximos passos