在 AKS 上使用 Azure HDInsight 中的 Delta Lake 搭配 Apache Spark™ 叢集 (預覽)
重要
AKS 上的 Azure HDInsight 於 2025 年 1 月 31 日淘汰。 透過此公告 深入瞭解。
您必須將工作負載移轉至 Microsoft Fabric 或對等 Azure 產品,以避免突然終止工作負載。
重要
這項功能目前為預覽狀態。 Microsoft Azure 預覽版的補充使用規定 包含適用於 Beta 版、預覽版或尚未正式發行之 Azure 功能的更合法條款。 如需此特定預覽的相關資訊,請參閱 AKS 上的 Azure HDInsight 預覽資訊。 如需疑問或功能建議,請在 AskHDInsight 提交請求,並追蹤我們以獲取 Azure HDInsight 社群的更多更新。
AKS 上的 Azure HDInsight 是一項受控雲端式服務,可協助組織處理大量數據。 本教學課程示範如何在 AKS 上使用 Azure HDInsight 中的 Delta Lake 搭配 Apache Spark™ 叢集。
先決條件
在 AKS 上的 Azure HDInsight 中建立 Apache Spark™ 叢集
在 Jupyter Notebook 中執行 Delta Lake 案例。 建立 Jupyter Notebook 並在建立筆記本時選取 「Spark」,因為下列範例位於 Scala 中。
場景
- 讀取 NYC 計程車 Parquet 數據格式 - Parquet 檔案 URL 清單由 NYC 計程車 & 豪華轎車委員會提供。
- 針對每個 URL(檔案)執行一些轉換,並以 Delta 格式儲存。
- 使用增量載入計算 Delta 表中的平均距離、每英里的平均成本以及平均成本。
- 將步驟#3 的計算值以 Delta 格式儲存至 KPI 輸出資料夾。
- 自動在 Delta 格式的輸出資料夾中建立 Delta 資料表(自動重新整理)。
- KPI 輸出資料夾具有多個版本的平均距離,以及車程每英里的平均成本。
提供 Delta Lake 的需求設定
Delta Lake 與 Apache Spark 兼容性矩陣 - Delta Lake,根據 Apache Spark 版本調整 Delta Lake 版本。
%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
}
列出數據檔
注意
這些檔案網址 來自紐約市計程車 & 禮車委員會。
import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
)
// Add a file to be downloaded with this Spark job on every node.
val listOfDataFile = fileUrls.map(url=>{
val urlPath=url.split("/")
val fileName = urlPath(urlPath.size-1)
val urlSaveFilePath = s"/tmp/${fileName}"
val hdfsSaveFilePath = s"/tmp/${fileName}"
val file = new File(urlSaveFilePath)
FileUtils.copyURLToFile(new URL(url), file)
// copy local file to HDFS /tmp/${fileName}
// use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})
建立輸出目錄
您希望建立增量格式輸出的地點,如有需要,可變更 transformDeltaOutputPath
和 avgDeltaOutputKPIPath
變數,
-
avgDeltaOutputKPIPath
- 以差異格式儲存平均 KPI -
transformDeltaOutputPath
- 將轉換後的輸出以差異格式儲存
import org.apache.hadoop.fs._
// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"
// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new Path(dataSourcePath)
if(!fs.exists(path) && !fs.isDirectory(path)) {
fs.mkdirs(path)
}
}
createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)
從 Parquet 格式建立 Delta 格式資料
輸入數據來自
listOfDataFile
,數據從 https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page 下載。若要示範時間移動和版本,請個別載入數據
在累加式負載上執行轉換和計算下列商務 KPI:
- 平均距離
- 每英里的平均成本
- 平均成本
以 Delta 格式儲存已轉換和 KPI 資料
import org.apache.spark.sql.functions.udf import org.apache.spark.sql.DataFrame // UDF to compute sum of value paid by customer def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => { val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips total }) // read parquet file from spark conf with given file input // transform data to compute total amount // compute kpi for the given file/batch data def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = { val df = spark.read.format(format).load(fileName) val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips")) // union with old data to compute KPI val dfFullLoad= oldData match { case Some(odf)=> dfNewLoad.union(odf) case _ => dfNewLoad } dfFullLoad.createOrReplaceTempView("tempFullLoadCompute") val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute") // save only new transformed data dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath) //save compute KPI dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath) // return incremental dataframe for next set of load dfFullLoad } // load data for each data file, use last dataframe for KPI compute with the current load def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = { if(dataFile.isEmpty) { true } else { val nextDataFile = dataFile.head val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF) loadData(dataFile.tail,Some(newFullDF)) } } val starTime=System.currentTimeMillis() loadData(listOfDataFile,None) println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
使用 Delta 表格來讀取 Delta 格式
- 讀取轉換的數據
- 讀取 KPI 數據
import io.delta.tables._ val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath) val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
列印架構
- 列印已轉換和平均 KPI 資料的 Delta 表格結構。
// transform data schema dtTransformed.toDF.printSchema // Average KPI Data Schema dtAvgKpi.toDF.printSchema
顯示數據表中上次計算的 KPI
dtAvgKpi.toDF.show(false)
顯示計算 KPI 歷程記錄
此步驟會顯示來自 _delta_log
KPI 事務數據表的歷程記錄
dtAvgKpi.history().show(false)
在每個數據載入之後顯示 KPI 數據
- 使用時光倒流,您可以在每次載入後檢視 KPI 的變動情況。
- 您可以在
avgMoMKPIChangePath
儲存 CSV 格式的所有版本變更,讓 Power BI 可以讀取這些變更
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)
參考
- Apache、Apache Spark、Spark 和相關開放原始碼專案名稱 是 Apache Software Foundation (ASF) 商標。