Lakehouse 教學課程:準備和轉換 Lakehouse 中的資料
在本教學課程中,您會搭配 Spark 執行階段 使用筆記本,在 Lakehouse 中轉換和準備原始資料。
必要條件
如果您沒有包含資料的 Lakehouse,您必須:
準備資料
從先前的教學課程步驟中,我們已從來源擷取原始資料到 Lakehouse 的檔案區段。 現在您可以轉換該資料,並準備建立 Delta 資料表。
從 Lakehouse 教學課程原始碼 資料夾下載筆記本。
從位於畫面左下方的 Power BI 切換器中,選取資料工程。
從登陸頁面頂端的新增 區段選取匯入筆記本。
從畫面右側開啟的匯入狀態 窗格中選取上傳。
選取您在本節第一個步驟中下載的所有筆記本。
選取開啟。 指出匯入狀態的通知會出現在瀏覽器視窗右上角。
匯入成功之後,請移至工作區的項目檢視,並查看新匯入的筆記本。 選取 wwilakehouse lakehouse 以開啟它。
開啟 wwilakehouse Lakehouse 之後,請從頂端導覽功能表中選取開啟筆記本現有筆記本>。
從現有筆記本清單中,選取01 - 建立差異資料表 筆記本,然後選取 開啟。
在 Lakehouse Explorer 的開啟筆記本中,您會看到筆記本已連結至已開啟的 Lakehouse。
在 Lakehouse 的資料表 區段中將資料寫入為 Delta Lake Tables 之前,您會使用兩個網狀架構功能(V 順序和最佳化寫入)來將資料寫入最佳化,並改善讀取效能。 若要在您的工作階段中啟用這些功能,請在筆記本的第一個儲存格中組態這些設定。
若要啟動筆記本並依序執行所有儲存格,請選取頂端功能區上的全部執行 (首頁下方)。 或者,若要只從特定儲存格執行程式碼,請選取滑鼠暫留時出現在儲存格左邊的執行圖示,或在控制項位於儲存格中時按鍵盤上的 SHIFT + ENTER。
執行儲存格時,您不需要指定基礎 Spark 集區或叢集詳細資料,因為 Fabric 會透過即時集區提供它們。 每個網狀架構工作區都隨附預設 Spark 集區,稱為「即時集區」。 這表示當您建立筆記本時,不需要擔心指定任何 Spark 組態或叢集詳細資料。 當您執行第一個筆記本命令時,即時集區會在幾秒鐘內啟動並執行。 而且會建立 Spark 工作階段,並開始執行程式碼。 當 Spark 工作階段處於使用中狀態時,此筆記本中的後續程式碼執行幾乎瞬間完成。
接下來,您會從 Lakehouse 的檔案區段讀取原始資料,並在轉換中新增更多不同日期部分的資料行。 最後,您可以使用資料分割 By Spark API 將資料分割,然後再根據新建立的資料部分資料行 (Year 和 Quarter) 將它寫入為 Delta 資料表格式。
from pyspark.sql.functions import col, year, month, quarter table_name = 'fact_sale' df = spark.read.format("parquet").load('Files/wwi-raw-data/WideWorldImportersDW/parquet/full/fact_sale_1y_full') df = df.withColumn('Year', year(col("InvoiceDateKey"))) df = df.withColumn('Quarter', quarter(col("InvoiceDateKey"))) df = df.withColumn('Month', month(col("InvoiceDateKey"))) df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/" + table_name)
在事實資料表載入之後,您可以繼續載入其餘維度的資料。 下列儲存格會建立函式,從 Lakehouse 的檔案區段讀取作為參數傳遞之每個表格名稱的未經處理資料。 接下來,它會建立維度資料表的清單。 最後,它會迴圈查看資料表清單,併為從輸入參數讀取的每個資料表名稱建立 Delta 資料表。 請注意,腳本會卸除此範例中名為
Photo
的資料行,因為未使用資料行。from pyspark.sql.types import * def loadFullDataFromSource(table_name): df = spark.read.format("parquet").load('Files/wwi-raw-data/WideWorldImportersDW/parquet/full/' + table_name) df = df.drop("Photo") df.write.mode("overwrite").format("delta").save("Tables/" + table_name) full_tables = [ 'dimension_city', 'dimension_customer', 'dimension_date', 'dimension_employee', 'dimension_stock_item' ] for table in full_tables: loadFullDataFromSource(table)
若要驗證已建立的資料表,請按滑鼠右鍵並選取 wwilakehouse Lakehouse 上的重新整理。 資料表隨即出現。
再次移至工作區的項目檢視,然後選取 wwilakehouse lakehouse 加以開啟。
現在,開啟第二個筆記本。 在 Lakehouse 檢視中,從功能區選取開啟筆記本現有的筆記本>。
從現有筆記本清單中,選取 02 - 資料轉換 - 商務 筆記本加以開啟。
在 Lakehouse Explorer 的開啟筆記本中,您會看到筆記本已連結至已開啟的 Lakehouse。
組織可能會有資料工程師使用 Scala/Python,以及其他使用 SQL 的資料工程師(Spark SQL 或 T-SQL),都處理相同的資料複本。 網狀架構可讓這些不同的群組使用不同的體驗和喜好設定,以工作和共同作業。 這兩種不同的方法會轉換併產生商務彙總。 您可以挑選適合您的方法,或根據您的喜好設定來混合和比對這些方法,而不會影響效能:
方法 #1 - 使用 PySpark 聯結和彙總資料,以產生商務彙總。 這個方法最好是具有程序設計背景(Python 或 PySpark) 背景的人員。
方法 #2 - 使用 Spark SQL 聯結和彙總資料,以產生商務彙總。 此方法最好是具有 SQL 背景、轉換至 Spark 的人員。
方法 #1 (sale_by_date_city) - 使用 PySpark 聯結和彙總資料來產生商務彙總。 使用下列程序代碼,您會建立三個不同的 Spark 資料框架,每個架構都會參考現有的 Delta 資料表。 然後,您可以使用資料框架來聯結這些資料表、執行分組來產生彙總、重新命名幾個資料行,最後將其寫入 Lakehouse 的資料表區段中的 Delta 資料表,以保存資料。
在此儲存格中,您會建立三個不同的 Spark 資料框架,每個框架都會參考現有的 Delta 資料表。
df_fact_sale = spark.read.table("wwilakehouse.fact_sale") df_dimension_date = spark.read.table("wwilakehouse.dimension_date") df_dimension_city = spark.read.table("wwilakehouse.dimension_city")
將下列程式碼新增至相同的儲存格,以使用稍早建立的資料框架來連結這些資料表。 分組依據以產生彙總、重新命名幾個資料行,最後將其寫入 Lakehouse 的資料表區段中做為 Delta 資料表。
sale_by_date_city = df_fact_sale.alias("sale") \ .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner") \ .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner") \ .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\ .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")\ .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\ .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")\ .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")\ .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")\ .withColumnRenamed("sum(Profit)", "SumOfProfit")\ .orderBy("date.Date", "city.StateProvince", "city.City") sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_city")
方法 #2 (sale_by_date_employee) - 使用 Spark SQL 聯結和彙總資料,以產生商務彙總。 使用下列程式碼,您可以聯結三個資料表來建立暫存 Spark 檢視、執行分組以產生彙總,以及重新命名其中一些資料行。 最後,您會從暫存的 Spark 檢視讀取,最後將其寫入 Lakehouse 之 資料表區段中的 Delta 資料表,以保存資料。
在此儲存格中,您會連結三個資料表來建立暫存 Spark 檢視、執行分組以產生彙總,以及重新命名幾個資料行。
%%sql CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee AS SELECT DD.Date, DD.CalendarMonthLabel , DD.Day, DD.ShortMonth Month, CalendarYear Year ,DE.PreferredName, DE.Employee ,SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax ,SUM(FS.TaxAmount) SumOfTaxAmount ,SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax ,SUM(Profit) SumOfProfit FROM wwilakehouse.fact_sale FS INNER JOIN wwilakehouse.dimension_date DD ON FS.InvoiceDateKey = DD.Date INNER JOIN wwilakehouse.dimension_Employee DE ON FS.SalespersonKey = DE.EmployeeKey GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
在此儲存格中,您會從上一個儲存格中建立的暫存 Spark 檢視讀取,最後將其寫入 Lakehouse 的資料表區段中做為 Delta 資料表。
sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee") sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_employee")
若要驗證已建立的資料表,請按滑鼠右鍵並選取 wwilakehouse Lakehouse 上的重新整理。 彙總資料表隨即出現。
這兩種方法會產生類似的結果。 若要將學習新技術或效能危害的需求降到最低,請選擇最符合背景和喜好設定的方法。
您可能會注意到您正在將資料寫入為 Delta Lake 檔案。 Fabric 的自動資料表探索和註冊功能會在中繼存放區中挑選並加以註冊。 您不需要明確呼叫 CREATE TABLE
語句,即可建立要與 SQL 搭配使用的資料表。