Compartilhar via


Usar o Delta Lake no Azure HDInsight no AKS com o cluster Apache Spark™ (versão prévia)

Importante

O Azure HDInsight no AKS se aposentou em 31 de janeiro de 2025. Saiba mais com este comunicado.

Você precisa migrar suas cargas de trabalho para microsoft fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.

Importante

Esse recurso está atualmente em versão prévia. Os termos de uso complementares para o Microsoft Azure Previews incluem mais termos legais que se aplicam aos recursos do Azure que estão em versão beta, em versão prévia ou ainda não lançados em disponibilidade geral. Para obter informações sobre essa versão prévia específica, consulte Azure HDInsight nas informações de visualização do AKS. Para perguntas ou sugestões de funcionalidades, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para mais atualizações sobre a Comunidade Azure HDInsight .

Azure HDInsight no AKS é um serviço baseado em nuvem gerenciado para análise de Big Data que ajuda as organizações a processar dados de grandes quantidades. Este tutorial mostra como usar o Delta Lake no Azure HDInsight no AKS com o cluster Apache Spark™.

Pré-requisito

  1. Criar um cluster do Apache Spark™ no serviço Azure HDInsight no serviço AKS

    Captura de tela mostrando a criação do cluster spark.

  2. Execute o cenário do Delta Lake no Jupyter Notebook. Crie um jupyter notebook e selecione "Spark" durante a criação de um notebook, já que o exemplo a seguir está no Scala.

    Captura de tela mostrando como executar o cenário delta lake.

Cenário

  • Leia o formato de dados de táxi de NYC - Lista de URLs de arquivos parquet são fornecidas do NYC Taxi & Limão Commission.
  • Para cada URL (arquivo), execute alguma transformação e armazene no formato Delta.
  • Compute a distância média, o custo médio por milha e o custo médio da Tabela Delta usando a carga incremental.
  • Armazene o valor computado da Etapa 3 no formato Delta na pasta de saída do KPI.
  • Criar Delta Table na pasta de saída em formato Delta (atualização automática).
  • A pasta de resultados de KPI tem várias versões da distância média e do custo médio por milha para uma viagem.

Forneça as configurações necessárias para o Delta Lake

Delta Lake com matriz de compatibilidade do Apache Spark – Delta Lake, altere a versão do Delta Lake com base na versão do Apache Spark.

%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
  }

Captura de tela mostrando as configurações do delta lake.

Listar o arquivo de dados

Nota

Essas URLs de arquivo são da Comissão de Táxi e Limusines de NYC &.

import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
    
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
    
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
    )
    
// Add a file to be downloaded with this Spark job on every node.
        val listOfDataFile = fileUrls.map(url=>{
        val urlPath=url.split("/") 
        val fileName = urlPath(urlPath.size-1)
        val urlSaveFilePath = s"/tmp/${fileName}"
        val hdfsSaveFilePath = s"/tmp/${fileName}"
        val file = new File(urlSaveFilePath)
        FileUtils.copyURLToFile(new URL(url), file)
        // copy local file to HDFS /tmp/${fileName}
        // use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
        fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
        DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})

Captura de tela mostrando como iniciar o aplicativo Spark.

Criar diretório de saída

O local onde você deseja criar a saída de formato delta, altere as variáveis transformDeltaOutputPath e avgDeltaOutputKPIPath, se necessário.

  • avgDeltaOutputKPIPath – para armazenar KPI médio no formato delta
  • transformDeltaOutputPath – armazenar a saída transformada no formato delta
import org.apache.hadoop.fs._

// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"

// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
    val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val path =  new Path(dataSourcePath)
    if(!fs.exists(path) && !fs.isDirectory(path)) {
        fs.mkdirs(path)
    }
}

createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)

Captura de tela mostrando como criar diretório de saída.

Criar dados no formato Delta a partir do formato Parquet

  1. Os dados de entrada são de listOfDataFile, onde os dados foram baixados de https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page.

  2. Para demonstrar a linha do tempo e a versão, carregue os dados individualmente.

  3. Realize a transformação e o cálculo dos seguintes KPIs de negócios para a carga incremental.

    1. A distância média
    2. O custo médio por milha
    3. O custo médio
  4. Salvar dados transformados e de KPI no formato delta

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.DataFrame
    
    // UDF to compute sum of value paid by customer
    def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => {
        val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips
        total
    })
    
    // read parquet file from spark conf with given file input
    // transform data to compute total amount
    // compute kpi for the given file/batch data
    def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = {
        val df = spark.read.format(format).load(fileName)
        val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips"))
        // union with old data to compute KPI
        val dfFullLoad= oldData match {
            case Some(odf)=>
                    dfNewLoad.union(odf)
            case _ =>
                    dfNewLoad
        }
        dfFullLoad.createOrReplaceTempView("tempFullLoadCompute")
        val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute")
        // save only new transformed data
        dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath)
        //save compute KPI
        dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath)
        // return incremental dataframe for next set of load
        dfFullLoad
    }
    
    // load data for each data file, use last dataframe for KPI compute with the current load
    def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = {
        if(dataFile.isEmpty) {    
            true
        } else {
            val nextDataFile = dataFile.head
            val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF)
            loadData(dataFile.tail,Some(newFullDF))
        }
    }
    val starTime=System.currentTimeMillis()
    loadData(listOfDataFile,None)
    println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
    

    Captura de tela mostrando como fazer dados no formato delta.

  5. Ler o formato delta usando a Tabela Delta

    1. ler dados transformados
    2. ler dados de KPI
    import io.delta.tables._
    val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath)
    val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
    

    Captura de tela mostrando dados de KPI lidos.

  6. Esquema de impressão

    1. Imprimir esquema de tabela delta para dados de KPI transformados e médios1.
    // transform data schema
    dtTransformed.toDF.printSchema
    // Average KPI Data Schema
    dtAvgKpi.toDF.printSchema
    

    Captura de tela mostrando a saída do esquema de impressão.

  7. Exibir o último KPI computado da tabela de dados

    dtAvgKpi.toDF.show(false)

    Captura de tela mostrando o último KPI computado da tabela de dados.

Exibir histórico de KPI computado

Esta etapa exibe o histórico da tabela de transações KPI de _delta_log

dtAvgKpi.history().show(false)

Captura de tela mostrando o histórico de KPI computado.

Exibir dados de KPI após cada carregamento de dados

  1. Usando a viagem no tempo, você pode exibir alterações de KPI após cada carga
  2. Você pode armazenar todas as alterações de versão no formato CSV em avgMoMKPIChangePath, para que o Power BI possa ler essas alterações
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)

Tire uma captura de tela dos dados KPI após cada carregamento de dados.

Referência