Azure Cosmos DB-gegevens kopiëren naar een toegewezen SQL-pool, met behulp van Apache Spark
Met Azure Synapse Link voor Azure Cosmos DB kunnen gebruikers in bijna realtime analyses uitvoeren via operationele gegevens in Azure Cosmos DB. Er zijn echter momenten waarop sommige gegevens moeten worden geaggregeerd en verrijkt zodat datawarehouse-gebruikers ze kunnen gebruiken. Het cureren en exporteren van Azure Synapse Link-gegevens kan worden uitgevoerd met slechts een paar cellen in een notebook.
Vereisten
- Een Synapse-werkruimte inrichten met:
- Een Azure Cosmos DB-account inrichten met een HTAP-container met gegevens
- De HTAP-container van Azure Cosmos DB verbinden met de werkruimte
- De juiste instelling om vanuit Spark gegevens te importeren in een toegewezen SQL-pool
Stappen
In deze zelfstudie gaat u verbinding maken met de analytische opslag, zodat de transactionele opslag niet wordt beïnvloed (er worden geen aanvraageenheden gebruikt). De volgende stappen worden uitgevoerd:
- De Azure Cosmos DB HTAP-container lezen in een Spark-dataframe
- De resultaten aggregeren in een nieuwe dataframe
- De gegevens opnemen in een toegewezen SQL-pool
Gegevens
In dit voorbeeld wordt een HTAP-container met de naam RetailSales gebruikt. Deze maakt deel uit van een gekoppelde service met de naam ConnectedData en heeft het volgende schema:
- _rid: string (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: string (nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id: string (nullable = true)
- advertising: long (nullable = true)
- storeId: long (nullable = true)
- weekStarting: long (nullable = true)
- _etag: string (nullable = true)
U gaat de verkoop (quantity, revenue (price x quantity)) aggregeren met productCode en weekStarting voor rapportagedoeleinden. Ten slotte exporteren we die gegevens naar een toegewezen SQL-pooltabel met de naam dbo.productsales
.
Een Apache Spark-notebook configureren
Maak een Apache Spark-notebook met Scala as Spark (Scala) als hoofdtaal. U gebruikt de standaardinstelling van de notebook voor deze sessie.
De gegevens in Apache Spark uitlezen
Lees de Azure Cosmos DB HTAP-container met Spark in een dataframe in de eerste cel.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
De resultaten aggregeren in een nieuwe dataframe
In de tweede cel voert u de transformatie en aggregaties uit die nodig zijn voor de nieuwe dataframe, vóórdat u deze in een toegewezen SQL-pooldatabase laadt.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
De resultaten laden in een toegewezen SQL-pool
In de derde cel laadt u de gegevens in een toegewezen SQL-pool. Er worden automatisch een tijdelijke externe tabel, externe gegevensbron en externe bestandsindeling gemaakt, die worden verwijderd zodra de taak is uitgevoerd.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
Query's uitvoeren op de resultaten met SQL
U kunt query's uitvoeren op het resultaat met een eenvoudige SQL-query, zoals het volgende SQL-script:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
Uw query geeft de volgende resultaten weer in een grafiekmodus: