Apache Spark™ クラスターで AKS 上の Azure HDInsight で Delta Lake を使用する (プレビュー)
大事な
AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 この発表 でについてもっと学ぶ。
ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。
大事な
この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案については、AskHDInsight に詳細を記載したリクエストを送信してください。また、Azure HDInsight Communityをフォローして最新の更新情報を入手してください。
AKS Azure HDInsight は、組織が大量のデータを処理するのに役立つビッグ データ分析用のマネージド クラウド ベースのサービスです。 このチュートリアルでは、Apache Spark™ クラスターで AKS 上の Azure HDInsight で Delta Lake を使用する方法について説明します。
前提
AKS 上の Azure HDInsight で Apache Spark™ クラスターを作成する
Jupyter Notebook で Delta Lake シナリオを実行します。 Jupyter Notebook を作成し、ノートブックの作成時に [Spark] を選択します。これは、次の例が Scala 内にあるためです。
シナリオ
- NYC タクシー Parquet データ形式を読み取る - NYC タクシーリムジン委員会 &から提供される Parquet ファイルの URL の一覧。
- URL (ファイル) ごとに変換を実行し、Delta 形式で格納します。
- インクリメンタル読み込みを使用して、デルタテーブルからの平均距離、1マイルあたりの平均コスト、平均コストを計算します。
- ステップ 3 の計算値を Delta 形式で KPI 出力フォルダーに格納します。
- Delta Format 出力フォルダーにデルタ テーブルを作成する (自動更新)。
- KPI 出力フォルダーには、平均距離と乗車の 1 マイルあたりの平均コストの複数のバージョンがあります。
デルタ レイクの必須構成を指定する
Delta Lake と Apache Spark の互換性マトリックス - Delta Lake 、Apache Spark バージョンに基づいて Delta Lake のバージョンを変更します。
%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
}
データ ファイルを一覧表示する
手記
これらのファイルのURLは、NYC タクシーリムジン委員会 &のものです。
import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
)
// Add a file to be downloaded with this Spark job on every node.
val listOfDataFile = fileUrls.map(url=>{
val urlPath=url.split("/")
val fileName = urlPath(urlPath.size-1)
val urlSaveFilePath = s"/tmp/${fileName}"
val hdfsSaveFilePath = s"/tmp/${fileName}"
val file = new File(urlSaveFilePath)
FileUtils.copyURLToFile(new URL(url), file)
// copy local file to HDFS /tmp/${fileName}
// use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})
出力ディレクトリを作成する
デルタ形式の出力を作成する場所。必要に応じて、transformDeltaOutputPath
と avgDeltaOutputKPIPath
変数を変更します。
-
avgDeltaOutputKPIPath
- 平均 KPI を差分形式で格納する -
transformDeltaOutputPath
- 変換された出力をデルタ形式で格納する
import org.apache.hadoop.fs._
// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"
// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new Path(dataSourcePath)
if(!fs.exists(path) && !fs.isDirectory(path)) {
fs.mkdirs(path)
}
}
createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)
Parquet 形式からデルタ形式データを作成する
入力データは
listOfDataFile
からのもので、https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page からダウンロードされました。タイム トラベルとバージョンを示すには、データを個別に読み込みます
増分読み込み時に変換を実行し、次のビジネス KPI を計算します。
- 平均距離
- 1 マイルあたりの平均コスト
- 平均コスト
変換された KPI データをデルタ形式で保存する
import org.apache.spark.sql.functions.udf import org.apache.spark.sql.DataFrame // UDF to compute sum of value paid by customer def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => { val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips total }) // read parquet file from spark conf with given file input // transform data to compute total amount // compute kpi for the given file/batch data def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = { val df = spark.read.format(format).load(fileName) val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips")) // union with old data to compute KPI val dfFullLoad= oldData match { case Some(odf)=> dfNewLoad.union(odf) case _ => dfNewLoad } dfFullLoad.createOrReplaceTempView("tempFullLoadCompute") val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute") // save only new transformed data dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath) //save compute KPI dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath) // return incremental dataframe for next set of load dfFullLoad } // load data for each data file, use last dataframe for KPI compute with the current load def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = { if(dataFile.isEmpty) { true } else { val nextDataFile = dataFile.head val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF) loadData(dataFile.tail,Some(newFullDF)) } } val starTime=System.currentTimeMillis() loadData(listOfDataFile,None) println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
差分テーブルを使用してデルタ形式を読み取る
- 変換されたデータを読み取る
- KPI データの読み取り
import io.delta.tables._ val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath) val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
印刷スキーマ
- 変換されたKPIデータと平均KPIデータ用のデルタテーブルスキーマを出力します。
// transform data schema dtTransformed.toDF.printSchema // Average KPI Data Schema dtAvgKpi.toDF.printSchema
データ テーブルから最後に計算された KPI を表示する
dtAvgKpi.toDF.show(false)
計算済み KPI 履歴の表示
この手順では、_delta_log
からの KPI トランザクション テーブルの履歴が表示されます
dtAvgKpi.history().show(false)
各データ読み込み後に KPI データを表示する
- タイム トラベルを使用すると、各読み込み後に KPI の変更を表示できます
- すべてのバージョンの変更を CSV 形式で
avgMoMKPIChangePath
に格納して、Power BI でこれらの変更を読み取ることができるようにすることができます
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)
参考
- Apache、Apache Spark、Spark、および関連するオープンソースプロジェクト名は、Apache Software Foundation (ASF) の商標である です。