Использование Delta Lake в Azure HDInsight в AKS с кластером Apache Spark™ (предварительная версия)
Важный
Azure HDInsight на AKS выведен из эксплуатации 31 января 2025 г. Узнайте больше об этом объявлении .
Необходимо перенести рабочие нагрузки в Microsoft Fabric или эквивалентный продукт Azure, чтобы избежать резкого завершения рабочих нагрузок.
Важный
Эта функция сейчас доступна в предварительной версии. Дополнительные условия использования для предварительных версий Microsoft Azure включают дополнительные юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Сведения об этой конкретной предварительной версии см. в Azure HDInsight в предварительной версии AKS. Для вопросов или предложений функций отправьте запрос с подробными сведениями на AskHDInsight и подписывайтесь на обновления в Azure HDInsight Community.
Azure HDInsight в AKS — это управляемая облачная служба для аналитики больших данных, которая помогает организациям обрабатывать большие объемы данных. В этом руководстве показано, как использовать Delta Lake в Azure HDInsight в AKS с кластером Apache Spark™.
Предпосылка
Создание кластера Apache Spark™ в Azure HDInsight в AKS
Запустите сценарий Delta Lake в Jupyter Notebook. Создайте записную книжку Jupyter и выберите "Spark" при создании записной книжки, так как в следующем примере используется Scala.
Сценарий
- Чтение формата данных NYC Taxi Parquet — список URL-адресов файлов Parquet предоставляются из такси Нью-Йорка & Лимузин комиссии.
- Для каждого URL-адреса (файла) выполняется некоторое преобразование и хранение в разностном формате.
- Вычислить среднее расстояние, среднюю стоимость за милю и среднюю стоимость из таблицы Delta с помощью инкрементальной загрузки.
- Сохранить вычисленное значение из шага 3 в формате Delta в папку вывода ключевого показателя эффективности (KPI).
- Создайте Delta Table в формате Delta в папке выходных данных (автообновление).
- В выходной папке KPI содержится несколько версий среднего расстояния и средней стоимости за милю для поездки.
Предоставление требуемых конфигураций для Delta Lake
Delta Lake и матрица совместимости с Apache Spark — Delta Lake, измените версию Delta Lake в зависимости от версии Apache Spark.
%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
}
Список файла данных
Заметка
Эти URL-адреса файлов относятся к Комиссии по такси и лимузинам Нью-Йорка &.
import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
)
// Add a file to be downloaded with this Spark job on every node.
val listOfDataFile = fileUrls.map(url=>{
val urlPath=url.split("/")
val fileName = urlPath(urlPath.size-1)
val urlSaveFilePath = s"/tmp/${fileName}"
val hdfsSaveFilePath = s"/tmp/${fileName}"
val file = new File(urlSaveFilePath)
FileUtils.copyURLToFile(new URL(url), file)
// copy local file to HDFS /tmp/${fileName}
// use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})
Создание выходного каталога
При необходимости измените переменные transformDeltaOutputPath
и avgDeltaOutputKPIPath
, затем укажите расположение, где вы хотите создать выходные данные разностного формата.
-
avgDeltaOutputKPIPath
— хранение среднего ключевого показателя эффективности в разностном формате -
transformDeltaOutputPath
— хранение преобразованных выходных данных в разностном формате
import org.apache.hadoop.fs._
// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"
// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new Path(dataSourcePath)
if(!fs.exists(path) && !fs.isDirectory(path)) {
fs.mkdirs(path)
}
}
createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)
Создание данных в формате Delta из формата Parquet
Входные данные из
listOfDataFile
, где данные загруженные из https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.pageЧтобы продемонстрировать путешествие по времени и версию, загрузите данные по отдельности
Выполните преобразование и вычисление следующего бизнес-ключевого показателя эффективности при добавочной нагрузке:
- Среднее расстояние
- Средняя стоимость за милю
- Средняя стоимость
Сохранение данных, содержащих преобразованные показатели и ключевые показатели эффективности, в дельта-формате.
import org.apache.spark.sql.functions.udf import org.apache.spark.sql.DataFrame // UDF to compute sum of value paid by customer def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => { val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips total }) // read parquet file from spark conf with given file input // transform data to compute total amount // compute kpi for the given file/batch data def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = { val df = spark.read.format(format).load(fileName) val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips")) // union with old data to compute KPI val dfFullLoad= oldData match { case Some(odf)=> dfNewLoad.union(odf) case _ => dfNewLoad } dfFullLoad.createOrReplaceTempView("tempFullLoadCompute") val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute") // save only new transformed data dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath) //save compute KPI dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath) // return incremental dataframe for next set of load dfFullLoad } // load data for each data file, use last dataframe for KPI compute with the current load def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = { if(dataFile.isEmpty) { true } else { val nextDataFile = dataFile.head val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF) loadData(dataFile.tail,Some(newFullDF)) } } val starTime=System.currentTimeMillis() loadData(listOfDataFile,None) println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
Чтение разностного формата с помощью разностной таблицы
- прочитать преобразованные данные
- чтение данных ключевого показателя эффективности
import io.delta.tables._ val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath) val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
Схема печати
- Печать схемы таблицы Delta для преобразованных и усреднённых данных ключевых показателей эффективности1.
// transform data schema dtTransformed.toDF.printSchema // Average KPI Data Schema dtAvgKpi.toDF.printSchema
Отображение последнего вычисляемого ключевого показателя эффективности из таблицы данных
dtAvgKpi.toDF.show(false)
Отображение журнала вычисляемых ключевых показателей эффективности
На этом шаге отображается история таблицы транзакций ключевого показателя эффективности из _delta_log
dtAvgKpi.history().show(false)
Отображение данных ключевого показателя эффективности после каждой загрузки данных
- С помощью перемещения по времени можно просмотреть изменения ключевого показателя эффективности после каждой загрузки
- Вы можете сохранить все изменения версий в формате CSV в
avgMoMKPIChangePath
, чтобы Power BI могли считывать эти изменения.
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)
Ссылка
- Имена проектов Apache, Apache Spark, Spark и связанных с открытым исходным кодом являются товарными знакамиApache Software Foundation (ASF).