다음을 통해 공유


Apache Spark™ 클러스터를 사용하여 AKS의 Azure HDInsight에서 Delta Lake 사용(미리 보기)

중요하다

AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 이 공지 을 통해에 대해 자세히 알아보세요.

워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.

중요하다

이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure 미리 보기에 대한 추가 사용 약관에는 베타, 미리 보기 중이거나 아직 일반 공급으로 출시되지 않은 Azure 기능에 적용되는 추가 법적 조건이 포함되어 있습니다. 특정 미리 보기 정보에 대해서는 Azure HDInsight 및 AKS 미리 보기 정보을 참조하세요. 질문이나 기능 제안 사항이 있는 경우, 자세한 내용을 AskHDInsight에 제출해 주시고, 더 많은 업데이트를 원하신다면 Azure HDInsight 커뮤니티를 팔로우해 주세요.

AKS Azure HDInsight 조직이 대량의 데이터를 처리하는 데 도움이 되는 빅 데이터 분석을 위한 관리형 클라우드 기반 서비스입니다. 이 자습서에서는 Apache Spark™ 클러스터를 사용하여 AKS의 Azure HDInsight에서 Delta Lake를 사용하는 방법을 보여 줍니다.

전제 조건

  1. AKS Azure HDInsight에서 Apache Spark™ 클러스터 만들기

    Spark 클러스터 만들기를 보여 주는 스크린샷

  2. Jupyter Notebook에서 Delta Lake 시나리오를 실행합니다. Jupyter Notebook을 만들고, 다음 예제가 Scala에 있으므로 Notebook을 만들 때 "Spark"를 선택합니다.

    Delta Lake 시나리오를 실행하는 방법을 보여 주는 스크린샷

시나리오

  • NYC Taxi Parquet 데이터 형식 읽기 - Parquet 파일 URL 목록은 NYC Taxi & 리무진 위원회제공됩니다.
  • 각 URL(파일)에 대해 일부 변환을 수행하고 델타 형식으로 저장합니다.
  • 증분 부하를 사용하여 Delta Table의 평균 거리, 마일당 평균 비용 및 평균 비용을 계산합니다.
  • 델타 형식의 3단계에서 계산된 값을 KPI 출력 폴더에 저장합니다.
  • 델타 형식 출력 폴더에서 델타 테이블을 만듭니다(자동 새로 고침).
  • KPI 출력 폴더에는 여러 버전의 평균 거리와 여정의 마일당 평균 비용이 있습니다.

Delta Lake에 대한 필수 구성 제공

Apache Spark 호환성 매트릭스를 사용하는 Delta Lake - Delta LakeApache Spark 버전에 따라 Delta Lake 버전을 변경합니다.

%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
  }

델타 레이크 구성을 보여 주는 스크린샷

데이터 파일 나열

메모

이 파일 URL은 NYC 택시 리무진 위원회 &에서 왔습니다.

import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
    
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
    
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
    )
    
// Add a file to be downloaded with this Spark job on every node.
        val listOfDataFile = fileUrls.map(url=>{
        val urlPath=url.split("/") 
        val fileName = urlPath(urlPath.size-1)
        val urlSaveFilePath = s"/tmp/${fileName}"
        val hdfsSaveFilePath = s"/tmp/${fileName}"
        val file = new File(urlSaveFilePath)
        FileUtils.copyURLToFile(new URL(url), file)
        // copy local file to HDFS /tmp/${fileName}
        // use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
        fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
        DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})

Spark 애플리케이션을 시작하는 방법을 보여 주는 스크린샷

출력 디렉터리 만들기

델타 형식 출력을 생성할 위치를 지정하고, 필요한 경우 변수 transformDeltaOutputPathavgDeltaOutputKPIPath을 변경합니다.

  • avgDeltaOutputKPIPath - 평균 KPI를 델타 형식으로 저장
  • transformDeltaOutputPath - 변환된 출력을 델타 형식으로 저장
import org.apache.hadoop.fs._

// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"

// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
    val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val path =  new Path(dataSourcePath)
    if(!fs.exists(path) && !fs.isDirectory(path)) {
        fs.mkdirs(path)
    }
}

createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)

출력 디렉터리를 만드는 방법을 보여 주는 스크린샷

Parquet 형식에서 델타 형식 데이터 만들기

  1. 입력 데이터는 listOfDataFile에서 왔으며, https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page에서 다운로드한 데이터입니다.

  2. 시간 이동 및 버전을 보여 주려면 데이터를 개별적으로 로드합니다.

  3. 증분 로드에서 비즈니스 KPI에 따라 변환 및 컴퓨팅을 수행합니다.

    1. 평균 거리
    2. 마일당 평균 비용
    3. 평균 비용
  4. 변환된 데이터 및 KPI 데이터를 델타 형식으로 저장

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.DataFrame
    
    // UDF to compute sum of value paid by customer
    def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => {
        val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips
        total
    })
    
    // read parquet file from spark conf with given file input
    // transform data to compute total amount
    // compute kpi for the given file/batch data
    def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = {
        val df = spark.read.format(format).load(fileName)
        val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips"))
        // union with old data to compute KPI
        val dfFullLoad= oldData match {
            case Some(odf)=>
                    dfNewLoad.union(odf)
            case _ =>
                    dfNewLoad
        }
        dfFullLoad.createOrReplaceTempView("tempFullLoadCompute")
        val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute")
        // save only new transformed data
        dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath)
        //save compute KPI
        dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath)
        // return incremental dataframe for next set of load
        dfFullLoad
    }
    
    // load data for each data file, use last dataframe for KPI compute with the current load
    def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = {
        if(dataFile.isEmpty) {    
            true
        } else {
            val nextDataFile = dataFile.head
            val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF)
            loadData(dataFile.tail,Some(newFullDF))
        }
    }
    val starTime=System.currentTimeMillis()
    loadData(listOfDataFile,None)
    println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
    

    델타 형식으로 데이터를 전송하는 방법을 보여 주는 스크린샷

  5. Delta Table을 사용하여 델타 형식 읽기

    1. 변환된 데이터 읽기
    2. KPI 데이터 읽기
    import io.delta.tables._
    val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath)
    val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
    

    읽기 KPI 데이터를 보여 주는 스크린샷

  6. 인쇄 스키마

    1. 변환된 KPI 데이터1 및 평균 KPI에 대한 델타 테이블 스키마를 인쇄합니다.
    // transform data schema
    dtTransformed.toDF.printSchema
    // Average KPI Data Schema
    dtAvgKpi.toDF.printSchema
    

    인쇄 스키마 출력을 보여 주는 스크린샷

  7. 데이터 테이블에서 마지막 계산 KPI 표시

    dtAvgKpi.toDF.show(false)

    데이터 테이블에서 마지막으로 계산된 KPI를 보여 주는 스크린샷

계산된 KPI 기록 표시

이 단계에서는 _delta_log KPI 트랜잭션 테이블의 기록을 표시합니다.

dtAvgKpi.history().show(false)

계산된 KPI 기록을 보여 주는 스크린샷

각 데이터 로드 후 KPI 데이터 표시

  1. 시간 이동 사용 시 각 로드 후 KPI 변경 내용을 볼 수 있습니다.
  2. Power BI에서 이러한 변경 내용을 읽을 수 있도록 모든 버전 변경 내용을 avgMoMKPIChangePath CSV 형식으로 저장할 수 있습니다.
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)

각 데이터 로드 후 KPI 데이터를 스크린샷으로 저장합니다.

참조