Udostępnij za pośrednictwem


Używanie usługi Delta Lake w usłudze Azure HDInsight w usłudze AKS z klastrem Apache Spark™ (wersja zapoznawcza)

Uwaga

Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.

Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.

Ważne

Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.

Usługa Azure HDInsight w usłudze AKS to zarządzana usługa oparta na chmurze na potrzeby analizy danych big data, która pomaga organizacjom przetwarzać duże ilości danych. W tym samouczku pokazano, jak używać usługi Delta Lake w usłudze Azure HDInsight w usłudze AKS z klastrem Apache Spark™.

Warunek wstępny

  1. Tworzenie klastra Apache Spark™ w usłudze Azure HDInsight w usłudze AKS

    Zrzut ekranu przedstawiający tworzenie klastra spark.

  2. Uruchamianie scenariusza usługi Delta Lake w notesie Jupyter Notebook. Utwórz notes Jupyter i wybierz pozycję "Spark" podczas tworzenia notesu, ponieważ w języku Scala znajduje się poniższy przykład.

    Zrzut ekranu przedstawiający sposób uruchamiania scenariusza usługi Delta Lake.

Scenariusz

  • Przeczytaj format danych NYC Taxi Parquet — lista adresów URL plików Parquet jest dostarczana przez Komisję Nowojorskiej Taksówki i Limuzyny.
  • Dla każdego adresu URL (pliku) wykonaj pewne przekształcenia i zapisz je w formacie delta.
  • Oblicz średnią odległość, średni koszt na milę i średni koszt z tabeli delty przy użyciu obciążenia przyrostowego.
  • Zapisz obliczoną wartość z kroku 3 w formacie delty do folderu wyjściowego kluczowego wskaźnika wydajności.
  • Utwórz tabelę delty w folderze wyjściowym formatu delty (automatyczne odświeżanie).
  • Folder wyjściowy kluczowego wskaźnika wydajności ma wiele wersji średniej odległości i średni koszt na milę dla podróży.

Podaj wymagania konfiguracji dla usługi delta lake

Usługa Delta Lake z macierzą zgodności platformy Apache Spark — usługa Delta Lake, zmiana wersji usługi Delta Lake na podstawie wersji platformy Apache Spark.

%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
  }

Zrzut ekranu przedstawiający konfiguracje usługi Delta Lake.

Wyświetlanie listy plików danych

Uwaga

Te adresy URL plików pochodzą z Komisji Nowojorskiej Taksówki i Limuzyny.

import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
    
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
    
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
    )
    
// Add a file to be downloaded with this Spark job on every node.
        val listOfDataFile = fileUrls.map(url=>{
        val urlPath=url.split("/") 
        val fileName = urlPath(urlPath.size-1)
        val urlSaveFilePath = s"/tmp/${fileName}"
        val hdfsSaveFilePath = s"/tmp/${fileName}"
        val file = new File(urlSaveFilePath)
        FileUtils.copyURLToFile(new URL(url), file)
        // copy local file to HDFS /tmp/${fileName}
        // use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
        fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
        DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})

Zrzut ekranu przedstawiający sposób uruchamiania aplikacji spark.

Tworzenie katalogu wyjściowego

Lokalizacja, w której chcesz utworzyć dane wyjściowe formatu różnicowego, w razie potrzeby zmień zmienną transformDeltaOutputPath i avgDeltaOutputKPIPath .

  • avgDeltaOutputKPIPath — do przechowywania średniego wskaźnika KPI w formacie różnicowym
  • transformDeltaOutputPath — przechowywanie przekształconych danych wyjściowych w formacie różnicowym
import org.apache.hadoop.fs._

// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"

// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
    val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val path =  new Path(dataSourcePath)
    if(!fs.exists(path) && !fs.isDirectory(path)) {
        fs.mkdirs(path)
    }
}

createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)

Zrzut ekranu przedstawiający sposób tworzenia katalogu wyjściowego.

Tworzenie danych formatu różnicowego na podstawie formatu Parquet

  1. Dane wejściowe pochodzą z listOfDataFilelokalizacji , z której pobierane są dane https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

  2. Aby zademonstrować podróż i wersję czasu, załaduj dane indywidualnie

  3. Wykonaj transformację i oblicz następujące wskaźniki KPI biznesowe na przyrostowym obciążeniu:

    1. Średnia odległość
    2. Średni koszt na milę
    3. Średni koszt
  4. Zapisywanie przekształconych i kluczowych wskaźników wydajności danych w formacie różnicowym

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.DataFrame
    
    // UDF to compute sum of value paid by customer
    def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => {
        val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips
        total
    })
    
    // read parquet file from spark conf with given file input
    // transform data to compute total amount
    // compute kpi for the given file/batch data
    def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = {
        val df = spark.read.format(format).load(fileName)
        val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips"))
        // union with old data to compute KPI
        val dfFullLoad= oldData match {
            case Some(odf)=>
                    dfNewLoad.union(odf)
            case _ =>
                    dfNewLoad
        }
        dfFullLoad.createOrReplaceTempView("tempFullLoadCompute")
        val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute")
        // save only new transformed data
        dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath)
        //save compute KPI
        dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath)
        // return incremental dataframe for next set of load
        dfFullLoad
    }
    
    // load data for each data file, use last dataframe for KPI compute with the current load
    def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = {
        if(dataFile.isEmpty) {    
            true
        } else {
            val nextDataFile = dataFile.head
            val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF)
            loadData(dataFile.tail,Some(newFullDF))
        }
    }
    val starTime=System.currentTimeMillis()
    loadData(listOfDataFile,None)
    println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
    

    Zrzut ekranu przedstawiający sposób przetwarzania danych w formacie różnicowym.

  5. Odczytywanie formatu różnicowego przy użyciu tabeli delty

    1. odczytywanie przekształconych danych
    2. odczytywanie danych kluczowych wskaźników wydajności
    import io.delta.tables._
    val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath)
    val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
    

    Zrzut ekranu przedstawiający odczytywanie danych kluczowych wskaźników wydajności.

  6. Drukuj schemat

    1. Drukuj schemat tabeli delty dla przekształconych i średnich danych kluczowych wskaźników wydajności1.
    // tranform data schema
    dtTransformed.toDF.printSchema
    // Average KPI Data Schema
    dtAvgKpi.toDF.printSchema
    

    Zrzut ekranu przedstawiający dane wyjściowe schematu wydruku.

  7. Wyświetlanie ostatniego obliczonego wskaźnika KPI z tabeli danych

    dtAvgKpi.toDF.show(false)

    Zrzut ekranu przedstawiający ostatni obliczony wskaźnik KPI z tabeli danych.

Wyświetlanie obliczonej historii kluczowego wskaźnika wydajności

W tym kroku jest wyświetlana historia tabeli transakcji kluczowego wskaźnika wydajności z _delta_log

dtAvgKpi.history().show(false)

Zrzut ekranu przedstawiający obliczoną historię kluczowego wskaźnika wydajności.

Wyświetlanie danych kluczowych wskaźników wydajności po każdym załadowaniu danych

  1. Korzystając z podróży w czasie, można wyświetlić zmiany wskaźników KPI po każdym obciążeniu
  2. Wszystkie zmiany wersji można przechowywać w formacie CSV na avgMoMKPIChangePath stronie , aby usługa Power BI mogła odczytywać te zmiany
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)

Zrzut ekranu przedstawiający dane kluczowego wskaźnika wydajności po każdym załadowaniu danych.

Odwołanie