Kopírování dat ze služby Azure Cosmos DB do vyhrazeného fondu SQL pomocí Apache Sparku
Azure Synapse Link pro Azure Cosmos DB umožňuje uživatelům spouštět analýzy téměř v reálném čase přes provozní data ve službě Azure Cosmos DB. Existují ale chvíle, kdy je potřeba některá data agregovat a rozšířit, aby mohli obsluhovat uživatele datového skladu. Úpravy a export dat Azure Synapse Linku je možné provádět jenom s několika buňkami v poznámkovém bloku.
Požadavky
- Zřízení pracovního prostoru Synapse pomocí:
- Zřízení účtu služby Azure Cosmos DB pomocí kontejneru HTAP s daty
- Připojení kontejneru HTAP služby Azure Cosmos DB k pracovnímu prostoru
- Nastavení správného nastavení pro import dat do vyhrazeného fondu SQL ze Sparku
Kroky
V tomto kurzu se připojíte k analytickému úložišti, takže nebude mít žádný vliv na transakční úložiště (nebude spotřebovávat žádné jednotky žádostí). Provedeme následující kroky:
- Čtení kontejneru HTAP služby Azure Cosmos DB do datového rámce Sparku
- Agregace výsledků v novém datovém rámci
- Ingestování dat do vyhrazeného fondu SQL
Data
V tomto příkladu použijeme kontejner HTAP s názvem RetailSales. Je součástí propojené služby s názvem ConnectedData a má následující schéma:
- _rid: řetězec (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: řetězec (nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id: string (nullable = true)
- reklama: long (nullable = true)
- storeId: long (nullable = true)
- weekStarting: long (nullable = true)
- _etag: řetězec (nullable = true)
Agregujeme prodeje (množství, výnosy (cena x množství) podle productCode a weekStarting pro účely generování sestav. Nakonec tato data vyexportujeme do vyhrazené tabulky fondu SQL s názvem dbo.productsales
.
Konfigurace poznámkového bloku Sparku
Vytvořte poznámkový blok Spark s jazykem Scala jako Spark (Scala) jako hlavním jazykem. Pro relaci používáme výchozí nastavení poznámkového bloku.
Čtení dat ve Sparku
Načtěte kontejner HTAP služby Azure Cosmos DB se Sparkem do datového rámce v první buňce.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
Agregace výsledků v novém datovém rámci
Ve druhé buňce spustíme transformaci a agregace potřebné pro nový datový rámec před načtením do vyhrazené databáze fondu SQL.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
Načtení výsledků do vyhrazeného fondu SQL
Ve třetí buňce načteme data do vyhrazeného fondu SQL. Po dokončení úlohy automaticky vytvoří dočasnou externí tabulku, externí zdroj dat a formát externího souboru, který se odstraní.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
Dotazování výsledků pomocí SQL
Výsledek můžete dotazovat pomocí jednoduchého dotazu SQL, jako je například následující skript SQL:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
Dotaz zobrazí následující výsledky v režimu grafu: