Перенос реляционных данных в учетную запись Azure Cosmos DB для NoSQL
ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL
Чтобы перенести реляционную базу данных в Azure Cosmos DB для NoSQL, можно внести изменения в модель данных для оптимизации.
Одним из распространенных преобразований является денормализация данных путем встраивания связанных вложенных элементов в один документ JSON. Здесь мы рассмотрим несколько вариантов решения этой задачи с помощью Фабрики данных Azure или Azure Databricks. Дополнительные сведения о моделировании данных для Azure Cosmos DB см. в статье об моделировании данных в Azure Cosmos DB.
Пример сценария
Предположим, в базе данных SQL имеются две таблицы: Orders и OrderDetails.
Во время миграции мы хотим объединить эти связи "один к нескольким" в один документ JSON. Чтобы создать один документ, создайте запрос T-SQL с помощью FOR JSON
:
SELECT
o.OrderID,
o.OrderDate,
o.FirstName,
o.LastName,
o.Address,
o.City,
o.State,
o.PostalCode,
o.Country,
o.Phone,
o.Total,
(select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;
Результаты этого запроса будут содержать данные из таблицы Orders :
В идеальном случае для запроса данных SQL в качестве источника и записи выходные данные непосредственно в приемник Azure Cosmos DB в качестве правильных объектов JSON хотелось бы использовать одно действие копирования Фабрики данных Azure (ADF). В настоящее время невозможно выполнить необходимое преобразование JSON в одном действии копирования. Если мы попытаемся скопировать результаты приведенного выше запроса в контейнер Azure Cosmos DB для NoSQL, то поле OrderDetails отображается как строковое свойство нашего документа, а не ожидаемый массив JSON.
Обойти это текущее ограничение можно одним из описанных ниже способов.
- Используйте Фабрика данных Azure с двумя действиями копирования:
- Получение данных в формате JSON из SQL в текстовый файл в промежуточном расположении хранилища BLOB-объектов
- загрузка данных из текстового файла JSON в контейнер в Azure Cosmos DB.
- Используйте Azure Databricks для чтения из SQL и записи в Azure Cosmos DB . Здесь представлены два варианта.
которые подробнее рассмотрены ниже.
Azure Data Factory
Хотя мы не можем внедрить OrderDetails в виде массива JSON в целевом документе Azure Cosmos DB, мы можем обойти проблему с помощью двух отдельных действий копирования.
Действие копирования 1: SqlJsonToBlobText
Для исходных данных мы используем SQL-запрос, возвращающий результирующий набор в виде одного столбца с одним объектом JSON (представляющим заказ) для каждой строки с помощью функций FOR JSON PATH и OPENJSON SQL Server:
SELECT [value] FROM OPENJSON(
(SELECT
id = o.OrderID,
o.OrderDate,
o.FirstName,
o.LastName,
o.Address,
o.City,
o.State,
o.PostalCode,
o.Country,
o.Phone,
o.Total,
(select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o FOR JSON PATH)
)
Для приемника SqlJsonToBlobText
действия копирования мы выбираем "Разделителя текста" и указываем его на определенную папку в Хранилище BLOB-объектов Azure. Этот приемник включает динамически созданное уникальное имя файла (например, @concat(pipeline().RunId,'.json')
.
Так как текстовый файл не является "разделителем", и мы не хотим, чтобы он был проанализирован в отдельные столбцы с помощью запятых. Мы также хотим сохранить двойные кавычки (), задайте для табуляции ("\t") значение tab (\t) или другой символ, не происходящий в данных, а затем задайте для параметра "Символ кавычки" значение "Нет символа кавычки".
Действие копирования 2: BlobJsonToCosmos
Затем мы изменим конвейер ADF, добавив второе действие копирования, которое выглядит в Хранилище BLOB-объектов Azure текстового файла, созданного первым действием. Он обрабатывает его как источник JSON, чтобы вставить в приемник Azure Cosmos DB как один документ на строку JSON, найденную в текстовом файле.
При необходимости также можно добавить в конвейер действие "Удалить", чтобы оно перед каждым запуском удаляло из папки /Orders/ все файлы, оставшиеся там после предыдущих запусков. Наш конвейер ADF теперь выглядит примерно так:
После запуска конвейера, упоминаемого ранее, мы видим файл, созданный в промежуточном Хранилище BLOB-объектов Azure расположении, содержающем один объект JSON-объекта на строку:
Кроме того, мы видим документы Orders с правильно внедренными заказами, вставленными в коллекцию Azure Cosmos DB:
Azure Databricks
Кроме того, в Azure Databricks можно с помощью Spark скопировать данные из источника базы данных SQL в целевую базу Azure Cosmos DB, не создавая промежуточные текстовые или JSON-файлы в хранилище BLOB-объектов Azure.
Примечание.
Для ясности и простоты фрагменты кода включают фиктивные пароли базы данных явно встроенными, но в идеале следует использовать секреты Azure Databricks.
Сначала мы создадим и подключим необходимые библиотеки соединителей SQL и соединителей Azure Cosmos DB к кластеру Azure Databricks. Перезапустите кластер, чтобы убедиться, что библиотеки загружены.
Далее мы составим два примера для Scala и Python.
Scala
Здесь мы получаем результаты SQL-запроса с выходным предложением FOR JSON в кадре данных:
// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
"url" -> "xxxx.database.windows.net",
"databaseName" -> "xxxx",
"queryCustom" -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
"user" -> "xxxx",
"password" -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)
Затем мы подключаемся к базе данных и коллекции Azure Cosmos DB:
// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
"Endpoint" -> "https://xxxx.documents.azure.com:443/",
// NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
"Masterkey" -> "xxxx",
"Database" -> "StoreDatabase",
"Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)
Наконец, мы определим схему и используем from_json для применения кадра данных перед сохранением его в коллекции Cosmos DB.
// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
StructField("OrderDetailId",IntegerType,true),
StructField("ProductId",IntegerType,true),
StructField("UnitPrice",DoubleType,true),
StructField("Quantity",IntegerType,true)
)))
val ordersWithSchema = orders.select(
col("OrderId").cast("string").as("id"),
col("OrderDate").cast("string"),
col("FirstName").cast("string"),
col("LastName").cast("string"),
col("Address").cast("string"),
col("City").cast("string"),
col("State").cast("string"),
col("PostalCode").cast("string"),
col("Country").cast("string"),
col("Phone").cast("string"),
col("Total").cast("double"),
from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)
Python
В качестве альтернативного подхода может потребоваться выполнить преобразования JSON в Spark, если исходная база данных не поддерживает FOR JSON
или аналогичную операцию. Кроме того, можно использовать параллельные операции для большого набора данных. Здесь мы представляем пример PySpark. Для начала настроим подключения к исходной и целевой базе данных в первой ячейке:
import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
connectionProperties = {
"user" : jdbcUsername,
"password" : jdbcPassword,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
"Endpoint": "Endpoint",
"Masterkey": "Masterkey",
"Database": "OrdersDB",
"Collection": "Orders",
"Upsert": "true"
}
Затем мы запрашиваем исходную базу данных (в данном случае SQL Server) для записей сведений о заказе и заказе, помещая результаты в кадры данных Spark. Мы также создадим список, содержащий все идентификаторы заказов и пул потоков для параллельных операций:
import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
#get all OrderId values to pass to map function
orderids = orders.select('OrderId').collect()
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)
Затем создайте функцию для записи Orders в целевую коллекцию API noSQL. Эта функция фильтрует все сведения о порядке для заданного идентификатора заказа, преобразует их в массив JSON и вставляет массив в документ JSON. Затем документ JSON записывается в целевой контейнер API для NoSQL для этого заказа:
def writeOrder(orderid):
#filter the order on current value passed from map function
order = orders.filter(orders['OrderId'] == orderid[0])
#set id to be a uuid
order = order.withColumn("id", lit(str(uuid.uuid1())))
#add details field to order dataframe
order = order.withColumn("details", lit(''))
#filter order details dataframe to get details we want to merge into the order document
orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
#convert dataframe to pandas
orderpandas = order.toPandas()
#convert the order dataframe to json and remove enclosing brackets
orderjson = orderpandas.to_json(orient='records', force_ascii=False)
orderjson = orderjson[1:-1]
#convert orderjson to a dictionaory so we can set the details element with order details later
orderjsondata = json.loads(orderjson)
#convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
if (orderdetailsgroup.count() !=0):
#convert orderdetailsgroup to pandas dataframe to work better with json
orderdetailsgroup = orderdetailsgroup.toPandas()
#convert orderdetailsgroup to json string
jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
#convert jsonstring to dictionary to ensure correct encoding and no corrupt records
jsonstring = json.loads(jsonstring)
#set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order
orderjsondata['details'] = jsonstring
#convert dictionary to json
orderjsondata = json.dumps(orderjsondata)
#read the json into spark dataframe
df = spark.read.json(sc.parallelize([orderjsondata]))
#write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
#https://learn.microsoft.com/azure/cosmos-db/spark-connector
df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()
Наконец, мы вызываем функцию Python writeOrder
с помощью функции карты в пуле потоков для параллельного выполнения, передавая список идентификаторов порядка, созданных ранее:
#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)
В любом случае мы должны правильно сохранить внедренные orderDetails в каждом документе Order в коллекции Azure Cosmos DB:
Следующие шаги
- Подробнее о моделировании данных в Azure Cosmos DB
- Подробнее о моделировании и секционировании данных в Azure Cosmos DB