Partage via


Utiliser Delta Lake dans Azure HDInsight sur AKS avec un cluster Apache Spark™ (préversion)

Important

Azure HDInsight sur AKS a été mis hors service le 31 janvier 2025. En savoir plus avec cette annonce.

Vous devez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent pour éviter l’arrêt brusque de vos charges de travail.

Important

Cette fonctionnalité est actuellement en préversion. Les Conditions d’utilisation supplémentaires pour les préversions Microsoft Azure incluent des termes juridiques supplémentaires qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou qui ne sont pas encore publiées en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les informations sur la préversion de Azure HDInsight sur AKS. Pour des questions ou des suggestions de fonctionnalités, envoyez une demande sur AskHDInsight avec les détails et suivez-nous pour plus de mises à jour sur Communauté Azure HDInsight.

Azure HDInsight sur AKS est un service cloud managé pour l’analytique Big Data qui aide les organisations à traiter de grandes quantités de données. Ce tutoriel montre comment utiliser Delta Lake dans Azure HDInsight sur AKS avec un cluster Apache Spark™.

Prérequis

  1. Créer un cluster Apache Spark™ dans Azure HDInsight sur AKS

    Capture d’écran montrant la création d’un cluster Spark.

  2. Exécutez le scénario Delta Lake dans Jupyter Notebook. Créez un bloc-notes Jupyter et sélectionnez « Spark » lors de la création d’un bloc-notes, car l’exemple suivant est dans Scala.

    Capture d’écran montrant comment exécuter le scénario delta lake.

Scénario

  • Format de données NyC Taxi Parquet - La liste des URL de fichiers Parquet est fournie à partir de NYC Taxi & Limousine Commission.
  • Pour chaque URL (fichier) effectuez une transformation et stockez-la au format Delta.
  • Calculez la distance moyenne, le coût moyen par mile et le coût moyen de Delta Table à l’aide d’une charge incrémentielle.
  • Stockez la valeur calculée de l’étape 3 au format Delta dans le dossier de sortie KPI.
  • Créer une table Delta sur le dossier de destination au format Delta (actualisation automatique).
  • Le dossier de sortie de l’indicateur de performance clé comporte plusieurs versions de la distance moyenne et le coût moyen par mile pour un voyage.

Fournir des configurations requises pour le lac delta

Matrice de compatibilité Delta Lake avec Apache Spark : Delta Lake, modifiez la version de Delta Lake en fonction de la version d’Apache Spark.

%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
  }

capture d’écran montrant les configurations delta lake.

Répertorier le fichier de données

Note

Ces URL de fichier proviennent de NyC Taxi & Limousine Commission.

import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
    
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
    
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
    )
    
// Add a file to be downloaded with this Spark job on every node.
        val listOfDataFile = fileUrls.map(url=>{
        val urlPath=url.split("/") 
        val fileName = urlPath(urlPath.size-1)
        val urlSaveFilePath = s"/tmp/${fileName}"
        val hdfsSaveFilePath = s"/tmp/${fileName}"
        val file = new File(urlSaveFilePath)
        FileUtils.copyURLToFile(new URL(url), file)
        // copy local file to HDFS /tmp/${fileName}
        // use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
        fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
        DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})

Capture d’écran montrant comment démarrer l’application Spark.

Créer un répertoire de sortie

Emplacement où vous souhaitez créer une sortie de format delta, modifier la transformDeltaOutputPath et avgDeltaOutputKPIPath variable si nécessaire,

  • avgDeltaOutputKPIPath - pour stocker l’indicateur de performance clé moyen au format delta
  • transformDeltaOutputPath - stocker la sortie transformée au format delta
import org.apache.hadoop.fs._

// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"

// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
    val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val path =  new Path(dataSourcePath)
    if(!fs.exists(path) && !fs.isDirectory(path)) {
        fs.mkdirs(path)
    }
}

createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)

Capture d’écran montrant comment créer un répertoire de sortie.

Créer des données au format Delta à partir du format Parquet

  1. Les données d’entrée proviennent de listOfDataFile, où sont importées les données téléchargées depuis https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page.

  2. Pour illustrer le voyage et la version du temps, chargez les données individuellement

  3. Effectuez une transformation et calculez les indicateurs de performance clés métier suivants sur la charge incrémentielle :

    1. Distance moyenne
    2. Coût moyen par mile
    3. Coût moyen
  4. Enregistrer les données transformées et KPI au format delta

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.DataFrame
    
    // UDF to compute sum of value paid by customer
    def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => {
        val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips
        total
    })
    
    // read parquet file from spark conf with given file input
    // transform data to compute total amount
    // compute kpi for the given file/batch data
    def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = {
        val df = spark.read.format(format).load(fileName)
        val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips"))
        // union with old data to compute KPI
        val dfFullLoad= oldData match {
            case Some(odf)=>
                    dfNewLoad.union(odf)
            case _ =>
                    dfNewLoad
        }
        dfFullLoad.createOrReplaceTempView("tempFullLoadCompute")
        val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute")
        // save only new transformed data
        dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath)
        //save compute KPI
        dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath)
        // return incremental dataframe for next set of load
        dfFullLoad
    }
    
    // load data for each data file, use last dataframe for KPI compute with the current load
    def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = {
        if(dataFile.isEmpty) {    
            true
        } else {
            val nextDataFile = dataFile.head
            val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF)
            loadData(dataFile.tail,Some(newFullDF))
        }
    }
    val starTime=System.currentTimeMillis()
    loadData(listOfDataFile,None)
    println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
    

    Capture d’écran montrant comment utiliser des données au format delta.

  5. Lire le format delta à l’aide de Delta Table

    1. lire les données transformées
    2. lire les données de KPI
    import io.delta.tables._
    val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath)
    val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
    

    Capture d’écran montrant les données de l’indicateur de performance clé lues.

  6. Imprimer le schéma

    1. Imprimer le schéma de table delta pour les données d’indicateur de performance clé transformées et moyennes1.
    // transform data schema
    dtTransformed.toDF.printSchema
    // Average KPI Data Schema
    dtAvgKpi.toDF.printSchema
    

    Capture d’écran montrant la sortie du schéma d’impression.

  7. Afficher le dernier indicateur de performance clé calculé à partir de la table de données

    dtAvgKpi.toDF.show(false)

    Capture d’écran montrant le dernier indicateur de performance clé calculé à partir de la table de données.

Afficher l’historique des indicateurs de performance clés calculés

Cette étape affiche l’historique de la table des transactions KPI de _delta_log

dtAvgKpi.history().show(false)

Capture d’écran montrant l’historique des indicateurs de performance clé calculés.

Afficher les données d’indicateur de performance clé après chaque chargement de données

  1. À l’aide du voyage dans le temps, vous pouvez afficher les modifications des indicateurs de performance clé après chaque chargement
  2. Vous pouvez stocker toutes les modifications de version au format CSV au format avgMoMKPIChangePath, afin que Power BI puisse lire ces modifications
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)

Capture d’écran des données kPI après chaque chargement de données.

Référence