Delen via


Een-op-weinig relationele gegevens migreren naar een Azure Cosmos DB for NoSQL-account

VAN TOEPASSING OP: NoSQL

Als u wilt migreren van een relationele database naar Azure Cosmos DB for NoSQL, kan het nodig zijn om wijzigingen aan te brengen in het gegevensmodel voor optimalisatie.

Een veelvoorkomende transformatie is het denormaliseren van gegevens door gerelateerde subitems in te sluiten binnen één JSON-document. Hier kijken we naar een aantal opties hiervoor met behulp van Azure Data Factory of Azure Databricks. Zie gegevensmodellering in Azure Cosmos DB voor meer informatie over gegevensmodellering voor Azure Cosmos DB.

Voorbeeldscenario

Stel dat we de volgende twee tabellen hebben in onze SQL-database, Orders en OrderDetails.

Schermopname van de tabellen Orders en OrderDetails in de SQL-database.

We willen deze een-op-weinig-relatie combineren tot één JSON-document tijdens de migratie. Als u één document wilt maken, maakt u een T-SQL-query met behulp van FOR JSON:

SELECT
  o.OrderID,
  o.OrderDate,
  o.FirstName,
  o.LastName,
  o.Address,
  o.City,
  o.State,
  o.PostalCode,
  o.Country,
  o.Phone,
  o.Total,
  (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;

De resultaten van deze query bevatten gegevens uit de tabel Orders :

Schermopname van een query die resulteert in details van verschillende orders.

In het ideale voorbeeld wilt u één ADF-kopieeractiviteit (Azure Data Factory) gebruiken om SQL-gegevens als bron op te vragen en de uitvoer rechtstreeks naar de Azure Cosmos DB-sink te schrijven als de juiste JSON-objecten. Momenteel is het niet mogelijk om de benodigde JSON-transformatie uit te voeren in één kopieeractiviteit. Als we de resultaten van de bovenstaande query proberen te kopiëren naar een Azure Cosmos DB for NoSQL-container, zien we het veld OrderDetails als een tekenreekseigenschap van ons document in plaats van de verwachte JSON-matrix.

We kunnen deze huidige beperking omzeilen op een van de volgende manieren:

  • Azure Data Factory gebruiken met twee kopieeractiviteiten:
    1. JSON-geformatteerde gegevens ophalen van SQL naar een tekstbestand in een tussenliggende blobopslaglocatie
    2. Laad gegevens uit het JSON-tekstbestand naar een container in Azure Cosmos DB.
  • Gebruik Azure Databricks om te lezen van SQL en schrijven naar Azure Cosmos DB . We presenteren hier twee opties.

Laten we deze benaderingen nader bekijken:

Azure Data Factory

Hoewel we OrderDetails niet kunnen insluiten als een JSON-matrix in het Azure Cosmos DB-doeldocument, kunnen we het probleem omzeilen door twee afzonderlijke kopieeractiviteiten te gebruiken.

Kopieeractiviteit 1: SqlJsonToBlobText

Voor de brongegevens gebruiken we een SQL-query om de resultatenset op te halen als één kolom met één JSON-object (die de volgorde vertegenwoordigt) per rij met behulp van de MOGELIJKHEDEN VAN SQL Server OPENJSON en FOR JSON PATH:

SELECT [value] FROM OPENJSON(
  (SELECT
    id = o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
   FROM Orders o FOR JSON PATH)
)

Schermopname van de voorbeeldwaarden in de ADF-kopieerbewerking.

Voor de sink van de SqlJsonToBlobText kopieeractiviteit kiezen we 'Tekst met scheidingstekens' en wijzen we deze naar een specifieke map in Azure Blob Storage. Deze sink bevat een dynamisch gegenereerde unieke bestandsnaam (bijvoorbeeld @concat(pipeline().RunId,'.json'). Omdat ons tekstbestand niet echt 'gescheiden' is en we niet willen dat het in afzonderlijke kolommen wordt geparseerd met komma's. We willen ook de dubbele aanhalingstekens () behouden, 'Kolomscheidingsteken' instellen op een tab (\t) of een ander teken dat niet in de gegevens voorkomt en 'Aanhalingsteken' instellen op 'Geen aanhalingsteken'.

Schermopname waarin de instellingen voor kolomscheidingstekens en aanhalingstekens zijn gemarkeerd.

Kopieeractiviteit 2: BlobJsonToCosmos

Vervolgens wijzigen we onze ADF-pijplijn door de tweede kopieeractiviteit toe te voegen die in Azure Blob Storage zoekt naar het tekstbestand dat door de eerste activiteit is gemaakt. Het verwerkt deze als JSON-bron om in te voegen in de Azure Cosmos DB-sink als één document per JSON-rij in het tekstbestand.

Schermopname waarin het JSON-bronbestand en de bestandspadvelden zijn gemarkeerd.

Desgewenst voegen we ook een 'Delete'-activiteit toe aan de pijplijn, zodat alle vorige bestanden die nog in de map /Orders/ staan, worden verwijderd voordat elke uitvoering wordt uitgevoerd. Onze ADF-pijplijn ziet er nu ongeveer als volgt uit:

Schermopname waarin de activiteit Verwijderen is gemarkeerd.

Nadat we de eerder genoemde pijplijn hebben geactiveerd, zien we een bestand dat is gemaakt in onze tussenliggende Azure Blob Storage-locatie met één JSON-object per rij:

Schermopname van het gemaakte bestand dat de JSON-objecten bevat.

We zien ook ordersdocumenten met correct ingesloten OrderDetails die zijn ingevoegd in onze Azure Cosmos DB-verzameling:

Schermopname van de ordergegevens als onderdeel van het Azure Cosmos DB-document

Azure Databricks

We kunnen Spark in Azure Databricks ook gebruiken om de gegevens van onze SQL Database-bron naar de Azure Cosmos DB-bestemming te kopiëren zonder de tussenliggende tekst-/JSON-bestanden in Azure Blob Storage te maken.

Notitie

Ter duidelijkheid en eenvoud bevatten de codefragmenten dummy-databasewachtwoorden expliciet inline, maar u moet in het ideale geval Azure Databricks-geheimen gebruiken.

Eerst maken en koppelen we de vereiste SQL-connector - en Azure Cosmos DB-connectorbibliotheken aan ons Azure Databricks-cluster. Start het cluster opnieuw op om ervoor te zorgen dat bibliotheken zijn geladen.

Schermopname van het maken en koppelen van de vereiste SQL-connector en Azure Cosmos DB-connectorbibliotheken aan ons Azure Databricks-cluster.

Vervolgens presenteren we twee voorbeelden voor Scala en Python.

Scala

Hier krijgen we de resultaten van de SQL-query met 'FOR JSON'-uitvoer in een DataFrame:

// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
  "url"          -> "xxxx.database.windows.net",
  "databaseName" -> "xxxx",
  "queryCustom"  -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user"         -> "xxxx", 
  "password"     -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)

Schermopname van de SQL-queryuitvoer in een DataFrame.

Vervolgens maken we verbinding met onze Azure Cosmos DB-database en -verzameling:

// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.com:443/",
  // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)

Ten slotte definiëren we ons schema en gebruiken we from_json om het DataFrame toe te passen voordat we het opslaan in de Cosmos DB-verzameling.

// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
    StructField("OrderDetailId",IntegerType,true),
    StructField("ProductId",IntegerType,true),
    StructField("UnitPrice",DoubleType,true),
    StructField("Quantity",IntegerType,true)
  )))
val ordersWithSchema = orders.select(
  col("OrderId").cast("string").as("id"),
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)

Schermopname waarin de juiste matrix is gemarkeerd voor het opslaan in een Azure Cosmos DB-verzameling.

Python

Als alternatief moet u mogelijk JSON-transformaties uitvoeren in Spark als de brondatabase geen ondersteuning biedt FOR JSON voor of een vergelijkbare bewerking. U kunt ook parallelle bewerkingen gebruiken voor een grote gegevensset. Hier presenteren we een PySpark-voorbeeld. Begin met het configureren van de bron- en doeldatabaseverbindingen in de eerste cel:

import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
 
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
 
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
 
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
    "Endpoint": "Endpoint",
    "Masterkey": "Masterkey",
    "Database": "OrdersDB",
    "Collection": "Orders",
    "Upsert": "true"
}

Vervolgens voeren we een query uit op de brondatabase (in dit geval SQL Server) voor zowel de order- als orderdetailrecords, waarbij de resultaten in Spark Dataframes worden geplaatst. We maken ook een lijst met alle order-id's en een threadpool voor parallelle bewerkingen:

import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
 
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
 
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
 
#get all OrderId values to pass to map function 
orderids = orders.select('OrderId').collect()
 
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)

Maak vervolgens een functie voor het schrijven van orders in de doel-API voor noSQL-verzameling. Met deze functie worden alle ordergegevens voor de opgegeven order-id gefilterd, geconverteerd naar een JSON-matrix en wordt de matrix ingevoegd in een JSON-document. Het JSON-document wordt vervolgens voor die volgorde naar de doel-API voor NoSQL-container geschreven:

def writeOrder(orderid):
  #filter the order on current value passed from map function
  order = orders.filter(orders['OrderId'] == orderid[0])
  
  #set id to be a uuid
  order = order.withColumn("id", lit(str(uuid.uuid1())))
  
  #add details field to order dataframe
  order = order.withColumn("details", lit(''))
  
  #filter order details dataframe to get details we want to merge into the order document
  orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
  
  #convert dataframe to pandas
  orderpandas = order.toPandas()
  
  #convert the order dataframe to json and remove enclosing brackets
  orderjson = orderpandas.to_json(orient='records', force_ascii=False)
  orderjson = orderjson[1:-1] 
  
  #convert orderjson to a dictionaory so we can set the details element with order details later
  orderjsondata = json.loads(orderjson)
  
  
  #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
  if (orderdetailsgroup.count() !=0):
    #convert orderdetailsgroup to pandas dataframe to work better with json
    orderdetailsgroup = orderdetailsgroup.toPandas()
    
    #convert orderdetailsgroup to json string
    jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
    
    #convert jsonstring to dictionary to ensure correct encoding and no corrupt records
    jsonstring = json.loads(jsonstring)
    
    #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order 
    orderjsondata['details'] = jsonstring
 
  #convert dictionary to json
  orderjsondata = json.dumps(orderjsondata)
 
  #read the json into spark dataframe
  df = spark.read.json(sc.parallelize([orderjsondata]))
  
  #write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
  #https://learn.microsoft.com/azure/cosmos-db/spark-connector
  df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

Ten slotte roepen we de Python-functie writeOrder aan met behulp van een kaartfunctie in de thread-pool om parallel uit te voeren, waarbij de lijst met volgorde-id's wordt doorgegeven die we eerder hebben gemaakt:

#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)

Aan het einde van beide benaderingen moeten we ingesloten orderdetails in elk orderdocument in de Azure Cosmos DB-verzameling op de juiste manier opslaan:

Schermopname van de resulterende gegevens na de migratie.

Volgende stappen