Verwenden von Delta Lake in Azure HDInsight auf AKS mit Apache Spark™-Cluster (Vorschau)
Hinweis
Azure HDInsight on AKS wird am 31. Januar 2025 eingestellt. Vor dem 31. Januar 2025 müssen Sie Ihre Workloads zu Microsoft Fabric oder einem gleichwertigen Azure-Produkt migrieren, um eine abruptes Beendigung Ihrer Workloads zu vermeiden. Die verbleibenden Cluster in Ihrem Abonnement werden beendet und vom Host entfernt.
Bis zum Einstellungsdatum ist nur grundlegende Unterstützung verfügbar.
Wichtig
Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.
Azure HDInsight on AKS ist ein verwalteter cloudbasierter Dienst für Big Data-Analysen, der Organisationen beim Verarbeiten großer Datenmengen unterstützt. In diesem Tutorial wird gezeigt, wie Sie Delta Lake in Azure HDInsight auf AKS mit Apache Spark™-Cluster verwenden.
Voraussetzung
Erstellen eines Apache Spark™-Clusters in Azure HDInsight auf AKS
Führen Sie das Delta Lake-Szenario im Jupyter Notebook aus. Erstellen Sie ein Jupyter Notebook, und wählen Sie beim Erstellen eines Notebooks „Spark“ aus, da sich das folgende Beispiel in Scala befindet.
Szenario
- Lesen des NYC Taxi-Parquet-Datenformats – die Liste der URLs für Parquet-Dateien wird von der NYC Taxi & Limousine Commission bereitgestellt.
- Führen Sie für jede URL (Datei) eine Transformation durch, und speichern Sie sie im Delta-Format.
- Berechnen Sie die durchschnittliche Entfernung, die durchschnittlichen Kosten pro Meile und die durchschnittlichen Kosten aus der Delta-Tabelle mithilfe des inkrementellen Ladevorgangs.
- Speichern Sie den berechneten Wert aus Schritt#3 im Delta-Format im KPI-Ausgabeordner.
- Erstellen Sie eine Delta-Tabelle im Ausgabeordner für das Delta-Format (automatische Aktualisierung).
- Der KPI-Ausgabeordner verfügt über mehrere Versionen der durchschnittlichen Entfernung und der durchschnittlichen Kosten pro Meile für eine Fahrt.
Bereitstellen erforderlicher Konfigurationen für Delta Lake
Delta Lake mit Apache Spark-Kompatibilitätsmatrix – Delta Lake, ändern Sie die Delta Lake-Version basierend auf der Apache Spark-Version.
%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
}
Auflisten der Datendatei
Hinweis
Diese Datei-URLs stammen von der NYC Taxi & Limousine Commission.
import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
)
// Add a file to be downloaded with this Spark job on every node.
val listOfDataFile = fileUrls.map(url=>{
val urlPath=url.split("/")
val fileName = urlPath(urlPath.size-1)
val urlSaveFilePath = s"/tmp/${fileName}"
val hdfsSaveFilePath = s"/tmp/${fileName}"
val file = new File(urlSaveFilePath)
FileUtils.copyURLToFile(new URL(url), file)
// copy local file to HDFS /tmp/${fileName}
// use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})
Erstellen eines Ausgabeverzeichnis
Der Speicherort, an dem Sie die Ausgabe des Delta-Formats erstellen möchten; ändern Sie die Variable transformDeltaOutputPath
und avgDeltaOutputKPIPath
bei Bedarf,
avgDeltaOutputKPIPath
– zum Speichern des durchschnittlichen KPI im Delta-FormattransformDeltaOutputPath
– zum Speichern der transformierten Ausgabe im Delta-Format
import org.apache.hadoop.fs._
// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"
// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new Path(dataSourcePath)
if(!fs.exists(path) && !fs.isDirectory(path)) {
fs.mkdirs(path)
}
}
createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)
Erstellen von Daten im Delta-Format aus dem Parquet-Format
Eingabedaten stammen aus
listOfDataFile
, in dem Daten aus https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page heruntergeladen wurdenZum Veranschaulichen der Zeitreise und der Version laden Sie die Daten einzeln
Durchführen der Transformation und Berechnen des folgenden Geschäfts-KPIs bei inkrementellen Ladevorgängen:
- Durchschnittliche Entfernung
- Durchschnittliche Kosten pro Meile
- Durchschnittliche Kosten
Speichern von transformierten und KPI-Daten im Delta-Format
import org.apache.spark.sql.functions.udf import org.apache.spark.sql.DataFrame // UDF to compute sum of value paid by customer def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => { val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips total }) // read parquet file from spark conf with given file input // transform data to compute total amount // compute kpi for the given file/batch data def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = { val df = spark.read.format(format).load(fileName) val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips")) // union with old data to compute KPI val dfFullLoad= oldData match { case Some(odf)=> dfNewLoad.union(odf) case _ => dfNewLoad } dfFullLoad.createOrReplaceTempView("tempFullLoadCompute") val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute") // save only new transformed data dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath) //save compute KPI dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath) // return incremental dataframe for next set of load dfFullLoad } // load data for each data file, use last dataframe for KPI compute with the current load def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = { if(dataFile.isEmpty) { true } else { val nextDataFile = dataFile.head val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF) loadData(dataFile.tail,Some(newFullDF)) } } val starTime=System.currentTimeMillis() loadData(listOfDataFile,None) println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
Lesen des Delta-Formats mithilfe von Delta-Tabellen
- Lesen von transformierten Daten
- Lesen von KPI-Daten
import io.delta.tables._ val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath) val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
Druckschema
- Delta-Tabelle-Druckschema für transformierte und durchschnittliche KPI-Daten1.
// transform data schema dtTransformed.toDF.printSchema // Average KPI Data Schema dtAvgKpi.toDF.printSchema
Anzeigen des zuletzt berechnetes KPIs aus Datentabelle
dtAvgKpi.toDF.show(false)
Anzeigen des berechneten KPI-Verlaufs
In diesem Schritt wird der Verlauf der KPI-Transaktionstabelle aus _delta_log
angezeigt
dtAvgKpi.history().show(false)
Anzeigen von KPI-Daten nach jedem Ladevorgang der Daten
- Mithilfe von Zeitreisen können Sie KPI-Änderungen nach jedem Ladevorgang anzeigen
- Sie können alle Versionsänderungen im CSV-Format unter
avgMoMKPIChangePath
speichern, damit Power BI diese Änderungen lesen kann
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)
Verweis
- Apache, Apache Spark, Spark und zugehörige Open Source-Projektnamen sind Handelsmarken der Apache Software Foundation (ASF).