Kopiera data från Azure Cosmos DB till en dedikerad SQL-pool med Apache Spark
Med Azure Synapse Link för Azure Cosmos DB kan användarna köra analyser i nära realtid över driftdata i Azure Cosmos DB. Det finns dock tillfällen då vissa data måste aggregeras och utökas för att betjäna informationslageranvändare. Du kan kurera och exportera Azure Synapse Link-data med bara några få celler i en notebook-fil.
Förutsättningar
- Etablera en Synapse-arbetsyta med:
- Etablera ett Azure Cosmos DB-konto med en HTAP-container med data
- Ansluta Azure Cosmos DB HTAP-containern till arbetsytan
- Ha rätt konfiguration för att importera data till en dedikerad SQL-pool från Spark
Steg
I den här självstudien ansluter du till analysarkivet så att det inte påverkar transaktionslagret (det förbrukar inga enheter för begäran). Vi går igenom följande steg:
- Läs Azure Cosmos DB HTAP-containern i en Spark-dataram
- Aggregera resultatet i en ny dataram
- Mata in data i en dedikerad SQL-pool
Data
I det exemplet använder vi en HTAP-container med namnet RetailSales. Den ingår i en länkad tjänst med namnet ConnectedData och har följande schema:
- _rid: sträng (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: sträng (nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id: string (nullable = true)
- reklam: long (nullable = true)
- storeId: long (nullable = true)
- weekStarting: long (nullable = true)
- _etag: sträng (nullable = true)
Vi aggregerar försäljningen (kvantitet, intäkter (pris x kvantitet) efter productCode och weekStarting för rapporteringsändamål. Slutligen exporterar vi dessa data till en dedikerad SQL-pooltabell med namnet dbo.productsales
.
Konfigurera en Spark Notebook
Skapa en Spark-notebook-fil med Scala som Spark (Scala) som huvudspråk. Vi använder notebook-filens standardinställning för sessionen.
Läsa data i Spark
Läs Azure Cosmos DB HTAP-containern med Spark i en dataram i den första cellen.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
Aggregera resultatet i en ny dataram
I den andra cellen kör vi den transformering och de aggregeringar som behövs för den nya dataramen innan den läses in i en dedikerad SQL-pooldatabas.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
Läs in resultatet i en dedikerad SQL-pool
I den tredje cellen läser vi in data i en dedikerad SQL-pool. Den skapar automatiskt en tillfällig extern tabell, en extern datakälla och ett externt filformat som tas bort när jobbet är klart.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
Köra frågor mot resultatet med SQL
Du kan köra frågor mot resultatet med hjälp av en enkel SQL-fråga, till exempel följande SQL-skript:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
Frågan visar följande resultat i ett diagramläge: