Usare Delta Lake in cluster Azure HDInsight su AKS con Apache Spark™ (anteprima)
Nota
Azure HDInsight su AKS verrà ritirato il 31 gennaio 2025. Prima del 31 gennaio 2025, sarà necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare interruzioni improvvise dei carichi di lavoro. I cluster rimanenti nella sottoscrizione verranno arrestati e rimossi dall’host.
Solo il supporto di base sarà disponibile fino alla data di ritiro.
Importante
Questa funzionalità è attualmente disponibile solo in anteprima. Le Condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure includono termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale. Per informazioni su questa anteprima specifica, vedere Informazioni sull'anteprima di Azure HDInsight nel servizio Azure Kubernetes. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguire Microsoft per altri aggiornamenti nella Community di Azure HDInsight.
Azure HDInsight su AKS è un servizio gestito basato sul cloud per l'analisi di Big Data che consente alle organizzazioni di elaborare grandi quantità di dati. Questa esercitazione illustra come usare Delta Lake in cluster Azure HDInsight su AKS con Apache Spark™.
Prerequisito
Creare un cluster Apache Spark™ in Azure HDInsight su AKS
Eseguire uno scenario Delta Lake in Jupyter Notebook. Creare un Jupyter Notebook e selezionare "Spark" durante la creazione di un notebook, poiché l'esempio seguente si trova in Scala.
Scenario
- Leggere il formato dati Parquet per NYC Taxi: l'elenco degli URL dei file Parquet è disponibile presso la NYC Taxi & Limousine Commission.
- Per ogni URL (file) eseguire alcune trasformazioni e archiviare in formato Delta.
- Calcolare la distanza media, il costo medio per miglio e il costo medio della tabella Delta usando il carico incrementale.
- Archiviare il valore calcolato dal passaggio 3 in formato Delta nella cartella di output dell'indicatore KPI.
- Creare una tabella Delta nella cartella di output del formato Delta (aggiornamento automatico).
- La cartella di output KPI include più versioni della distanza media e il costo medio per miglio per una corsa.
Fornire le configurazioni necessarie per il Delta Lake
Matrice di compatibilità Delta Lake con Apache Spark: Delta Lake, modificare la versione di Delta Lake in base alla versione di Apache Spark.
%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
}
Elencare il file di dati
Nota
Questi URL di file provengono dalla NYC Taxi & Limousine Commission.
import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
)
// Add a file to be downloaded with this Spark job on every node.
val listOfDataFile = fileUrls.map(url=>{
val urlPath=url.split("/")
val fileName = urlPath(urlPath.size-1)
val urlSaveFilePath = s"/tmp/${fileName}"
val hdfsSaveFilePath = s"/tmp/${fileName}"
val file = new File(urlSaveFilePath)
FileUtils.copyURLToFile(new URL(url), file)
// copy local file to HDFS /tmp/${fileName}
// use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})
Creare la directory di output
Percorso in cui si vuole creare l'output del formato differenziale, modificare le variabili transformDeltaOutputPath
e avgDeltaOutputKPIPath
se necessario.
avgDeltaOutputKPIPath
: per archiviare l'indicatore KPI medio in formato differenzialetransformDeltaOutputPath
: archiviare l'output trasformato in formato differenziale
import org.apache.hadoop.fs._
// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"
// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new Path(dataSourcePath)
if(!fs.exists(path) && !fs.isDirectory(path)) {
fs.mkdirs(path)
}
}
createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)
Creare dati in formato Delta dal formato Parquet
I dati di input provengono da
listOfDataFile
, dove i dati sono scaricati da https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.pagePer illustrare il tempo di viaggio e la versione, caricare i dati singolarmente
Eseguire la trasformazione e calcolare l'indicatore KPI aziendale seguente al caricamento incrementale:
- Distanza media
- Costo medio per miglio
- Costo medio
Salvare i dati trasformati e KPI in formato differenziale
import org.apache.spark.sql.functions.udf import org.apache.spark.sql.DataFrame // UDF to compute sum of value paid by customer def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => { val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips total }) // read parquet file from spark conf with given file input // transform data to compute total amount // compute kpi for the given file/batch data def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = { val df = spark.read.format(format).load(fileName) val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips")) // union with old data to compute KPI val dfFullLoad= oldData match { case Some(odf)=> dfNewLoad.union(odf) case _ => dfNewLoad } dfFullLoad.createOrReplaceTempView("tempFullLoadCompute") val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute") // save only new transformed data dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath) //save compute KPI dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath) // return incremental dataframe for next set of load dfFullLoad } // load data for each data file, use last dataframe for KPI compute with the current load def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = { if(dataFile.isEmpty) { true } else { val nextDataFile = dataFile.head val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF) loadData(dataFile.tail,Some(newFullDF)) } } val starTime=System.currentTimeMillis() loadData(listOfDataFile,None) println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
Leggere il formato differenziale usando la tabella Delta
- leggere i dati trasformati
- leggere i dati KPI
import io.delta.tables._ val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath) val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
Schema di stampa
- Schema tabella Delta di stampa per i dati KPI trasformati e medi1.
// transform data schema dtTransformed.toDF.printSchema // Average KPI Data Schema dtAvgKpi.toDF.printSchema
Visualizzare l'ultimo indicatore KPI calcolato dalla tabella dati
dtAvgKpi.toDF.show(false)
Visualizzare la cronologia degli indicatori KPI calcolati
Questo passaggio visualizza la cronologia della tabella delle transazioni KPI da _delta_log
dtAvgKpi.history().show(false)
Visualizzare i dati KPI dopo ogni caricamento dei dati
- Usando lo spostamento cronologico è possibile visualizzare le modifiche KPI dopo ogni caricamento
- È possibile archiviare tutte le modifiche alla versione in formato CSV in
avgMoMKPIChangePath
, in modo che Power BI possa leggere queste modifiche
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)
Riferimento
- Apache, Apache Spark, Spark, e i nomi dei progetti open source associati sono marchi di Apache Software Foundation (ASF).