Compartir a través de


Uso de Delta Lake en Azure HDInsight en AKS con clústeres de Apache Spark™ (versión preliminar)

Nota:

Retiraremos Azure HDInsight en AKS el 31 de enero de 2025. Antes del 31 de enero de 2025, deberá migrar las cargas de trabajo a Microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo. Los clústeres restantes de la suscripción se detendrán y quitarán del host.

Solo el soporte técnico básico estará disponible hasta la fecha de retirada.

Importante

Esta funcionalidad actualmente está en su versión preliminar. En Términos de uso complementarios para las versiones preliminares de Microsoft Azure encontrará más términos legales que se aplican a las características de Azure que están en versión beta, en versión preliminar, o que todavía no se han lanzado con disponibilidad general. Para más información sobre esta versión preliminar específica, consulte la Información de Azure HDInsight sobre la versión preliminar de AKS. Para plantear preguntas o sugerencias sobre la característica, envíe una solicitud en AskHDInsight con los detalles y síganos para obtener más actualizaciones sobre Comunidad de Azure HDInsight.

Azure HDInsight en AKS es un servicio administrado basado en la nube para el análisis de macrodatos que ayuda a las organizaciones a procesar grandes cantidades de datos. En este tutorial se muestra cómo usar Delta Lake en Azure HDInsight en AKS con clústeres de Apache Spark™.

Requisito previo

  1. Creación de un clúster de Apache Spark™ en Azure HDInsight en AKS

    Captura de pantalla que muestra la creación del clúster de Spark.

  2. Ejecute el escenario de Delta Lake en Jupyter Notebook. Cree un cuaderno de Jupyter Notebook y seleccione "Spark" al crear un cuaderno, ya que el siguiente ejemplo está en Scala.

    Captura de pantalla que muestra cómo ejecutar un escenario de Delta Lake.

Escenario

  • Lectura del formato de datos Parquet de NYC Taxi: la lista de direcciones URL de archivos Parquet la proporciona la NYC Taxi & Limousine Commission.
  • Para cada dirección URL (archivo) realice alguna transformación y almacén en formato Delta.
  • Calcule la distancia media, el costo promedio por milla y el costo medio de la tabla de Delta mediante la carga incremental.
  • Almacene el valor calculado del paso 3 en formato Delta en la carpeta de salida de KPI.
  • Cree una tabla de Delta en la carpeta de salida de formato Delta (actualización automática).
  • La carpeta de salida de KPI tiene varias versiones de la distancia media y el costo medio por milla de un viaje.

Proporcionar configuraciones necesarias para Delta Lake

Matriz de compatibilidad de Delta Lake con Apache Spark: Delta Lake, cambie la versión de Delta Lake en función de la versión de Apache Spark.

%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
  }

Captura de pantalla que muestra las configuraciones de Delta Lake.

Enumeración del archivo de datos

Nota:

Estas direcciones URL de archivo proceden de la NYC Taxi & Limousine Commission.

import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
    
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
    
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
    )
    
// Add a file to be downloaded with this Spark job on every node.
        val listOfDataFile = fileUrls.map(url=>{
        val urlPath=url.split("/") 
        val fileName = urlPath(urlPath.size-1)
        val urlSaveFilePath = s"/tmp/${fileName}"
        val hdfsSaveFilePath = s"/tmp/${fileName}"
        val file = new File(urlSaveFilePath)
        FileUtils.copyURLToFile(new URL(url), file)
        // copy local file to HDFS /tmp/${fileName}
        // use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
        fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
        DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})

Captura de pantalla que muestra cómo iniciar una aplicación Spark.

Creación de un directorio de salida

La ubicación en la que desea crear la salida del formato Delta, cambie la variable transformDeltaOutputPath y avgDeltaOutputKPIPath si es necesario,

  • avgDeltaOutputKPIPath: para almacenar el KPI medio en formato Delta
  • transformDeltaOutputPath: almacenar la salida transformada en formato Delta
import org.apache.hadoop.fs._

// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"

// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
    val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val path =  new Path(dataSourcePath)
    if(!fs.exists(path) && !fs.isDirectory(path)) {
        fs.mkdirs(path)
    }
}

createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)

Captura de pantalla que muestra cómo crear el directorio de salida.

Crear datos de formato Delta a partir del formato Parquet

  1. Los datos de entrada proceden de listOfDataFile, mientras que los datos descargados proceden de https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

  2. Para mostrar el viaje en el tiempo y la versión, cargue los datos individualmente

  3. Realice la transformación y calcule los siguientes KPI empresariales en la carga incremental:

    1. La distancia media
    2. El costo promedio por milla
    3. El costo promedio
  4. Guarde los datos transformados y de KPI en formato Delta

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.DataFrame
    
    // UDF to compute sum of value paid by customer
    def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => {
        val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips
        total
    })
    
    // read parquet file from spark conf with given file input
    // transform data to compute total amount
    // compute kpi for the given file/batch data
    def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = {
        val df = spark.read.format(format).load(fileName)
        val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips"))
        // union with old data to compute KPI
        val dfFullLoad= oldData match {
            case Some(odf)=>
                    dfNewLoad.union(odf)
            case _ =>
                    dfNewLoad
        }
        dfFullLoad.createOrReplaceTempView("tempFullLoadCompute")
        val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute")
        // save only new transformed data
        dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath)
        //save compute KPI
        dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath)
        // return incremental dataframe for next set of load
        dfFullLoad
    }
    
    // load data for each data file, use last dataframe for KPI compute with the current load
    def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = {
        if(dataFile.isEmpty) {    
            true
        } else {
            val nextDataFile = dataFile.head
            val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF)
            loadData(dataFile.tail,Some(newFullDF))
        }
    }
    val starTime=System.currentTimeMillis()
    loadData(listOfDataFile,None)
    println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
    

    Captura de pantalla que muestra cómo gestionar datos en formato Delta.

  5. Lectura del formato Delta mediante la tabla de Delta

    1. lectura de datos transformados
    2. lectura de datos de KPI
    import io.delta.tables._
    val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath)
    val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
    

    Captura de pantalla que muestra la lectura de datos KPI.

  6. Esquema de impresión

    1. Imprima el esquema de tabla de Delta para datos de KPI transformados y de promedio.
    // tranform data schema
    dtTransformed.toDF.printSchema
    // Average KPI Data Schema
    dtAvgKpi.toDF.printSchema
    

    Captura de pantalla que muestra la salida del esquema de impresión.

  7. Muestre el último KPI calculado de la tabla de datos

    dtAvgKpi.toDF.show(false)

    Captura de pantalla que muestra el último KPI calculado de la tabla de datos.

Muestre el historial de KPI calculado

En este paso se muestra el historial de la tabla de transacciones de KPI de _delta_log

dtAvgKpi.history().show(false)

Captura de pantalla que muestra el historial de KPI calculado.

Muestre los datos de KPI después de cada carga de datos

  1. Con el viaje en el tiempo, puede ver los cambios de KPI después de cada carga
  2. Puede almacenar todos los cambios de versión en formato CSV en avgMoMKPIChangePath, para que Power BI pueda leerlos
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)

Captura de pantalla de los datos de KPI después de cada carga de datos.

Referencia