Udostępnij za pośrednictwem


Kopiowanie danych z usługi Azure Cosmos DB do dedykowanej puli SQL przy użyciu platformy Apache Spark

Usługa Azure Synapse Link dla usługi Azure Cosmos DB umożliwia użytkownikom uruchamianie analizy niemal w czasie rzeczywistym na danych operacyjnych w usłudze Azure Cosmos DB. Czasami jednak niektóre dane muszą zostać zagregowane i wzbogacone w celu obsługi użytkowników magazynu danych. Curating and export Azure Synapse Link data can be done with just a few cell in a notebook (Curating and export Azure Synapse Link data can done with a just a few cell in a notebook).

Wymagania wstępne

Kroki

W tym samouczku połączysz się z magazynem analitycznym, aby nie mieć wpływu na magazyn transakcyjny (nie będzie zużywać żadnych jednostek żądań). Wykonamy następujące kroki:

  1. Odczytywanie kontenera HTAP usługi Azure Cosmos DB w ramce danych platformy Spark
  2. Agregowanie wyników w nowej ramce danych
  3. Pozyskiwanie danych do dedykowanej puli SQL

Spark to SQL Steps 1 (Krok 1)

Data

W tym przykładzie używamy kontenera HTAP o nazwie RetailSales. Jest to część połączonej usługi o nazwie ConnectedData i ma następujący schemat:

  • _rid: ciąg (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: ciąg (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: ciąg (dopuszczany do wartości null = true)
  • reklama: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: ciąg (nullable = true)

Zagregujemy sprzedaż (ilość, przychód (cena x ilość) według wartości productCode i weekStarting na potrzeby raportowania. Na koniec wyeksportujemy te dane do dedykowanej tabeli puli SQL o nazwie dbo.productsales.

Konfigurowanie notesu platformy Spark

Tworzenie notesu platformy Spark przy użyciu języka Scala jako platformy Spark (Scala) jako języka głównego. Używamy domyślnego ustawienia notesu dla sesji.

Odczytywanie danych na platformie Spark

Przeczytaj kontener HTAP usługi Azure Cosmos DB z platformą Spark w ramce danych w pierwszej komórce.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Agregowanie wyników w nowej ramce danych

W drugiej komórce uruchamiamy transformację i agregujemy wymagane dla nowej ramki danych przed załadowaniem jej do dedykowanej bazy danych puli SQL.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Ładowanie wyników do dedykowanej puli SQL

W trzeciej komórce załadujemy dane do dedykowanej puli SQL. Spowoduje to automatyczne utworzenie tymczasowej tabeli zewnętrznej, zewnętrznego źródła danych i formatu pliku zewnętrznego, który zostanie usunięty po zakończeniu zadania.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Wykonywanie zapytań względem wyników przy użyciu języka SQL

Możesz wykonać zapytanie względem wyniku przy użyciu prostego zapytania SQL, takiego jak następujący skrypt SQL:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

Zapytanie przedstawi następujące wyniki w trybie wykresu: Spark to SQL Steps 2

Następne kroki