Utiliser Delta Lake dans Azure HDInsight sur AKS avec un cluster Apache Spark™ (préversion)
Important
Azure HDInsight sur AKS a été mis hors service le 31 janvier 2025. En savoir plus avec cette annonce.
Vous devez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent pour éviter l’arrêt brusque de vos charges de travail.
Important
Cette fonctionnalité est actuellement en préversion. Les Conditions d’utilisation supplémentaires pour les préversions Microsoft Azure incluent des termes juridiques supplémentaires qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou qui ne sont pas encore publiées en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les informations sur la préversion de Azure HDInsight sur AKS. Pour des questions ou des suggestions de fonctionnalités, envoyez une demande sur AskHDInsight avec les détails et suivez-nous pour plus de mises à jour sur Communauté Azure HDInsight.
Azure HDInsight sur AKS est un service cloud managé pour l’analytique Big Data qui aide les organisations à traiter de grandes quantités de données. Ce tutoriel montre comment utiliser Delta Lake dans Azure HDInsight sur AKS avec un cluster Apache Spark™.
Prérequis
Créer un cluster Apache Spark™ dans Azure HDInsight sur AKS
Exécutez le scénario Delta Lake dans Jupyter Notebook. Créez un bloc-notes Jupyter et sélectionnez « Spark » lors de la création d’un bloc-notes, car l’exemple suivant est dans Scala.
Scénario
- Format de données NyC Taxi Parquet - La liste des URL de fichiers Parquet est fournie à partir de NYC Taxi & Limousine Commission.
- Pour chaque URL (fichier) effectuez une transformation et stockez-la au format Delta.
- Calculez la distance moyenne, le coût moyen par mile et le coût moyen de Delta Table à l’aide d’une charge incrémentielle.
- Stockez la valeur calculée de l’étape 3 au format Delta dans le dossier de sortie KPI.
- Créer une table Delta sur le dossier de destination au format Delta (actualisation automatique).
- Le dossier de sortie de l’indicateur de performance clé comporte plusieurs versions de la distance moyenne et le coût moyen par mile pour un voyage.
Fournir des configurations requises pour le lac delta
Matrice de compatibilité Delta Lake avec Apache Spark : Delta Lake, modifiez la version de Delta Lake en fonction de la version d’Apache Spark.
%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
}
Répertorier le fichier de données
Note
Ces URL de fichier proviennent de NyC Taxi & Limousine Commission.
import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
)
// Add a file to be downloaded with this Spark job on every node.
val listOfDataFile = fileUrls.map(url=>{
val urlPath=url.split("/")
val fileName = urlPath(urlPath.size-1)
val urlSaveFilePath = s"/tmp/${fileName}"
val hdfsSaveFilePath = s"/tmp/${fileName}"
val file = new File(urlSaveFilePath)
FileUtils.copyURLToFile(new URL(url), file)
// copy local file to HDFS /tmp/${fileName}
// use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})
Créer un répertoire de sortie
Emplacement où vous souhaitez créer une sortie de format delta, modifier la transformDeltaOutputPath
et avgDeltaOutputKPIPath
variable si nécessaire,
-
avgDeltaOutputKPIPath
- pour stocker l’indicateur de performance clé moyen au format delta -
transformDeltaOutputPath
- stocker la sortie transformée au format delta
import org.apache.hadoop.fs._
// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"
// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new Path(dataSourcePath)
if(!fs.exists(path) && !fs.isDirectory(path)) {
fs.mkdirs(path)
}
}
createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)
Créer des données au format Delta à partir du format Parquet
Les données d’entrée proviennent de
listOfDataFile
, où sont importées les données téléchargées depuis https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page.Pour illustrer le voyage et la version du temps, chargez les données individuellement
Effectuez une transformation et calculez les indicateurs de performance clés métier suivants sur la charge incrémentielle :
- Distance moyenne
- Coût moyen par mile
- Coût moyen
Enregistrer les données transformées et KPI au format delta
import org.apache.spark.sql.functions.udf import org.apache.spark.sql.DataFrame // UDF to compute sum of value paid by customer def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => { val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips total }) // read parquet file from spark conf with given file input // transform data to compute total amount // compute kpi for the given file/batch data def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = { val df = spark.read.format(format).load(fileName) val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips")) // union with old data to compute KPI val dfFullLoad= oldData match { case Some(odf)=> dfNewLoad.union(odf) case _ => dfNewLoad } dfFullLoad.createOrReplaceTempView("tempFullLoadCompute") val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute") // save only new transformed data dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath) //save compute KPI dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath) // return incremental dataframe for next set of load dfFullLoad } // load data for each data file, use last dataframe for KPI compute with the current load def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = { if(dataFile.isEmpty) { true } else { val nextDataFile = dataFile.head val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF) loadData(dataFile.tail,Some(newFullDF)) } } val starTime=System.currentTimeMillis() loadData(listOfDataFile,None) println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
Lire le format delta à l’aide de Delta Table
- lire les données transformées
- lire les données de KPI
import io.delta.tables._ val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath) val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
Imprimer le schéma
- Imprimer le schéma de table delta pour les données d’indicateur de performance clé transformées et moyennes1.
// transform data schema dtTransformed.toDF.printSchema // Average KPI Data Schema dtAvgKpi.toDF.printSchema
Afficher le dernier indicateur de performance clé calculé à partir de la table de données
dtAvgKpi.toDF.show(false)
Afficher l’historique des indicateurs de performance clés calculés
Cette étape affiche l’historique de la table des transactions KPI de _delta_log
dtAvgKpi.history().show(false)
Afficher les données d’indicateur de performance clé après chaque chargement de données
- À l’aide du voyage dans le temps, vous pouvez afficher les modifications des indicateurs de performance clé après chaque chargement
- Vous pouvez stocker toutes les modifications de version au format CSV au format
avgMoMKPIChangePath
, afin que Power BI puisse lire ces modifications
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)
Référence
- Apache, Apache Spark, Spark et les noms de projets open source associés sont marques de commerce du Apache Software Foundation (ASF).