練習 - 將筆記本整合到 Azure Synapse 管線內
在此單元中,您將建立 Azure Synapse Spark 筆記本,以分析和轉換由對應資料流載入的資料,並將資料儲存在資料湖中。 您將建立參數資料格來接受字串參數,以針對筆記本寫入資料湖的資料,定義資料夾名稱。
然後,您會將此筆記本新增至 Synapse 管線,並將唯一的管線執行識別碼傳遞至筆記本參數,讓您稍後可以將管線執行與筆記本活動所儲存的資料相互關聯。
最後,您會使用 Synapse Studio 中的 [監視]中樞來監視管線執行、取得執行識別碼,然後找出儲存在資料湖中的對應檔案。
關於 Apache Spark 和筆記本
Apache Spark 是一個平行處理架構,可支援記憶體內部處理,以大幅提升巨量資料分析應用程式的效能。 Azure Synapse Analytics 中的 Apache Spark 是 Microsoft 在雲端中的其中一種 Apache Spark 實作。
Synapse Studio 中的 Apache Spark 筆記本是 Web 介面,可讓您建立包含即時程式碼、視覺效果和敘述文的檔案。 筆記本是驗證想法和使用快速實驗從您的資料取得見解的絕佳位置。 筆記本也廣泛用於資料準備、資料視覺效果、機器學習和其他巨量資料案例中。
建立 Synapse Spark 筆記本
假設您在 Synapse Analytics 中建立對應資料流,以處理、聯結和匯入使用者設定檔資料。 現在,您想根據哪些為喜愛和首選,且過去 12 個月內購買最多的產品,找出每個使用者的前 5 名產品。 然後,您想要計算全體產品的前 5 名。
在此練習中,您建立 Synapse Spark 筆記本來進行這些計算。
開啟 Synapse Analytics Studio (https://web.azuresynapse.net/),然後移至 [資料]中樞。
選取 [已連結]索引標籤 (1),然後展開 [Azure Data Lake Storage Gen2] 下方的主要資料湖儲存體帳戶 (2)。 選取 [wwi-02]容器 (3),然後開啟 [top-products]資料夾 (4)。 以滑鼠右鍵按一下任何 Parquet 檔案 (5),選取 [新增筆記本]功能表項目 (6),然後選取 [載入至 DataFrame] (7)。 如果您沒看到該資料夾,請選取
Refresh
。請確定筆記本已附加至 Spark 集區。
將 Parquet 檔案名稱換成
*.parquet
(1),以選取top-products
資料夾中的所有 Parquet 檔案。 例如,路徑應該類似於:abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
。在筆記本工具列上,選取 [全部執行]以執行筆記本。
注意
當您第一次在 Spark 集區中執行筆記本時,Synapse 會建立新的工作階段。 這可能需要大約 3-5 分鐘的時間。
注意
若只要執行資料格,請將滑鼠游標暫留在資料格上,並選取資料格左側的 [執行資料格]圖示,或是選取資料格,然後在鍵盤上輸入 Ctrl+Enter。
選取 + 按鈕,然後選取 [程式碼資料格]項目,在下方建立新的資料格。 [+] 按鈕位於左側筆記本資料格下方。 或者,您也可以展開筆記本工具列中的 [+ 資料格]功能表,然後選取 [程式碼資料格]項目。
在新的資料格中執行下列命令,以填入新的 dataframe (名為
topPurchases
)、建立新的暫時檢視 (名為top_purchases
),以及顯示前 100 個資料列:topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
輸出應如下所示:
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
在新的資料格中,使用 SQL 執行下列命令,以建立新的暫時檢視:
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
注意
此查詢沒有輸出。
此查詢使用
top_purchases
暫時檢視作為來源,並套用row_number() over
方法,以針對每位使用者找出ItemsPurchasedLast12Months
最大的記錄,然後請求資料列編號。where
子句會篩選結果,因此,我們最多只取出IsTopProduct
和IsPreferredProduct
都設為 true 的五個產品。 這讓我們知道每位使用者購買最多的前五名產品,而根據儲存在 Azure Cosmos DB 中的使用者設定檔,這些產品「也」識別為喜愛的產品。在新的資料格中執行下列命令,來建立並顯示新的 DataFrame,以儲存您在上一個資料格中建立的
top_5_products
暫時檢視的結果:top5Products = sqlContext.table("top_5_products") top5Products.show(100)
您應該會看到類似下面的輸出,其中顯示每位使用者喜愛的前五名產品:
根據客戶喜愛且購買最多的產品,計算全體產品的前五名。 若要這麼做,請在新資料格中執行下列命令:
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
在此資料格中,我們依產品識別碼將前五名喜愛的產品分組,加總過去 12 個月內購買的項目總數,以遞減順序排序該值,然後傳回前五名結果。 您的輸出應如下所示:
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
建立參數資料格
Azure Synapse 管線會尋找參數資料格,並將此資料格視為執行階段傳入之參數的預設值。 執行引擎會在參數資料格下方新增資料格,且帶有輸入參數來覆寫預設值。 如果未指定參數資料格,則插入的資料格會插入到筆記本的頂端。
我們將從管線執行此筆記本。 我們想要傳入參數來設定
runId
變數值,以用來命名 Parquet 檔案。 在新資料格中執行下列命令:import uuid # Generate random GUID runId = uuid.uuid4()
我們使用
uuid
Spark 隨附的程式庫來產生隨機的 GUID。 我們想要以管線傳入的參數來覆寫runId
變數。 若要這樣做,我們必須將此資料格切換為參數資料格。選取資料格右上角的動作省略符號 (...)(1),然後選取 [切換參數資料格](2)。
切換此選項之後,您在資料格上會看到 [參數]標記。
將下列程式碼貼在新的資料格中,以便在主要資料湖帳戶的
runId
路徑中,使用/top5-products/
變數當作 Parquet 檔案名稱。 在路徑中,以主要資料湖帳戶的名稱取代YOUR_DATALAKE_NAME
。 若要找出此變數,請向上捲動至頁面頂端的 [資料格 1](1)。 從路徑複製資料湖儲存體帳戶 (2)。 在新的資料格內,貼上此值取代路徑中的YOUR_DATALAKE_NAME
(3),然執行資料格中的命令。%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
確認檔案已寫入資料湖。 移至 [資料]中樞,然後選取 [已連結]索引標籤 (1)。 展開主要資料湖儲存體帳戶,然後選取 [wwi-02] 容器 (2)。 移至 [top5-products]資料夾 (3)。 您應該會在目錄中看到 Parquet 檔案的資料夾,檔案名稱是 GUID (4)。
因為先前不存在此目錄,所以在筆記本資料格中,dataframe 上的 Parquet 寫入方法就建立了此目錄。
將筆記本新增至 Synapse 管線
回到我們在練習開頭所述的對應資料流,假設在協調流程中執行資料流程之後,您想要執行此筆記本。 若要這樣做,請將此筆記本新增至管線,成為新的筆記本活動。
返回筆記本。 選取筆記本右上角的 [屬性](1),然後在 [名稱]
Calculate Top 5 Products
中輸入 。選取筆記本右上角的 [新增至管線](1),然後選取 [現有管線] (2)。
選取 [將使用者設定檔資料寫入 ASA]管線 (1),然後選取 [新增] *(2)。
Synapse Studio 會將筆記本活動新增至管線。 將 [筆記本] 活動重新排列在 [資料流程] 活動的右邊。 選取 [資料流程] 活動,然後將 [成功]活動管線連線綠色方塊,拖曳至 [筆記本] 活動。
[成功] 活動箭號指示管線在資料流程活動成功執行之後,執行筆記本活動。
選取 [筆記本] 活動 (1),然後選取 [設定]索引標籤 (2)、展開 [基底參數](3),再選取 [+ 新增] (4)。 在 [名稱]
runId
欄位 (5) 中輸入 。 在 [類型](6) 中選取 [字串]。 在 [值]中,選取 [新增動態內容] (7)。在 [系統變數] (1) 下方,選取 [管線執行識別碼]。 這會將
@pipeline().RunId
新增至動態內容方塊 (2)。 選取 [完成] (3) 以關閉對話方塊。管線執行識別碼值是指派給每個管線執行的唯一 GUID。 我們會將此值當成
runId
筆記本參數傳入,以作為 Parquet 檔案的名稱。 接著,我們可以翻閱管線執行歷程記錄,找出每次管線執行所建立的特定 Parquet 檔案。選取 [全部發佈],然後按一下 [發佈] 以儲存變更。
發佈完成後,選取 [新增觸發程序] (1),然後選取 [立即觸發](2),以執行更新的管線。
選取 [確定]以執行觸發程序。
監視管道執行
[監視] 中樞可讓您監視 SQL、Apache Spark 和 Pipelines 的目前和歷史活動。
前往 [監視] 中樞。
選取 [管線執行] (1),然後等候管線執行成功完成 (2)。 您可能需要重新整理 (3) 檢視。
選取管線的名稱,以檢視管線的活動執行。
請注意 [資料流程]活動和新的 [筆記本]活動 (1)。 記下 [管線執行識別碼]值 (2)。 我們會比較此值與筆記本所產生的 Parquet 檔案名稱。 選取 [計算前 5 名產品]筆記本名稱,以檢視其詳細資料 (3)。
在這裡,我們看到筆記本執行詳細資料。 您可以選取 [播放](1),以觀看作業 (2) 進度的播放。 在底部,您可以使用不同的篩選選項 (3),以檢視 [診斷]和 [記錄]。 在右側,我們可以檢視執行詳細資料,例如持續時間、Livy 識別碼、Spark 集區詳細資料等等。 選取作業上的 [檢視詳細資料] 連結,以檢視其詳細資料 (5)。
Spark 應用程式 UI 會在新的索引標籤中開啟,讓我們查看階段詳細資料。 展開 [DAG 視覺效果],以檢視階段詳細資料。
返回 [資料] 中樞。
選取 [已連結]索引標籤 (1),然後選取主要資料湖儲存體帳戶上的 [wwi-02]容器 (2)、移至 [top5-products]資料夾 (3),然後針對名稱符合管線執行識別碼的 Parquet 檔案,確認有資料夾存在。
如您所見,有一個檔案的名稱符合我們稍早記下的管線執行識別碼:
因為我們已將管線執行識別碼傳給筆記本活動的
runId
參數,所以這些值相符。