共用方式為


在 AKS 上使用 Azure HDInsight 中的 Delta Lake 搭配 Apache Spark™ 叢集 (預覽)

重要

AKS 上的 Azure HDInsight 於 2025 年 1 月 31 日淘汰。 透過此公告 深入瞭解

您必須將工作負載移轉至 Microsoft Fabric 或對等 Azure 產品,以避免突然終止工作負載。

重要

這項功能目前為預覽狀態。 Microsoft Azure 預覽版的補充使用規定 包含適用於 Beta 版、預覽版或尚未正式發行之 Azure 功能的更合法條款。 如需此特定預覽的相關資訊,請參閱 AKS 上的 Azure HDInsight 預覽資訊。 如需疑問或功能建議,請在 AskHDInsight 提交請求,並追蹤我們以獲取 Azure HDInsight 社群的更多更新。

AKS 上的 Azure HDInsight 是一項受控雲端式服務,可協助組織處理大量數據。 本教學課程示範如何在 AKS 上使用 Azure HDInsight 中的 Delta Lake 搭配 Apache Spark™ 叢集。

先決條件

  1. 在 AKS 上的 Azure HDInsight 中建立 Apache Spark™ 叢集

    顯示 Spark 叢集建立過程的螢幕截圖。

  2. 在 Jupyter Notebook 中執行 Delta Lake 案例。 建立 Jupyter Notebook 並在建立筆記本時選取 「Spark」,因為下列範例位於 Scala 中。

    顯示如何執行 Delta Lake 案例的螢幕快照。

場景

  • 讀取 NYC 計程車 Parquet 數據格式 - Parquet 檔案 URL 清單由 NYC 計程車 & 豪華轎車委員會提供。
  • 針對每個 URL(檔案)執行一些轉換,並以 Delta 格式儲存。
  • 使用增量載入計算 Delta 表中的平均距離、每英里的平均成本以及平均成本。
  • 將步驟#3 的計算值以 Delta 格式儲存至 KPI 輸出資料夾。
  • 自動在 Delta 格式的輸出資料夾中建立 Delta 資料表(自動重新整理)。
  • KPI 輸出資料夾具有多個版本的平均距離,以及車程每英里的平均成本。

提供 Delta Lake 的需求設定

Delta Lake 與 Apache Spark 兼容性矩陣 - Delta Lake,根據 Apache Spark 版本調整 Delta Lake 版本。

%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
  }

顯示 Delta Lake 組態的螢幕快照。

列出數據檔

注意

這些檔案網址 來自紐約市計程車 & 禮車委員會

import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
    
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
    
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
    )
    
// Add a file to be downloaded with this Spark job on every node.
        val listOfDataFile = fileUrls.map(url=>{
        val urlPath=url.split("/") 
        val fileName = urlPath(urlPath.size-1)
        val urlSaveFilePath = s"/tmp/${fileName}"
        val hdfsSaveFilePath = s"/tmp/${fileName}"
        val file = new File(urlSaveFilePath)
        FileUtils.copyURLToFile(new URL(url), file)
        // copy local file to HDFS /tmp/${fileName}
        // use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
        fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
        DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})

顯示如何啟動 Spark 應用程式的螢幕快照。

建立輸出目錄

您希望建立增量格式輸出的地點,如有需要,可變更 transformDeltaOutputPathavgDeltaOutputKPIPath 變數,

  • avgDeltaOutputKPIPath - 以差異格式儲存平均 KPI
  • transformDeltaOutputPath - 將轉換後的輸出以差異格式儲存
import org.apache.hadoop.fs._

// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"

// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
    val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val path =  new Path(dataSourcePath)
    if(!fs.exists(path) && !fs.isDirectory(path)) {
        fs.mkdirs(path)
    }
}

createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)

顯示如何建立輸出目錄的螢幕快照。

從 Parquet 格式建立 Delta 格式資料

  1. 輸入數據來自 listOfDataFile,數據從 https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page 下載。

  2. 若要示範時間移動和版本,請個別載入數據

  3. 在累加式負載上執行轉換和計算下列商務 KPI:

    1. 平均距離
    2. 每英里的平均成本
    3. 平均成本
  4. 以 Delta 格式儲存已轉換和 KPI 資料

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.DataFrame
    
    // UDF to compute sum of value paid by customer
    def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => {
        val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips
        total
    })
    
    // read parquet file from spark conf with given file input
    // transform data to compute total amount
    // compute kpi for the given file/batch data
    def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = {
        val df = spark.read.format(format).load(fileName)
        val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips"))
        // union with old data to compute KPI
        val dfFullLoad= oldData match {
            case Some(odf)=>
                    dfNewLoad.union(odf)
            case _ =>
                    dfNewLoad
        }
        dfFullLoad.createOrReplaceTempView("tempFullLoadCompute")
        val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute")
        // save only new transformed data
        dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath)
        //save compute KPI
        dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath)
        // return incremental dataframe for next set of load
        dfFullLoad
    }
    
    // load data for each data file, use last dataframe for KPI compute with the current load
    def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = {
        if(dataFile.isEmpty) {    
            true
        } else {
            val nextDataFile = dataFile.head
            val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF)
            loadData(dataFile.tail,Some(newFullDF))
        }
    }
    val starTime=System.currentTimeMillis()
    loadData(listOfDataFile,None)
    println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
    

    顯示如何以差異格式查看數據的螢幕快照。

  5. 使用 Delta 表格來讀取 Delta 格式

    1. 讀取轉換的數據
    2. 讀取 KPI 數據
    import io.delta.tables._
    val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath)
    val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
    

    顯示讀取 KPI 資料的螢幕快照。

  6. 列印架構

    1. 列印已轉換和平均 KPI 資料的 Delta 表格結構。
    // transform data schema
    dtTransformed.toDF.printSchema
    // Average KPI Data Schema
    dtAvgKpi.toDF.printSchema
    

    顯示列印架構輸出的螢幕快照。

  7. 顯示數據表中上次計算的 KPI

    dtAvgKpi.toDF.show(false)

    顯示數據表中上次計算 KPI 的螢幕快照。

顯示計算 KPI 歷程記錄

此步驟會顯示來自 _delta_log KPI 事務數據表的歷程記錄

dtAvgKpi.history().show(false)

顯示計算 KPI 歷程記錄的螢幕快照。

在每個數據載入之後顯示 KPI 數據

  1. 使用時光倒流,您可以在每次載入後檢視 KPI 的變動情況。
  2. 您可以在 avgMoMKPIChangePath 儲存 CSV 格式的所有版本變更,讓 Power BI 可以讀取這些變更
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)

每個數據載入之後的 KPI 資料螢幕快照。

參考