Sdílet prostřednictvím


Kopírování dat ze služby Azure Cosmos DB do vyhrazeného fondu SQL pomocí Apache Sparku

Azure Synapse Link pro Azure Cosmos DB umožňuje uživatelům spouštět analýzy téměř v reálném čase přes provozní data ve službě Azure Cosmos DB. Existují ale chvíle, kdy je potřeba některá data agregovat a rozšířit, aby mohli obsluhovat uživatele datového skladu. Úpravy a export dat Azure Synapse Linku je možné provádět jenom s několika buňkami v poznámkovém bloku.

Požadavky

Kroky

V tomto kurzu se připojíte k analytickému úložišti, takže nebude mít žádný vliv na transakční úložiště (nebude spotřebovávat žádné jednotky žádostí). Provedeme následující kroky:

  1. Čtení kontejneru HTAP služby Azure Cosmos DB do datového rámce Sparku
  2. Agregace výsledků v novém datovém rámci
  3. Ingestování dat do vyhrazeného fondu SQL

Spark do SQL – kroky 1

Data

V tomto příkladu použijeme kontejner HTAP s názvem RetailSales. Je součástí propojené služby s názvem ConnectedData a má následující schéma:

  • _rid: řetězec (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: řetězec (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: string (nullable = true)
  • reklama: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: řetězec (nullable = true)

Agregujeme prodeje (množství, výnosy (cena x množství) podle productCode a weekStarting pro účely generování sestav. Nakonec tato data vyexportujeme do vyhrazené tabulky fondu SQL s názvem dbo.productsales.

Konfigurace poznámkového bloku Sparku

Vytvořte poznámkový blok Spark s jazykem Scala jako Spark (Scala) jako hlavním jazykem. Pro relaci používáme výchozí nastavení poznámkového bloku.

Čtení dat ve Sparku

Načtěte kontejner HTAP služby Azure Cosmos DB se Sparkem do datového rámce v první buňce.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Agregace výsledků v novém datovém rámci

Ve druhé buňce spustíme transformaci a agregace potřebné pro nový datový rámec před načtením do vyhrazené databáze fondu SQL.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Načtení výsledků do vyhrazeného fondu SQL

Ve třetí buňce načteme data do vyhrazeného fondu SQL. Po dokončení úlohy automaticky vytvoří dočasnou externí tabulku, externí zdroj dat a formát externího souboru, který se odstraní.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Dotazování výsledků pomocí SQL

Výsledek můžete dotazovat pomocí jednoduchého dotazu SQL, jako je například následující skript SQL:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

Dotaz zobrazí následující výsledky v režimu grafu: Spark do SQL – kroky 2

Další kroky