Freigeben über


Verwenden von Delta Lake in Azure HDInsight auf AKS mit Apache Spark™-Cluster (Vorschau)

Hinweis

Azure HDInsight on AKS wird am 31. Januar 2025 eingestellt. Vor dem 31. Januar 2025 müssen Sie Ihre Workloads zu Microsoft Fabric oder einem gleichwertigen Azure-Produkt migrieren, um eine abruptes Beendigung Ihrer Workloads zu vermeiden. Die verbleibenden Cluster in Ihrem Abonnement werden beendet und vom Host entfernt.

Bis zum Einstellungsdatum ist nur grundlegende Unterstützung verfügbar.

Wichtig

Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.

Azure HDInsight on AKS ist ein verwalteter cloudbasierter Dienst für Big Data-Analysen, der Organisationen beim Verarbeiten großer Datenmengen unterstützt. In diesem Tutorial wird gezeigt, wie Sie Delta Lake in Azure HDInsight auf AKS mit Apache Spark™-Cluster verwenden.

Voraussetzung

  1. Erstellen eines Apache Spark™-Clusters in Azure HDInsight auf AKS

    Screenshot: Erstellung eines Spark-Clusters.

  2. Führen Sie das Delta Lake-Szenario im Jupyter Notebook aus. Erstellen Sie ein Jupyter Notebook, und wählen Sie beim Erstellen eines Notebooks „Spark“ aus, da sich das folgende Beispiel in Scala befindet.

    Screenshot: Ausführens des Delta Lake-Szenarios.

Szenario

  • Lesen des NYC Taxi-Parquet-Datenformats – die Liste der URLs für Parquet-Dateien wird von der NYC Taxi & Limousine Commission bereitgestellt.
  • Führen Sie für jede URL (Datei) eine Transformation durch, und speichern Sie sie im Delta-Format.
  • Berechnen Sie die durchschnittliche Entfernung, die durchschnittlichen Kosten pro Meile und die durchschnittlichen Kosten aus der Delta-Tabelle mithilfe des inkrementellen Ladevorgangs.
  • Speichern Sie den berechneten Wert aus Schritt#3 im Delta-Format im KPI-Ausgabeordner.
  • Erstellen Sie eine Delta-Tabelle im Ausgabeordner für das Delta-Format (automatische Aktualisierung).
  • Der KPI-Ausgabeordner verfügt über mehrere Versionen der durchschnittlichen Entfernung und der durchschnittlichen Kosten pro Meile für eine Fahrt.

Bereitstellen erforderlicher Konfigurationen für Delta Lake

Delta Lake mit Apache Spark-Kompatibilitätsmatrix – Delta Lake, ändern Sie die Delta Lake-Version basierend auf der Apache Spark-Version.

%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
  }

Screenshot: Delta Lake-Konfigurationen.

Auflisten der Datendatei

Hinweis

Diese Datei-URLs stammen von der NYC Taxi & Limousine Commission.

import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
    
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
    
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
    )
    
// Add a file to be downloaded with this Spark job on every node.
        val listOfDataFile = fileUrls.map(url=>{
        val urlPath=url.split("/") 
        val fileName = urlPath(urlPath.size-1)
        val urlSaveFilePath = s"/tmp/${fileName}"
        val hdfsSaveFilePath = s"/tmp/${fileName}"
        val file = new File(urlSaveFilePath)
        FileUtils.copyURLToFile(new URL(url), file)
        // copy local file to HDFS /tmp/${fileName}
        // use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
        fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
        DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})

Screenshot: Starten der Spark-Anwendung.

Erstellen eines Ausgabeverzeichnis

Der Speicherort, an dem Sie die Ausgabe des Delta-Formats erstellen möchten; ändern Sie die Variable transformDeltaOutputPath und avgDeltaOutputKPIPath bei Bedarf,

  • avgDeltaOutputKPIPath – zum Speichern des durchschnittlichen KPI im Delta-Format
  • transformDeltaOutputPath – zum Speichern der transformierten Ausgabe im Delta-Format
import org.apache.hadoop.fs._

// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"

// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
    val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val path =  new Path(dataSourcePath)
    if(!fs.exists(path) && !fs.isDirectory(path)) {
        fs.mkdirs(path)
    }
}

createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)

Screenshot: Erstellen des Ausgabeverzeichnisses.

Erstellen von Daten im Delta-Format aus dem Parquet-Format

  1. Eingabedaten stammen aus listOfDataFile, in dem Daten aus https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page heruntergeladen wurden

  2. Zum Veranschaulichen der Zeitreise und der Version laden Sie die Daten einzeln

  3. Durchführen der Transformation und Berechnen des folgenden Geschäfts-KPIs bei inkrementellen Ladevorgängen:

    1. Durchschnittliche Entfernung
    2. Durchschnittliche Kosten pro Meile
    3. Durchschnittliche Kosten
  4. Speichern von transformierten und KPI-Daten im Delta-Format

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.DataFrame
    
    // UDF to compute sum of value paid by customer
    def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => {
        val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips
        total
    })
    
    // read parquet file from spark conf with given file input
    // transform data to compute total amount
    // compute kpi for the given file/batch data
    def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = {
        val df = spark.read.format(format).load(fileName)
        val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips"))
        // union with old data to compute KPI
        val dfFullLoad= oldData match {
            case Some(odf)=>
                    dfNewLoad.union(odf)
            case _ =>
                    dfNewLoad
        }
        dfFullLoad.createOrReplaceTempView("tempFullLoadCompute")
        val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute")
        // save only new transformed data
        dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath)
        //save compute KPI
        dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath)
        // return incremental dataframe for next set of load
        dfFullLoad
    }
    
    // load data for each data file, use last dataframe for KPI compute with the current load
    def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = {
        if(dataFile.isEmpty) {    
            true
        } else {
            val nextDataFile = dataFile.head
            val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF)
            loadData(dataFile.tail,Some(newFullDF))
        }
    }
    val starTime=System.currentTimeMillis()
    loadData(listOfDataFile,None)
    println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
    

    Screenshot: Speichern von Daten im Deltaformat.

  5. Lesen des Delta-Formats mithilfe von Delta-Tabellen

    1. Lesen von transformierten Daten
    2. Lesen von KPI-Daten
    import io.delta.tables._
    val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath)
    val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
    

    Screenshot: Lesen von KPI-Daten.

  6. Druckschema

    1. Delta-Tabelle-Druckschema für transformierte und durchschnittliche KPI-Daten1.
    // transform data schema
    dtTransformed.toDF.printSchema
    // Average KPI Data Schema
    dtAvgKpi.toDF.printSchema
    

    Screenshot: Druckschemaausgabe.

  7. Anzeigen des zuletzt berechnetes KPIs aus Datentabelle

    dtAvgKpi.toDF.show(false)

    Screenshot: Anzeigen des zuletzt berechneten KPI aus der Datentabelle.

Anzeigen des berechneten KPI-Verlaufs

In diesem Schritt wird der Verlauf der KPI-Transaktionstabelle aus _delta_log angezeigt

dtAvgKpi.history().show(false)

Screenshot: Anzeigen des berechneten KPI-Verlaufs.

Anzeigen von KPI-Daten nach jedem Ladevorgang der Daten

  1. Mithilfe von Zeitreisen können Sie KPI-Änderungen nach jedem Ladevorgang anzeigen
  2. Sie können alle Versionsänderungen im CSV-Format unter avgMoMKPIChangePath speichern, damit Power BI diese Änderungen lesen kann
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)

Screenshot: KPI-Daten nach jedem Ladevorgang der Daten.

Verweis