在 AKS 上使用 Azure HDInsight 中的 Delta Lake 搭配 Apache Spark™ 叢集 (預覽)
AKS 上的 Azure HDInsight 是一項受控雲端式服務,可協助組織處理大量數據。 本教學課程示範如何在 AKS 上使用 Azure HDInsight 中的 Delta Lake 搭配 Apache Spark™ 叢集。
在 AKS 上的 Azure HDInsight 中建立 Apache Spark™ 叢集
在 Jupyter Notebook 中執行 Delta Lake 案例。 建立 Jupyter Notebook 並在建立筆記本時選取 「Spark」,因為下列範例位於 Scala 中。
- 讀取 NYC 計程車 Parquet 數據格式 - Parquet 檔案 URL 清單由 NYC 計程車 & 豪華轎車委員會提供。
- 針對每個 URL(檔案)執行一些轉換,並以 Delta 格式儲存。
- 使用增量載入計算 Delta 表中的平均距離、每英里的平均成本以及平均成本。
- 將步驟#3 的計算值以 Delta 格式儲存至 KPI 輸出資料夾。
- 自動在 Delta 格式的輸出資料夾中建立 Delta 資料表(自動重新整理)。
- KPI 輸出資料夾具有多個版本的平均距離,以及車程每英里的平均成本。
提供 Delta Lake 的需求設定
Delta Lake 與 Apache Spark 兼容性矩陣 - Delta Lake,根據 Apache Spark 版本調整 Delta Lake 版本。
%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
這些檔案網址 來自紐約市計程車 & 禮車委員會。
import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val fileUrls= List(
// Add a file to be downloaded with this Spark job on every node.
val listOfDataFile = fileUrls.map(url=>{
val urlPath=url.split("/")
val fileName = urlPath(urlPath.size-1)
val urlSaveFilePath = s"/tmp/${fileName}"
val hdfsSaveFilePath = s"/tmp/${fileName}"
val file = new File(urlSaveFilePath)
FileUtils.copyURLToFile(new URL(url), file)
// copy local file to HDFS /tmp/${fileName}
// use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
您希望建立增量格式輸出的地點,如有需要,可變更 transformDeltaOutputPath
和 avgDeltaOutputKPIPath
- 以差異格式儲存平均 KPI -
- 將轉換後的輸出以差異格式儲存
import org.apache.hadoop.fs._
// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"
// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new Path(dataSourcePath)
if(!fs.exists(path) && !fs.isDirectory(path)) {
從 Parquet 格式建立 Delta 格式資料
,數據從 https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page 下載。若要示範時間移動和版本,請個別載入數據
在累加式負載上執行轉換和計算下列商務 KPI:
- 平均距離
- 每英里的平均成本
- 平均成本
以 Delta 格式儲存已轉換和 KPI 資料
import org.apache.spark.sql.functions.udf import org.apache.spark.sql.DataFrame // UDF to compute sum of value paid by customer def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => { val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips total }) // read parquet file from spark conf with given file input // transform data to compute total amount // compute kpi for the given file/batch data def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = { val df = spark.read.format(format).load(fileName) val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips")) // union with old data to compute KPI val dfFullLoad= oldData match { case Some(odf)=> dfNewLoad.union(odf) case _ => dfNewLoad } dfFullLoad.createOrReplaceTempView("tempFullLoadCompute") val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute") // save only new transformed data dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath) //save compute KPI dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath) // return incremental dataframe for next set of load dfFullLoad } // load data for each data file, use last dataframe for KPI compute with the current load def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = { if(dataFile.isEmpty) { true } else { val nextDataFile = dataFile.head val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF) loadData(dataFile.tail,Some(newFullDF)) } } val starTime=System.currentTimeMillis() loadData(listOfDataFile,None) println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
使用 Delta 表格來讀取 Delta 格式
- 讀取轉換的數據
- 讀取 KPI 數據
import io.delta.tables._ val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath) val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
- 列印已轉換和平均 KPI 資料的 Delta 表格結構。
// transform data schema dtTransformed.toDF.printSchema // Average KPI Data Schema dtAvgKpi.toDF.printSchema
顯示數據表中上次計算的 KPI
顯示計算 KPI 歷程記錄
此步驟會顯示來自 _delta_log
KPI 事務數據表的歷程記錄
在每個數據載入之後顯示 KPI 數據
- 使用時光倒流,您可以在每次載入後檢視 KPI 的變動情況。
- 您可以在
儲存 CSV 格式的所有版本變更,讓 Power BI 可以讀取這些變更
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)
