Migrowanie danych relacyjnych jeden do kilku do konta usługi Azure Cosmos DB for NoSQL
DOTYCZY: NoSQL
Aby przeprowadzić migrację z relacyjnej bazy danych do usługi Azure Cosmos DB for NoSQL, konieczne może być wprowadzenie zmian w modelu danych na potrzeby optymalizacji.
Jedną z typowych przekształceń jest denormalizowanie danych przez osadzanie powiązanych podwitami w jednym dokumencie JSON. W tym miejscu przyjrzymy się kilku opcjom korzystania z usługi Azure Data Factory lub Azure Databricks. Aby uzyskać więcej informacji na temat modelowania danych dla usługi Azure Cosmos DB, zobacz Modelowanie danych w usłudze Azure Cosmos DB.
Przykładowy scenariusz
Załóżmy, że mamy dwie poniższe tabele w bazie danych SQL Database, Orders i OrderDetails.
Chcemy połączyć tę relację jeden do kilku w jeden dokument JSON podczas migracji. Aby utworzyć pojedynczy dokument, utwórz zapytanie T-SQL przy użyciu polecenia FOR JSON
:
SELECT
o.OrderID,
o.OrderDate,
o.FirstName,
o.LastName,
o.Address,
o.City,
o.State,
o.PostalCode,
o.Country,
o.Phone,
o.Total,
(select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;
Wyniki tego zapytania obejmują dane z tabeli Orders ( Zamówienia):
W idealnym przypadku chcesz użyć pojedynczego działania kopiowania usługi Azure Data Factory (ADF), aby wysyłać zapytania do danych SQL jako źródła i zapisywać dane wyjściowe bezpośrednio w ujściu usługi Azure Cosmos DB jako odpowiednie obiekty JSON. Obecnie nie można wykonać wymaganej transformacji JSON w jednym działaniu kopiowania. Jeśli spróbujemy skopiować wyniki powyższego zapytania do kontenera usługi Azure Cosmos DB for NoSQL, zobaczymy pole OrderDetails jako właściwość string naszego dokumentu zamiast oczekiwanej tablicy JSON.
Możemy obejść to bieżące ograniczenie w jeden z następujących sposobów:
- Użyj usługi Azure Data Factory z dwoma działaniami kopiowania:
- Pobieranie danych sformatowanych w formacie JSON z bazy danych SQL do pliku tekstowego w pośredniej lokalizacji magazynu obiektów blob
- Ładowanie danych z pliku tekstowego JSON do kontenera w usłudze Azure Cosmos DB.
- Użyj usługi Azure Databricks do odczytu z bazy danych SQL i zapisu w usłudze Azure Cosmos DB — przedstawimy tutaj dwie opcje.
Przyjrzyjmy się tym metodom bardziej szczegółowo:
Azure Data Factory
Chociaż nie możemy osadzić elementu OrderDetails jako tablicy JSON w docelowym dokumencie usługi Azure Cosmos DB, możemy obejść ten problem przy użyciu dwóch oddzielnych działań kopiowania.
Działanie kopiowania nr 1: SqlJsonToBlobText
W przypadku danych źródłowych użyjemy zapytania SQL, aby uzyskać zestaw wyników jako pojedynczą kolumnę z jednym obiektem JSON (reprezentującym kolejność) na wiersz przy użyciu funkcji SQL Server OPENJSON i FOR JSON PATH:
SELECT [value] FROM OPENJSON(
(SELECT
id = o.OrderID,
o.OrderDate,
o.FirstName,
o.LastName,
o.Address,
o.City,
o.State,
o.PostalCode,
o.Country,
o.Phone,
o.Total,
(select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o FOR JSON PATH)
)
W przypadku ujścia SqlJsonToBlobText
działania kopiowania wybieramy pozycję "Rozdzielany tekst" i wskazujemy go na określony folder w usłudze Azure Blob Storage. Ten ujście zawiera dynamicznie wygenerowaną unikatową nazwę pliku (na przykład @concat(pipeline().RunId,'.json')
.
Ponieważ nasz plik tekstowy nie jest naprawdę "rozdzielany" i nie chcemy, aby był analizowany w osobnych kolumnach przy użyciu przecinków. Chcemy również zachować podwójne cudzysłowy ("), ustawić ogranicznik kolumny na tabulator ("\t") — lub inny znak, który nie występuje w danych, a następnie ustaw wartość "Znak cudzysłowu" na "Brak znaku cudzysłowu".
Działanie kopiowania nr 2: BlobJsonToCosmos
Następnie zmodyfikujemy potok usługi ADF, dodając drugie działanie kopiowania, które wyszukuje w usłudze Azure Blob Storage dla pliku tekstowego utworzonego przez pierwsze działanie. Przetwarza go jako źródło "JSON", aby wstawić go do ujścia usługi Azure Cosmos DB jako jeden dokument na wiersz JSON znaleziony w pliku tekstowym.
Opcjonalnie do potoku dodamy również działanie "Usuń", aby usuwać wszystkie poprzednie pliki pozostałe w folderze /Orders/ przed każdym uruchomieniem. Potok usługi ADF wygląda teraz mniej więcej tak:
Po wyzwoleniu wspomnianego wcześniej potoku zobaczymy plik utworzony w lokalizacji pośredniej usługi Azure Blob Storage zawierający jeden obiekt JSON na wiersz:
Widzimy również dokumenty Orders z poprawnie osadzonymi elementami OrderDetails wstawianymi do kolekcji usługi Azure Cosmos DB:
Azure Databricks
Możemy również użyć platformy Spark w usłudze Azure Databricks , aby skopiować dane ze źródła usługi SQL Database do miejsca docelowego usługi Azure Cosmos DB bez tworzenia pośredniczących plików tekstowych/JSON w usłudze Azure Blob Storage.
Uwaga
Aby uzyskać czytelność i prostotę, fragmenty kodu zawierają jawnie wbudowane fikcyjne hasła bazy danych, ale najlepiej używać wpisów tajnych usługi Azure Databricks.
Najpierw utworzymy i dołączymy wymagane biblioteki łącznika SQL i łącznika usługi Azure Cosmos DB do klastra usługi Azure Databricks. Uruchom ponownie klaster, aby upewnić się, że biblioteki zostały załadowane.
Następnie przedstawimy dwa przykłady dla języków Scala i Python.
Scala
W tym miejscu uzyskujemy wyniki zapytania SQL z danymi wyjściowymi "FOR JSON" do ramki danych:
// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
"url" -> "xxxx.database.windows.net",
"databaseName" -> "xxxx",
"queryCustom" -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
"user" -> "xxxx",
"password" -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)
Następnie nawiążmy połączenie z naszą bazą danych i kolekcją usługi Azure Cosmos DB:
// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
"Endpoint" -> "https://xxxx.documents.azure.com:443/",
// NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
"Masterkey" -> "xxxx",
"Database" -> "StoreDatabase",
"Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)
Na koniec definiujemy nasz schemat i używamy from_json do zastosowania ramki danych przed zapisaniem go w kolekcji usługi Cosmos DB.
// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
StructField("OrderDetailId",IntegerType,true),
StructField("ProductId",IntegerType,true),
StructField("UnitPrice",DoubleType,true),
StructField("Quantity",IntegerType,true)
)))
val ordersWithSchema = orders.select(
col("OrderId").cast("string").as("id"),
col("OrderDate").cast("string"),
col("FirstName").cast("string"),
col("LastName").cast("string"),
col("Address").cast("string"),
col("City").cast("string"),
col("State").cast("string"),
col("PostalCode").cast("string"),
col("Country").cast("string"),
col("Phone").cast("string"),
col("Total").cast("double"),
from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)
Python
Alternatywną metodą może być wykonanie przekształceń JSON na platformie Spark, jeśli źródłowa baza danych nie obsługuje FOR JSON
lub podobna operacja. Alternatywnie można użyć operacji równoległych dla dużego zestawu danych. W tym miejscu przedstawiamy przykład PySpark. Zacznij od skonfigurowania źródłowych i docelowych połączeń bazy danych w pierwszej komórce:
import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
connectionProperties = {
"user" : jdbcUsername,
"password" : jdbcPassword,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
"Endpoint": "Endpoint",
"Masterkey": "Masterkey",
"Database": "OrdersDB",
"Collection": "Orders",
"Upsert": "true"
}
Następnie wysyłamy zapytanie do źródłowej bazy danych (w tym przypadku programu SQL Server) zarówno dla rekordów szczegółów zamówienia, jak i zamówienia, umieszczając wyniki w ramkach danych platformy Spark. Tworzymy również listę zawierającą wszystkie identyfikatory zamówień i pulę wątków dla operacji równoległych:
import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
#get all OrderId values to pass to map function
orderids = orders.select('OrderId').collect()
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)
Następnie utwórz funkcję do zapisywania zamówień w docelowym interfejsie API dla kolekcji NoSQL. Ta funkcja filtruje wszystkie szczegóły zamówienia dla danego identyfikatora zamówienia, konwertuje je na tablicę JSON i wstawia tablicę do dokumentu JSON. Dokument JSON jest następnie zapisywany w docelowym interfejsie API dla kontenera NoSQL dla tej kolejności:
def writeOrder(orderid):
#filter the order on current value passed from map function
order = orders.filter(orders['OrderId'] == orderid[0])
#set id to be a uuid
order = order.withColumn("id", lit(str(uuid.uuid1())))
#add details field to order dataframe
order = order.withColumn("details", lit(''))
#filter order details dataframe to get details we want to merge into the order document
orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
#convert dataframe to pandas
orderpandas = order.toPandas()
#convert the order dataframe to json and remove enclosing brackets
orderjson = orderpandas.to_json(orient='records', force_ascii=False)
orderjson = orderjson[1:-1]
#convert orderjson to a dictionaory so we can set the details element with order details later
orderjsondata = json.loads(orderjson)
#convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
if (orderdetailsgroup.count() !=0):
#convert orderdetailsgroup to pandas dataframe to work better with json
orderdetailsgroup = orderdetailsgroup.toPandas()
#convert orderdetailsgroup to json string
jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
#convert jsonstring to dictionary to ensure correct encoding and no corrupt records
jsonstring = json.loads(jsonstring)
#set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order
orderjsondata['details'] = jsonstring
#convert dictionary to json
orderjsondata = json.dumps(orderjsondata)
#read the json into spark dataframe
df = spark.read.json(sc.parallelize([orderjsondata]))
#write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
#https://learn.microsoft.com/azure/cosmos-db/spark-connector
df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()
Na koniec wywołujemy funkcję języka Python writeOrder
przy użyciu funkcji mapy w puli wątków, aby wykonać równolegle, przekazując listę utworzonych wcześniej identyfikatorów zamówień:
#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)
W obu metodach na końcu powinniśmy prawidłowo zapisać osadzony element OrderDetails w ramach każdego dokumentu Order w kolekcji usługi Azure Cosmos DB:
Następne kroki
- Dowiedz się więcej o modelowaniu danych w usłudze Azure Cosmos DB
- Dowiedz się , jak modelować i partycjonować dane w usłudze Azure Cosmos DB