Copiare dati da Azure Cosmos DB in un pool SQL dedicato con Apache Spark
Collegamento ad Azure Synapse per Azure Cosmos DB consente agli utenti di eseguire analisi quasi in tempo reale su dati operativi in Azure Cosmos DB. Talvolta, tuttavia, è necessario aggregare e arricchire alcuni dati per gli utenti dei data warehouse. Per curare ed esportare i dati di Collegamento ad Azure Synapse sono sufficienti poche celle in un notebook.
Prerequisiti
- Effettuare il provisioning di un'area di lavoro di Synapse con:
- Effettuare il provisioning di un account Azure Cosmos DB con un contenitore HTAP con dati
- Connettere il contenitore HTAP di Azure Cosmos DB all'area di lavoro
- Avere la configurazione corretta per importare i dati da Spark in un pool SQL dedicato
Passaggi
In questa esercitazione si eseguirà la connessione all'archivio analitico, in modo da evitare qualsiasi impatto sull'archivio transazionale. Non verranno utilizzate unità richiesta. Si eseguiranno i passaggi seguenti:
- Leggere il contenitore HTAP di Azure Cosmos DB in un dataframe Spark
- Aggregare i risultati in un nuovo dataframe
- Inserire i dati in un pool SQL dedicato
Dati
Nell'esempio si usa un contenitore HTAP denominato RetailSales, che fa parte di un servizio collegato denominato ConnectedData e presenta lo schema seguente:
- _rid: string (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: string (nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id: string (nullable = true)
- advertising: long (nullable = true)
- storeId: long (nullable = true)
- weekStarting: long (nullable = true)
- _etag: string (nullable = true)
Si aggregheranno le vendite (quantity, revenue (price x quantity) per productCode e weekStarting a scopo di report. Infine si esporteranno i dati in una tabella del pool SQL dedicato denominata dbo.productsales
.
Configurare un notebook Spark
Creare un notebook Spark con Scala (Scala as Spark) come linguaggio principale. Per la sessione si usa l'impostazione predefinita del notebook.
Leggere i dati in Spark
Nella prima cella leggere il contenitore HTAP di Azure Cosmos DB con Spark in un dataframe.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
Aggregare i risultati in un nuovo dataframe
Nella seconda cella si eseguono la trasformazione e le aggregazioni necessarie per il nuovo dataframe prima di caricarlo in un database del pool SQL dedicato.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
Caricare i risultati in un pool SQL dedicato
Nella terza cella si caricano i dati in un pool SQL dedicato. Verranno creati automaticamente una tabella esterna, un'origine dati esterna e un formato di file esterno temporanei che verranno eliminati al termine del processo.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
Eseguire query sui risultati con SQL
È possibile eseguire query sul risultato usando una semplice query SQL, ad esempio lo script SQL seguente:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
La query presenterà i risultati seguenti in modalità grafico: