Kopiowanie danych z usługi Azure Cosmos DB do dedykowanej puli SQL przy użyciu platformy Apache Spark
Usługa Azure Synapse Link dla usługi Azure Cosmos DB umożliwia użytkownikom uruchamianie analizy niemal w czasie rzeczywistym na danych operacyjnych w usłudze Azure Cosmos DB. Czasami jednak niektóre dane muszą zostać zagregowane i wzbogacone w celu obsługi użytkowników magazynu danych. Curating and export Azure Synapse Link data can be done with just a few cell in a notebook (Curating and export Azure Synapse Link data can done with a just a few cell in a notebook).
Wymagania wstępne
- Aprowizuj obszar roboczy usługi Synapse za pomocą:
- Aprowizuj konto usługi Azure Cosmos DB przy użyciu kontenera HTAP z danymi
- Łączenie kontenera HTAP usługi Azure Cosmos DB z obszarem roboczym
- Mieć właściwą konfigurację importowania danych do dedykowanej puli SQL z platformy Spark
Kroki
W tym samouczku połączysz się z magazynem analitycznym, aby nie mieć wpływu na magazyn transakcyjny (nie będzie zużywać żadnych jednostek żądań). Wykonamy następujące kroki:
- Odczytywanie kontenera HTAP usługi Azure Cosmos DB w ramce danych platformy Spark
- Agregowanie wyników w nowej ramce danych
- Pozyskiwanie danych do dedykowanej puli SQL
Data
W tym przykładzie używamy kontenera HTAP o nazwie RetailSales. Jest to część połączonej usługi o nazwie ConnectedData i ma następujący schemat:
- _rid: ciąg (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: ciąg (nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id: ciąg (dopuszczany do wartości null = true)
- reklama: long (nullable = true)
- storeId: long (nullable = true)
- weekStarting: long (nullable = true)
- _etag: ciąg (nullable = true)
Zagregujemy sprzedaż (ilość, przychód (cena x ilość) według wartości productCode i weekStarting na potrzeby raportowania. Na koniec wyeksportujemy te dane do dedykowanej tabeli puli SQL o nazwie dbo.productsales
.
Konfigurowanie notesu platformy Spark
Tworzenie notesu platformy Spark przy użyciu języka Scala jako platformy Spark (Scala) jako języka głównego. Używamy domyślnego ustawienia notesu dla sesji.
Odczytywanie danych na platformie Spark
Przeczytaj kontener HTAP usługi Azure Cosmos DB z platformą Spark w ramce danych w pierwszej komórce.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
Agregowanie wyników w nowej ramce danych
W drugiej komórce uruchamiamy transformację i agregujemy wymagane dla nowej ramki danych przed załadowaniem jej do dedykowanej bazy danych puli SQL.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
Ładowanie wyników do dedykowanej puli SQL
W trzeciej komórce załadujemy dane do dedykowanej puli SQL. Spowoduje to automatyczne utworzenie tymczasowej tabeli zewnętrznej, zewnętrznego źródła danych i formatu pliku zewnętrznego, który zostanie usunięty po zakończeniu zadania.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
Wykonywanie zapytań względem wyników przy użyciu języka SQL
Możesz wykonać zapytanie względem wyniku przy użyciu prostego zapytania SQL, takiego jak następujący skrypt SQL:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
Zapytanie przedstawi następujące wyniki w trybie wykresu: