Migrar datos relacionales de uno a varios a una cuenta de Azure Cosmos DB for NoSQL
SE APLICA A: NoSQL
Para migrar de una base de datos relacional a Azure Cosmos DB for NoSQL, puede que necesite modificar el modelo de datos para optimizarlo.
Una transformación común es la desnormalización de los datos al insertar subelementos relacionados en un solo documento JSON. A continuación se describen algunas opciones para realizar el proceso con Azure Data Factory o Azure Databricks. Para obtener más información sobre el modelado de datos para Azure Cosmos DB, consulte Modelado de datos en Azure Cosmos DB.
Escenario de ejemplo
Supongamos que existen las dos tablas siguientes en una base de datos SQL: Orders y OrderDetails.
Se quiere combinar esta relación de uno a varios en un documento JSON durante la migración. Para crear un único documento, cree una consulta T-SQL mediante FOR JSON
:
SELECT
o.OrderID,
o.OrderDate,
o.FirstName,
o.LastName,
o.Address,
o.City,
o.State,
o.PostalCode,
o.Country,
o.Phone,
o.Total,
(select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;
Los resultados de esta consulta incluirán datos de la tabla Pedidos:
Lo ideal sería usar una única actividad de copia de Azure Data Factory (ADF) para consultar datos SQL como origen y escribir la salida directamente en el receptor de Azure Cosmos DB como objetos JSON adecuados. Actualmente, no es posible realizar la transformación de JSON necesaria en una actividad de copia. Si se intenta copiar los resultados de la consulta anterior en un contenedor de Azure Cosmos DB for NoSQL, el campo OrderDetails se muestra como una propiedad de cadena de nuestro documento, en lugar de ser la matriz JSON esperada.
Esta limitación actual se puede solucionar de una de las siguientes maneras:
- Usar Azure Data Factory con dos actividades de copia:
- Almacenar los datos con formato JSON de SQL en un archivo de texto en una ubicación de almacenamiento de blobs intermedia
- Cargue los datos del archivo de texto JSON en un contenedor en Azure Cosmos DB.
- Usar Azure Databricks para leer de SQL y escribir en Azure Cosmos DB: en este caso, se muestran dos opciones.
Echemos un vistazo a estos métodos con más detalle:
Azure Data Factory
Aunque no se puede insertar OrderDetails como una matriz JSON en el documento de Azure Cosmos DB de destino, se puede solucionar el problema al usar dos actividades de copia independientes.
Actividad de copia n.º 1: SqlJsonToBlobText
En el caso de los datos de origen, se usa una consulta SQL para obtener el conjunto de resultados como una sola columna con un objeto JSON por fila (que representa al pedido) mediante las funciones OPENJSON y FOR JSON PATH de SQL Server:
SELECT [value] FROM OPENJSON(
(SELECT
id = o.OrderID,
o.OrderDate,
o.FirstName,
o.LastName,
o.Address,
o.City,
o.State,
o.PostalCode,
o.Country,
o.Phone,
o.Total,
(select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o FOR JSON PATH)
)
En el caso del receptor de la actividad de copia SqlJsonToBlobText
, elegimos "Texto delimitado" y apuntamos a una carpeta concreta de Azure Blob Storage. Este receptor incluye un nombre de archivo único generado de forma dinámica (por ejemplo, @concat(pipeline().RunId,'.json')
).
Dado que nuestro archivo de texto no está realmente "delimitado" y no queremos que se analice en columnas independientes mediante comas. También queremos mantener las comillas dobles ("), establecer "Delimitador de columna" en un tabulador ("\t"), u otro carácter que no aparezca en los datos, y, luego establecer "Carácter de comillas" en "Sin carácter de comillas".
Actividad de copia n.º 2: BlobJsonToCosmos
A continuación, se modifica la canalización de ADF al agregar la segunda actividad de copia que busca en Azure Blob Storage al archivo de texto creada en la primera actividad. Este se procesa como un origen "JSON" a finde insertarse en el receptor de Azure Cosmos DB como un documento por cada fila JSON encontrada en el archivo de texto.
También se puede agregar una actividad "Eliminar" a la canalización para que elimine todos los archivos anteriores que queden en la carpeta /Orders/ antes de cada ejecución. Nuestra canalización de ADF ahora tiene un aspecto similar al siguiente:
Después de desencadenar la canalización mencionada anteriormente, se crea un archivo en la ubicación intermedia de Azure Blob Storage que contiene un objeto JSON por cada fila:
También se muestran los documentos de Orders con OrderDetails insertados correctamente en nuestra colección de Azure Cosmos DB:
Azure Databricks
También se puede usar Spark en Azure Databricks para copiar los datos del origen de SQL Database al destino de Azure Cosmos DB sin crear los archivos de texto o JSON intermedios en Azure Blob Storage.
Nota:
Para mayor claridad y simplicidad, los fragmentos de código incluyen contraseñas de base de datos ficticias insertadas explícitamente. Sin embargo, debe usar secretos de Azure Databricks.
En primer lugar, se crean y se conectan las bibliotecas del conector de SQL y del conector de Azure Cosmos DB necesarias en nuestro clúster de Azure Databricks. Reinicie el clúster para asegurarse de las bibliotecas se carguen.
A continuación, se incluyen dos ejemplos; uno para Scala y otro para Python.
Scala
En este caso, se obtienen los resultados de la consulta SQL con la salida "FOR JSON" en un objeto DataFrame:
// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
"url" -> "xxxx.database.windows.net",
"databaseName" -> "xxxx",
"queryCustom" -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
"user" -> "xxxx",
"password" -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)
A continuación, se establece una conexión con la base de datos y la colección de Azure Cosmos DB:
// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
"Endpoint" -> "https://xxxx.documents.azure.com:443/",
// NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
"Masterkey" -> "xxxx",
"Database" -> "StoreDatabase",
"Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)
Por último, se define el esquema y se usa from_json para aplicar el objeto DataFrame antes de guardarlo en la colección de Cosmos DB.
// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
StructField("OrderDetailId",IntegerType,true),
StructField("ProductId",IntegerType,true),
StructField("UnitPrice",DoubleType,true),
StructField("Quantity",IntegerType,true)
)))
val ordersWithSchema = orders.select(
col("OrderId").cast("string").as("id"),
col("OrderDate").cast("string"),
col("FirstName").cast("string"),
col("LastName").cast("string"),
col("Address").cast("string"),
col("City").cast("string"),
col("State").cast("string"),
col("PostalCode").cast("string"),
col("Country").cast("string"),
col("Phone").cast("string"),
col("Total").cast("double"),
from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)
Python
Como enfoque alternativo, es posible que tenga que ejecutar transformaciones JSON en Spark si la base de datos de origen no admite FOR JSON
o una operación similar. Como alternativa, puede usar operaciones paralelas para un conjunto de datos grande. El siguiente es un ejemplo de PySpark. Empiece por configurar las conexiones de base de datos de origen y de destino en la primera celda:
import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
connectionProperties = {
"user" : jdbcUsername,
"password" : jdbcPassword,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
"Endpoint": "Endpoint",
"Masterkey": "Masterkey",
"Database": "OrdersDB",
"Collection": "Orders",
"Upsert": "true"
}
A continuación, se consulta la base de datos de origen (en este caso, SQL Server) para recuperar los registros de Order y OrderDetails y los resultados se colocarán en objetos DataFrame de Spark. También se crea una lista que contenga todos los Id. de pedido y un grupo de subprocesos para las operaciones en paralelo:
import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
#get all OrderId values to pass to map function
orderids = orders.select('OrderId').collect()
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)
A continuación, cree una función para escribir los elementos de Order en la colección de la API para NoSQL de destino. Esta función filtra todos los detalles del pedido del identificador de pedido especificado, los convierte en una matriz JSON e inserta la matriz en un documento JSON. A continuación, el documento JSON se escribe en la API de destino para el contenedor NoSQL de ese pedido:
def writeOrder(orderid):
#filter the order on current value passed from map function
order = orders.filter(orders['OrderId'] == orderid[0])
#set id to be a uuid
order = order.withColumn("id", lit(str(uuid.uuid1())))
#add details field to order dataframe
order = order.withColumn("details", lit(''))
#filter order details dataframe to get details we want to merge into the order document
orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
#convert dataframe to pandas
orderpandas = order.toPandas()
#convert the order dataframe to json and remove enclosing brackets
orderjson = orderpandas.to_json(orient='records', force_ascii=False)
orderjson = orderjson[1:-1]
#convert orderjson to a dictionaory so we can set the details element with order details later
orderjsondata = json.loads(orderjson)
#convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
if (orderdetailsgroup.count() !=0):
#convert orderdetailsgroup to pandas dataframe to work better with json
orderdetailsgroup = orderdetailsgroup.toPandas()
#convert orderdetailsgroup to json string
jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
#convert jsonstring to dictionary to ensure correct encoding and no corrupt records
jsonstring = json.loads(jsonstring)
#set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order
orderjsondata['details'] = jsonstring
#convert dictionary to json
orderjsondata = json.dumps(orderjsondata)
#read the json into spark dataframe
df = spark.read.json(sc.parallelize([orderjsondata]))
#write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
#https://learn.microsoft.com/azure/cosmos-db/spark-connector
df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()
Por último, se llama a la función Python writeOrder
mediante una función de asignación en el grupo de subprocesos para que se ejecute en paralelo, pasando la lista de Id. de pedido que se ha creado anteriormente:
#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)
En cualquiera de estos métodos, al final, debe haber objetos OrderDetails insertados y guardados correctamente en cada documento Order de la colección de Azure Cosmos DB:
Pasos siguientes
- Más información sobre el modelado de datos en Azure Cosmos DB
- Más información sobre cómo modelar y crear particiones de datos en Azure Cosmos DB