Usar o Delta Lake no Azure HDInsight no AKS com o cluster Apache Spark™ (versão prévia)
Observação
Desativaremos o Microsoft Azure HDInsight no AKS em 31 de janeiro de 2025. Para evitar o encerramento abrupto das suas cargas de trabalho, você precisará migrá-las para o Microsoft Fabric ou para um produto equivalente do Azure antes de 31 de janeiro de 2025. Os clusters restantes em sua assinatura serão interrompidos e removidos do host.
Somente o suporte básico estará disponível até a data de desativação.
Importante
Esse recurso está atualmente na visualização. Os Termos de uso complementares para versões prévias do Microsoft Azure incluem mais termos legais que se aplicam aos recursos do Azure que estão em versão beta, em versão prévia ou ainda não lançados em disponibilidade geral. Para obter informações sobre essa versão prévia específica, confira Informações sobre a versão prévia do Azure HDInsight no AKS. Caso tenha perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para ver mais atualizações sobre a Comunidade do Azure HDInsight.
O Azure HDInsight no AKS é um serviço gerenciado baseado em nuvem para análise de Big Data que ajuda as organizações a processar grandes quantidades de dados. Esse tutorial mostra como usar o Delta Lake no Azure HDInsight no AKS com o cluster Apache Spark™.
Pré-requisito
Execute o cenário Delta Lake no Jupyter Notebook. Crie um Jupyter Notebook e selecione "Spark" durante a criação de um notebook, já que o exemplo a seguir está no Scala.
Cenário
- Leia o formato de dados de Parquet do Táxi de NYC – Lista de URLs de arquivos Parquet são fornecidos em Táxi de NYC e Comissão de Limusine.
- Para cada url (arquivo) realize alguma transformação e armazene no formato Delta.
- Compute a distância média, o custo médio por milha e o custo médio da Tabela do Delta usando a carga incremental.
- Armazene o valor computado da Etapa 3 no formato do Delta na pasta de saída do KPI.
- Crie uma tabela do Delta na pasta de saída do formato Delta (atualização automática).
- A pasta de saída do KPI possui diversas versões da distância média e do custo médio por milha de uma viagem.
Fornecer configurações obrigatórias para o Delta Lake
Matriz de compatibilidade do Delta Lake com o Apache Spark - Delta Lake, altere a versão do Delta Lake com base na versão do Apache Spark.
%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
}
Listar o arquivo de dados
Observação
Esses URLs de arquivo são de Táxi de NYC e Comissão de Limusine.
import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
)
// Add a file to be downloaded with this Spark job on every node.
val listOfDataFile = fileUrls.map(url=>{
val urlPath=url.split("/")
val fileName = urlPath(urlPath.size-1)
val urlSaveFilePath = s"/tmp/${fileName}"
val hdfsSaveFilePath = s"/tmp/${fileName}"
val file = new File(urlSaveFilePath)
FileUtils.copyURLToFile(new URL(url), file)
// copy local file to HDFS /tmp/${fileName}
// use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})
Criar diretório de saída
O local em que você quer criar a saída no formato delta, altere as variáveistransformDeltaOutputPath
e avgDeltaOutputKPIPath
, se necessário:
avgDeltaOutputKPIPath
– para armazenar o KPI médio em formato deltatransformDeltaOutputPath
– armazenar saída transformada em formato delta
import org.apache.hadoop.fs._
// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"
// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new Path(dataSourcePath)
if(!fs.exists(path) && !fs.isDirectory(path)) {
fs.mkdirs(path)
}
}
createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)
Criar dados no formato Delta a partir do formato Parquet
Os dados de entrada são de
listOfDataFile
, em que os dados são baixados de https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.pagePara demonstrar a Viagem no tempo e a versão, carregue os dados individualmente
Execute a transformação e calcule o seguinte KPI de negócios em carga incremental:
- A distância média
- O custo médio por milha
- O custo médio
Salvar os dados de KPI e transformados em formato delta
import org.apache.spark.sql.functions.udf import org.apache.spark.sql.DataFrame // UDF to compute sum of value paid by customer def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => { val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips total }) // read parquet file from spark conf with given file input // transform data to compute total amount // compute kpi for the given file/batch data def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = { val df = spark.read.format(format).load(fileName) val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips")) // union with old data to compute KPI val dfFullLoad= oldData match { case Some(odf)=> dfNewLoad.union(odf) case _ => dfNewLoad } dfFullLoad.createOrReplaceTempView("tempFullLoadCompute") val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute") // save only new transformed data dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath) //save compute KPI dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath) // return incremental dataframe for next set of load dfFullLoad } // load data for each data file, use last dataframe for KPI compute with the current load def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = { if(dataFile.isEmpty) { true } else { val nextDataFile = dataFile.head val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF) loadData(dataFile.tail,Some(newFullDF)) } } val starTime=System.currentTimeMillis() loadData(listOfDataFile,None) println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
Ler o formato delta usando Tabela Delta
- ler dados transformados
- ler dados de KPI
import io.delta.tables._ val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath) val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
Esquema de Impressão
- Imprima o esquema da Tabela Delta para data1 de KPI médios e transformados.
// transform data schema dtTransformed.toDF.printSchema // Average KPI Data Schema dtAvgKpi.toDF.printSchema
Exibir último KPI computado da Tabela de Dados
dtAvgKpi.toDF.show(false)
Exibir histórico de KPI computado
Essa etapa exibe o histórico da tabela de transações de KPI do _delta_log
dtAvgKpi.history().show(false)
Exibir dados de KPI após cada carregamento de dados
- Usando a Viagem no tempo será possível exibir as alterações de KPI após cada carregamento
- Você poderá armazenar todas as alterações de versão no formato CSV em
avgMoMKPIChangePath
para que o Power BI possa ler essas alterações
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)
Referência
- Apache, Apache Spark, Spark e nomes de projeto de software livre associados são marcas comerciais da Apache Software Foundation (ASF).