使用 Azure Data Factory 中的 Spark 活動來轉換雲端中的資料
適用於:Azure Data Factory Azure Synapse Analytics
提示
試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用!
在本教學課程中,您會使用 Azure 入口網站來建立 Azure Data Factory 管線。 此管線使用 Spark 活動和隨選 Azure HDInsight 連結服務來轉換資料。
您會在本教學課程中執行下列步驟:
- 建立資料處理站。
- 建立使用 Spark 活動的管線。
- 觸發管線執行。
- 監視管道執行。
如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶。
必要條件
注意
建議您使用 Azure Az PowerShell 模組來與 Azure 互動。 若要開始使用,請參閱安裝 Azure PowerShell (部分機器翻譯)。 若要了解如何移轉至 Az PowerShell 模組,請參閱將 Azure PowerShell 從 AzureRM 移轉至 Az。
- Azure 儲存體帳戶。 您需要建立 Python 指令碼和輸入檔案,並上傳至 Azure 儲存體。 Spark 程式的輸出會儲存在這個儲存體帳戶中。 隨選 Spark 叢集與其主要儲存體是使用相同的儲存體帳戶。
注意
HdInsight 僅支援標準層的一般用途儲存體帳戶。 請確定帳戶不是進階或僅限 Blob 儲存體帳戶。
- Azure PowerShell。 遵循如何安裝並設定 Azure PowerShell 中的指示。
將 Python 指令碼上傳至 Blob 儲存體帳戶
使用下列內容建立名為 WordCount_Spark.py 的 Python 檔案:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
以您的 Azure 儲存體帳戶名稱取代 <storageAccountName>。 然後儲存檔案。
在 Azure Blob 儲存體中,建立名為 adftutorial 的容器 (如果不存在)。
建立名為 spark 的資料夾。
在 spark 資料夾下,建立名為 script 的子資料夾。
將 WordCount_Spark.py 檔案上傳至 script 子資料夾。
上傳輸入檔案
- 建立名為 minecraftstory.txt 的檔案並填入一些文字。 Spark 程式會計算這段文字中的字數。
- 在 spark 資料夾下,建立名為 inputfiles 的子資料夾。
- 將 minecraftstory.txt 檔案上傳至 inputfiles 子資料夾。
建立資料處理站
要是您還沒有使用資料處理站,請遵循快速入門:使用 Azure 入口網站建立資料處理站一文中的步驟,建立資料處理站。
建立連結服務
在本節中,您會撰寫兩個連結服務:
- 將 Azure 儲存體帳戶連結至資料處理站的 Azure 儲存體連結服務。 隨選 HDInsight 叢集會使用此儲存體。 而且也包含要執行的 Spark 指令碼。
- 隨選 HDInsight 連結服務。 Azure Data Factory 會自動建立 HDInsight 叢集並執行 Spark 程式。 然後在 HDInsight 叢集的閒置時間達到預先設定的時間後,系統就會刪除該叢集。
建立 Azure 儲存體連結服務
在首頁,切換至左側面板中的 [管理] 索引標籤。
選取視窗底部的 [連線],然後選取 [+ 新增]。
在 [新增連結服務] 視窗中,選取 [資料存放區]>[Azure Blob 儲存體],然後選取 [繼續]。
針對 [儲存體帳戶名稱],從清單中選取名稱,然後選取 [儲存]。
建立隨選 HDInsight 連結服務
再次選取 [+新增] 按鈕以建立另一個連結服務。
在 [新增連結服務] 視窗中,選取 [計算]>[Azure HDInsight],然後選取 [繼續]。
在 [新增連結服務] 視窗中,完成下列步驟:
a. 針對 [名稱],輸入 AzureHDInsightLinkedService。
b. 針對 [類型],確認已選取 [隨選 HDInsight]。
c. 針對 [Azure 儲存體連結服務],選取 [AzureBlobStorage1]。 您之前已建立此連結服務。 如果您使用不同的名稱,請在此指定正確的名稱。
d. 針對 [叢集類型],選取 [spark]。
e. 針對 [服務主體識別碼],輸入有權建立 HDInsight 叢集的服務主體識別碼。
此服務主體必須是訂用帳戶的參與者角色成員,或建立叢集所在的資源群組成員。 如需詳細資訊,請參閱建立 Microsoft Entra 應用程式和服務主體。 服務主體識別碼相當於「應用程式識別碼」,而服務主體金鑰則相當於「用戶端密碼」的值。
f. 針對 [服務主體金鑰],輸入金鑰。
.g 針對 [資源群組],選取建立資料處理站時使用的相同資源群組。 將在此資源群組中建立 Spark 叢集。
h. 展開 [OS 類型]。
i. 針對叢集使用者名稱輸入名稱。
j. 輸入使用者的叢集密碼。
k. 選取 [完成]。
注意
Azure HDInsight 會限制您在其支援的每個 Azure 區域中可使用的核心總數。 對於隨選 HDInsight 連結服務,用來建立 HDInsight 叢集的 Azure 儲存體位置與用來作為其主要儲存體的位置相同。 請確定您有足夠的核心配額,才能成功建立叢集。 如需詳細資訊,請參閱使用 Hadoop、Spark 及 Kafka 等在 HDInsight 中設定叢集。
建立新管線
選取 + (加號) 按鈕,然後選取功能表上的 [管線]。
在 [活動] 工具箱中,展開 [HDInsight]。 將 [活動] 工具箱中的 [Spark] 活動拖到管線設計工具介面。
在 [Spark] 活動視窗底部的屬性中,完成下列步驟:
a. 切換至 [HDI 叢集] 索引標籤。
b. 選取您在上一個程序中建立的 AzureHDInsightLinkedService。
切換至 [指令碼/Jar] 索引標籤並完成下列步驟:
a. 針對 [作業連結服務],選取 [AzureBlobStorage1]。
b. 選取 [瀏覽儲存體]。
c. 瀏覽至 adftutorial/spark/script 資料夾,選取 WordCount_Spark.py,然後選取 [完成]。
若要驗證管線,選取工具列上的 [驗證] 按鈕。 選取 >> (右箭頭) 按鈕以關閉驗證視窗。
選取 [全部發佈]。 Data Factory UI 會將實體 (連結服務和管線) 發佈至 Azure Data Factory 服務。
觸發管線執行
選取工具列上的 [新增觸發程序],然後選取 [立即觸發]。
監視管道執行
切換至 [監視] 索引標籤。確認您看到管線執行。 建立 Spark 叢集需要約 20 分鐘。
定期選取 [重新整理] 以檢查管線執行的狀態。
若要檢視與管線執行相關聯的活動執行,請選取 [動作] 資料行中的 [檢視活動執行]。
您可以選取頂端的 [所有管線執行] 連結,切換回管線執行檢視。
驗證輸出
確認已在 adftutorial 容器的 spark/otuputfiles/wordcount 資料夾中建立輸出檔案。
這個檔案應該有輸入文字檔中的每個字組,以及字組在檔案中出現的次數。 例如:
(u'This', 1)
(u'a', 1)
(u'is', 1)
(u'test', 1)
(u'file', 1)
相關內容
此範例中的管線會使用 Spark 活動和隨選 HDInsight 連結服務來轉換資料。 您已了解如何︰
- 建立資料處理站。
- 建立使用 Spark 活動的管線。
- 觸發管線執行。
- 監視管道執行。
若要了解如何在虛擬網路中的 Azure HDInsight 叢集上執行 Hive 指令碼來轉換資料,請進入下一個教學課程: