Migrace relačních dat 1:1 do účtu Azure Cosmos DB for NoSQL
PLATÍ PRO: NoSQL
Aby bylo možné migrovat z relační databáze do Azure Cosmos DB for NoSQL, může být nutné provést změny datového modelu pro optimalizaci.
Jednou z běžných transformací je denormalizace dat vložením souvisejících dílčích položek do jednoho dokumentu JSON. Tady se podíváme na několik možností použití Azure Data Factory nebo Azure Databricks. Další informace o modelování dat pro službu Azure Cosmos DB najdete v tématu Modelování dat ve službě Azure Cosmos DB.
Ukázkový scénář
Předpokládejme, že máme v databázi SQL následující dvě tabulky, Orders a OrderDetails.
Během migrace chceme tento vztah 1:0 zkombinovat do jednoho dokumentu JSON. Pokud chcete vytvořit jeden dokument, vytvořte dotaz T-SQL pomocí FOR JSON
:
SELECT
o.OrderID,
o.OrderDate,
o.FirstName,
o.LastName,
o.Address,
o.City,
o.State,
o.PostalCode,
o.Country,
o.Phone,
o.Total,
(select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;
Výsledky tohoto dotazu by zahrnovaly data z tabulky Orders :
V ideálním případě chcete použít jednu aktivitu kopírování azure Data Factory (ADF) k dotazování dat SQL jako zdroje a zápis výstupu přímo do jímky Azure Cosmos DB jako správných objektů JSON. V současné době není možné provést potřebnou transformaci JSON v jedné aktivitě kopírování. Pokud se pokusíme zkopírovat výsledky výše uvedeného dotazu do kontejneru Azure Cosmos DB for NoSQL, zobrazí se pole OrderDetails jako řetězcová vlastnost našeho dokumentu místo očekávaného pole JSON.
Toto aktuální omezení můžeme obejít jedním z následujících způsobů:
- Použití služby Azure Data Factory se dvěma aktivitami kopírování:
- Získání dat ve formátu JSON z SQL do textového souboru v umístění zprostředkujícího úložiště objektů blob
- Načtěte data z textového souboru JSON do kontejneru ve službě Azure Cosmos DB.
- Azure Databricks slouží ke čtení z SQL a zápisu do služby Azure Cosmos DB – tady jsou dvě možnosti.
Pojďme se podrobněji podívat na tyto přístupy:
Azure Data Factory
I když do cílového dokumentu Azure Cosmos DB nemůžeme vložit OrderDetails jako pole JSON, můžeme problém obejít pomocí dvou samostatných aktivit kopírování.
Aktivita kopírování č. 1: SqlJsonToBlobText
Pro zdrojová data pomocí dotazu SQL získáme sadu výsledků jako jeden sloupec s jedním objektem JSON (představujícím objednávku) na řádek pomocí funkcí SQL Server OPENJSON a FOR JSON PATH:
SELECT [value] FROM OPENJSON(
(SELECT
id = o.OrderID,
o.OrderDate,
o.FirstName,
o.LastName,
o.Address,
o.City,
o.State,
o.PostalCode,
o.Country,
o.Phone,
o.Total,
(select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o FOR JSON PATH)
)
U jímky SqlJsonToBlobText
aktivity kopírování zvolíme text s oddělovači a nasměrujeme ho na konkrétní složku ve službě Azure Blob Storage. Tato jímka obsahuje dynamicky vygenerovaný jedinečný název souboru (například @concat(pipeline().RunId,'.json')
.
Vzhledem k tomu, že textový soubor není ve skutečnosti "oddělený" a nechceme, aby se parsoval do samostatných sloupců pomocí čárek. Chceme také zachovat dvojité uvozovky ("), nastavit "Oddělovač sloupců" na tabulátor ("\t") nebo jiný znak, ke kterému v datech nedochází, a pak nastavit znak uvozovky na "Žádný znak uvozovky".
Aktivita kopírování č. 2: BlobJsonToCosmos
Dále upravíme kanál ADF přidáním druhé aktivity kopírování, která hledá ve službě Azure Blob Storage textový soubor vytvořený první aktivitou. Zpracovává ho jako zdroj JSON pro vložení do jímky Azure Cosmos DB jako jeden dokument na řádek JSON nalezený v textovém souboru.
Volitelně také do kanálu přidáme aktivitu "Odstranit", aby před každým spuštěním odstranila všechny předchozí soubory zbývající ve složce /Orders/. Náš kanál ADF teď vypadá nějak takto:
Po aktivaci dříve zmíněného kanálu uvidíme soubor vytvořený v našem zprostředkujícím umístění služby Azure Blob Storage obsahující jeden objekt JSON na řádek:
Uvidíme také dokumenty Orders s správně vloženými orderDetails vloženými do naší kolekce Azure Cosmos DB:
Azure Databricks
Spark v Azure Databricks můžeme také použít ke zkopírování dat ze zdroje služby SQL Database do cíle služby Azure Cosmos DB bez vytvoření zprostředkujících textových souborů a souborů JSON ve službě Azure Blob Storage.
Poznámka:
Z důvodu srozumitelnosti a jednoduchosti fragmenty kódu obsahují fiktivní hesla databáze explicitně vložená, ale v ideálním případě byste měli používat tajné kódy Azure Databricks.
Nejprve vytvoříme a připojíme požadovaný konektor SQL a knihovny konektorů Azure Cosmos DB k našemu clusteru Azure Databricks. Restartujte cluster, aby se zajistilo načtení knihoven.
Dále prezentujeme dvě ukázky pro Scala a Python.
Scala
Tady získáme výsledky dotazu SQL s výstupem FOR JSON do datového rámce:
// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
"url" -> "xxxx.database.windows.net",
"databaseName" -> "xxxx",
"queryCustom" -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
"user" -> "xxxx",
"password" -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)
Dále se připojíme k databázi a kolekci Azure Cosmos DB:
// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
"Endpoint" -> "https://xxxx.documents.azure.com:443/",
// NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
"Masterkey" -> "xxxx",
"Database" -> "StoreDatabase",
"Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)
Nakonec definujeme schéma a použijeme from_json k použití datového rámce před uložením do kolekce Cosmos DB.
// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
StructField("OrderDetailId",IntegerType,true),
StructField("ProductId",IntegerType,true),
StructField("UnitPrice",DoubleType,true),
StructField("Quantity",IntegerType,true)
)))
val ordersWithSchema = orders.select(
col("OrderId").cast("string").as("id"),
col("OrderDate").cast("string"),
col("FirstName").cast("string"),
col("LastName").cast("string"),
col("Address").cast("string"),
col("City").cast("string"),
col("State").cast("string"),
col("PostalCode").cast("string"),
col("Country").cast("string"),
col("Phone").cast("string"),
col("Total").cast("double"),
from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)
Python
Jako alternativní přístup možná budete muset ve Sparku spouštět transformace JSON, pokud zdrojová databáze nepodporuje FOR JSON
nebo podobnou operaci. Alternativně můžete použít paralelní operace pro velkou datovou sadu. Zde prezentujeme ukázku PySpark. Začněte konfigurací připojení ke zdrojové a cílové databázi v první buňce:
import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
connectionProperties = {
"user" : jdbcUsername,
"password" : jdbcPassword,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
"Endpoint": "Endpoint",
"Masterkey": "Masterkey",
"Database": "OrdersDB",
"Collection": "Orders",
"Upsert": "true"
}
Potom se dotazujeme na zdrojová databáze (v tomto případě SQL Server) pro záznamy objednávek i podrobností objednávek a výsledky vložíme do datových rámců Sparku. Vytvoříme také seznam obsahující všechna ID objednávek a fond vláken pro paralelní operace:
import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
#get all OrderId values to pass to map function
orderids = orders.select('OrderId').collect()
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)
Pak vytvořte funkci pro zápis objednávek do cílové kolekce API pro NoSQL. Tato funkce vyfiltruje všechny podrobnosti objednávky pro dané ID objednávky, převede je na pole JSON a vloží pole do dokumentu JSON. Dokument JSON se pak zapíše do cílového kontejneru API pro NoSQL pro danou objednávku:
def writeOrder(orderid):
#filter the order on current value passed from map function
order = orders.filter(orders['OrderId'] == orderid[0])
#set id to be a uuid
order = order.withColumn("id", lit(str(uuid.uuid1())))
#add details field to order dataframe
order = order.withColumn("details", lit(''))
#filter order details dataframe to get details we want to merge into the order document
orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
#convert dataframe to pandas
orderpandas = order.toPandas()
#convert the order dataframe to json and remove enclosing brackets
orderjson = orderpandas.to_json(orient='records', force_ascii=False)
orderjson = orderjson[1:-1]
#convert orderjson to a dictionaory so we can set the details element with order details later
orderjsondata = json.loads(orderjson)
#convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
if (orderdetailsgroup.count() !=0):
#convert orderdetailsgroup to pandas dataframe to work better with json
orderdetailsgroup = orderdetailsgroup.toPandas()
#convert orderdetailsgroup to json string
jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
#convert jsonstring to dictionary to ensure correct encoding and no corrupt records
jsonstring = json.loads(jsonstring)
#set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order
orderjsondata['details'] = jsonstring
#convert dictionary to json
orderjsondata = json.dumps(orderjsondata)
#read the json into spark dataframe
df = spark.read.json(sc.parallelize([orderjsondata]))
#write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
#https://learn.microsoft.com/azure/cosmos-db/spark-connector
df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()
Nakonec zavoláme funkci Pythonu writeOrder
pomocí mapové funkce ve fondu vláken, která se provede paralelně a předá seznam ID objednávek, které jsme vytvořili dříve:
#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)
V obou přístupech bychom na konci měli správně uložit vložené OrderDetails v rámci každého dokumentu objednávky v kolekci Azure Cosmos DB: