次の方法で共有


Apache Spark™ クラスターで AKS 上の Azure HDInsight で Delta Lake を使用する (プレビュー)

大事な

AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 この発表 についてもっと学ぶ。

ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。

大事な

この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案については、AskHDInsight に詳細を記載したリクエストを送信してください。また、Azure HDInsight Communityをフォローして最新の更新情報を入手してください。

AKS Azure HDInsight は、組織が大量のデータを処理するのに役立つビッグ データ分析用のマネージド クラウド ベースのサービスです。 このチュートリアルでは、Apache Spark™ クラスターで AKS 上の Azure HDInsight で Delta Lake を使用する方法について説明します。

前提

  1. AKS 上の Azure HDInsight で Apache Spark™ クラスターを作成する

    Spark クラスターの作成を示すスクリーンショット。

  2. Jupyter Notebook で Delta Lake シナリオを実行します。 Jupyter Notebook を作成し、ノートブックの作成時に [Spark] を選択します。これは、次の例が Scala 内にあるためです。

    デルタ レイク シナリオを実行する方法を示すスクリーンショット。

シナリオ

  • NYC タクシー Parquet データ形式を読み取る - NYC タクシーリムジン委員会 &から提供される Parquet ファイルの URL の一覧。
  • URL (ファイル) ごとに変換を実行し、Delta 形式で格納します。
  • インクリメンタル読み込みを使用して、デルタテーブルからの平均距離、1マイルあたりの平均コスト、平均コストを計算します。
  • ステップ 3 の計算値を Delta 形式で KPI 出力フォルダーに格納します。
  • Delta Format 出力フォルダーにデルタ テーブルを作成する (自動更新)。
  • KPI 出力フォルダーには、平均距離と乗車の 1 マイルあたりの平均コストの複数のバージョンがあります。

デルタ レイクの必須構成を指定する

Delta Lake と Apache Spark の互換性マトリックス - Delta Lake 、Apache Spark バージョンに基づいて Delta Lake のバージョンを変更します。

%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
  }

デルタ レイクの構成を示すスクリーンショット。

データ ファイルを一覧表示する

手記

これらのファイルのURLは、NYC タクシーリムジン委員会 &のものです。

import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
    
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
    
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
    )
    
// Add a file to be downloaded with this Spark job on every node.
        val listOfDataFile = fileUrls.map(url=>{
        val urlPath=url.split("/") 
        val fileName = urlPath(urlPath.size-1)
        val urlSaveFilePath = s"/tmp/${fileName}"
        val hdfsSaveFilePath = s"/tmp/${fileName}"
        val file = new File(urlSaveFilePath)
        FileUtils.copyURLToFile(new URL(url), file)
        // copy local file to HDFS /tmp/${fileName}
        // use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
        fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
        DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})

Spark アプリケーションを起動する方法を示すスクリーンショット。

出力ディレクトリを作成する

デルタ形式の出力を作成する場所。必要に応じて、transformDeltaOutputPathavgDeltaOutputKPIPath 変数を変更します。

  • avgDeltaOutputKPIPath - 平均 KPI を差分形式で格納する
  • transformDeltaOutputPath - 変換された出力をデルタ形式で格納する
import org.apache.hadoop.fs._

// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"

// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
    val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val path =  new Path(dataSourcePath)
    if(!fs.exists(path) && !fs.isDirectory(path)) {
        fs.mkdirs(path)
    }
}

createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)

output-directory を作成する方法を示すスクリーンショット。

Parquet 形式からデルタ形式データを作成する

  1. 入力データは listOfDataFileからのもので、https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page からダウンロードされました。

  2. タイム トラベルとバージョンを示すには、データを個別に読み込みます

  3. 増分読み込み時に変換を実行し、次のビジネス KPI を計算します。

    1. 平均距離
    2. 1 マイルあたりの平均コスト
    3. 平均コスト
  4. 変換された KPI データをデルタ形式で保存する

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.DataFrame
    
    // UDF to compute sum of value paid by customer
    def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => {
        val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips
        total
    })
    
    // read parquet file from spark conf with given file input
    // transform data to compute total amount
    // compute kpi for the given file/batch data
    def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = {
        val df = spark.read.format(format).load(fileName)
        val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips"))
        // union with old data to compute KPI
        val dfFullLoad= oldData match {
            case Some(odf)=>
                    dfNewLoad.union(odf)
            case _ =>
                    dfNewLoad
        }
        dfFullLoad.createOrReplaceTempView("tempFullLoadCompute")
        val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute")
        // save only new transformed data
        dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath)
        //save compute KPI
        dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath)
        // return incremental dataframe for next set of load
        dfFullLoad
    }
    
    // load data for each data file, use last dataframe for KPI compute with the current load
    def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = {
        if(dataFile.isEmpty) {    
            true
        } else {
            val nextDataFile = dataFile.head
            val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF)
            loadData(dataFile.tail,Some(newFullDF))
        }
    }
    val starTime=System.currentTimeMillis()
    loadData(listOfDataFile,None)
    println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
    

    差分形式でデータを取得する方法を示すスクリーンショット。

  5. 差分テーブルを使用してデルタ形式を読み取る

    1. 変換されたデータを読み取る
    2. KPI データの読み取り
    import io.delta.tables._
    val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath)
    val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
    

    KPI データの読み取りを示すスクリーンショット。

  6. 印刷スキーマ

    1. 変換されたKPIデータと平均KPIデータ用のデルタテーブルスキーマを出力します。
    // transform data schema
    dtTransformed.toDF.printSchema
    // Average KPI Data Schema
    dtAvgKpi.toDF.printSchema
    

    印刷スキーマの出力を示すスクリーンショット。

  7. データ テーブルから最後に計算された KPI を表示する

    dtAvgKpi.toDF.show(false)

    データ テーブルから最後に計算された KPI を示すスクリーンショット。

計算済み KPI 履歴の表示

この手順では、_delta_log からの KPI トランザクション テーブルの履歴が表示されます

dtAvgKpi.history().show(false)

KPI の計算履歴を示すスクリーンショット。

各データ読み込み後に KPI データを表示する

  1. タイム トラベルを使用すると、各読み込み後に KPI の変更を表示できます
  2. すべてのバージョンの変更を CSV 形式で avgMoMKPIChangePath に格納して、Power BI でこれらの変更を読み取ることができるようにすることができます
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)

各データ読み込み後の KPI データのスクリーンショット。

参考

  • Apache、Apache Spark、Spark、および関連するオープンソースプロジェクト名は、Apache Software Foundation (ASF) の商標である です。