教學課程:Azure Data Lake Storage、Azure Databricks 和 Spark
本教學課程說明如何將 Azure Databricks 叢集連線到已啟用 Azure Data Lake Storage 的 Azure 記憶體帳戶中所儲存的數據。 此連線可讓您以原生方式從叢集對資料執行查詢和分析。
在此教學課程中,您需要:
- 將非結構化的資料內嵌到儲存體帳戶
- 在 Blob 儲存體中對資料執行分析
如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶。
必要條件
建立具有階層命名空間的儲存體帳戶 (Azure Data Lake Storage)
確定您已對使用者帳戶指派儲存體 Blob 資料參與者角色。
安裝 AzCopy v10。 請參閱使用 AzCopy v10 轉送資料
建立服務主體、建立用戶端密碼,然後將儲存體帳戶的存取權授與服務主體。
請參閱 教學課程:連線到 Azure Data Lake Storage (步驟 1 到 3)。 完成這些步驟之後,請務必將租用戶識別碼、應用程式識別碼和用戶端密碼貼入文字檔中。 您稍後會在本教學課程中用到。
建立 Azure Databricks 工作區、叢集和筆記本
建立 Azure Databricks 工作區。 請參閱建立 Azure Databricks 工作區。
建立叢集。 請參閱建立叢集。
建立筆記本。 請參閱建立筆記本。 選擇 Python 做為筆記本的預設語言。
讓您的筆記本保持開啟狀態。 您會在下列幾節中使用此筆記本。
下載航班資料
本教學課程會使用來自運輸統計局的 2016 年 1 月準點率航班資料,示範如何執行 ETL 作業。 您必須下載這項資料,才能完成本教學課程。
下載 On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip 檔案。 此檔案包含發行小眾測試版資料。
將 ZIP 檔案的內容解壓縮,並記下檔案名稱和檔案路徑。 稍後的步驟將會需要這項資訊。
如果您想要了解在準點報告效能資料中擷取的資訊,您可以在運輸統計局網站上看到欄位描述。
內嵌資料
在本節中,您會將 .csv正式發行前 小眾測試版數據上傳至 Azure Data Lake Storage 帳戶,然後將記憶體帳戶掛接至 Databricks 叢集。 最後,您會使用 Databricks 讀取 .csv 航班資料,並以 Apache Parquet 格式將其寫回儲存體。
將航班資料上傳至您的儲存體帳戶
使用 AzCopy 將.csv檔案複製到 Azure Data Lake Storage 帳戶。 您會使用 azcopy make
命令,在儲存體帳戶中建立 Blob 容器。 然後,使用 azcopy copy
命令來複製您剛下載至該容器中目錄的 csv 資料。
在下列步驟中,您必須輸入您要建立的容器名稱,以及容器中您要將航班資料上傳至其中的目錄和 Blob。 您可以在每個步驟中使用建議的名稱,或遵守容器、目錄和 Blob 的命名慣例來指定自己的名稱。
開啟命令提示字元視窗,並輸入下列命令以登入 Azure Active Directory,來存取您的儲存體帳戶。
azcopy login
請依照命令提示字元視窗中顯示的指示,來驗證您的使用者帳戶。
若要在您的儲存體帳戶中建立容器來儲存航班資料,請輸入下列命令:
azcopy make "https://<storage-account-name>.dfs.core.windows.net/<container-name>"
使用您的儲存體帳戶名稱取代
<storage-account-name>
預留位置值。將
<container-name>
預留位置取代為您想要建立以儲存 csv 資料的容器名稱,例如,flight-data-container。
若要將 csv 資料上傳 (複製) 至您的儲存體帳戶,請輸入下列命令。
azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
將
<csv-folder-path>
預留位置值更換為 .csv 檔案的名稱。使用您的儲存體帳戶名稱取代
<storage-account-name>
預留位置值。將
<container-name>
預留位置取代為您儲存體帳戶中的容器名稱。將
<directory-name>
預留位置取代為容器中要儲存您資料的目錄名稱,例如,jan2016。
將您的儲存體帳戶掛接至 Databricks 叢集
在本節中,您會將 Azure Data Lake Storage 雲端物件記憶體掛接至 Databricks 文件系統 (DBFS)。 您會使用先前建立的 Azure AD 服務原則,搭配儲存體帳戶進行驗證。 如需詳細資訊,請參閱在 Azure Databricks 上掛接雲端物件儲存體。
將您的筆記本連結至叢集。
在您先前建立的筆記本中,選取筆記本工具列右上角的 [連線] 按鈕。 此按鈕會開啟計算選取器。 (如果您已將筆記本連線至叢集,該叢集的名稱會顯示在按鈕文字中,代替 [連線])。
在叢集下拉式清單中,選取您先前建立的叢集。
請注意,叢集選取器中的文字會變更為「啟動中」。 等候叢集完成啟動,且叢集的名稱出現在按鈕中,然後再繼續。
將下列程式碼區塊複製並貼到第一個資料格中,但先不要執行此程式碼。
configs = {"fs.azure.account.auth.type": "OAuth", "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", "fs.azure.account.oauth2.client.id": "<appId>", "fs.azure.account.oauth2.client.secret": "<clientSecret>", "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token", "fs.azure.createRemoteFileSystemDuringInitialization": "true"} dbutils.fs.mount( source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>", mount_point = "/mnt/flightdata", extra_configs = configs)
在下列程式碼區塊中:
在
configs
中,將<appId>
、<clientSecret>
和<tenantId>
預留位置值取代為您在必要條件中建立服務主體時所複製的應用程式識別碼、用戶端密碼和租用戶識別碼。在 URI 中
source
,將<storage-account-name>
、<container-name>
和<directory-name>
佔位元值取代為您 Azure Data Lake Storage 儲存器帳戶的名稱,以及您在將正式發行前小眾測試版數據上傳至記憶體帳戶時所指定的容器和目錄名稱。注意
URI
abfss
中的配置識別碼會告知 Databricks 搭配傳輸層安全性 (TLS) 使用 Azure Blob File System 驅動程式。 若要深入瞭解 URI,請參閱 使用 Azure Data Lake Storage URI。
請確定您的叢集已完成啟動,再繼續進行。
按 SHIFT + ENTER 鍵以執行此區塊中的程式碼。
儲存體帳戶中您將航班資料上傳至其中的容器和目錄,現在可在筆記本中透過掛接點 /mnt/flightdata 存取。
使用 Databricks Notebook 將 CSV 轉換成 Parquet
既然 CSV 正式發行前小眾測試版數據可透過 DBFS 裝入點存取,您可以使用 Apache Spark DataFrame 將它載入您的工作區,並以 Apache parquet 格式寫回 Azure Data Lake Storage 物件記憶體。
Spark DataFrame 是二維標籤資料結構,其中具有潛在不同類型的資料行。 您可以使用 DataFrame,以各種支援的格式輕鬆地讀取和寫入資料。 透過 DataFrame,您可以從雲端物件儲存體載入資料,並在計算叢集內對此資料執行分析和轉換,而不會影響雲端物件儲存體中的基礎資料。 若要深入了解,請參閱在 Azure Databricks 上使用 PySpark DataFrames。
Apache parquet 是一種單欄案檔格式,具有加速查詢的最佳化。 這是比 CSV 或 JSON 更有效率的檔案格式。 若要深入了解,請參閱 Parquet 檔案。
在筆記本中,新增資料格並將下列程式碼貼入其中。
# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/flightdata/*.csv")
# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")
按 SHIFT + ENTER 鍵以執行此區塊中的程式碼。
繼續進行下一節之前,請確定所有 Parquet 資料都已寫入,且「完成」出現在輸出中。
探索資料
在本節中 ,您會使用 Databricks 文件系統公用程式 ,使用您在上一節中建立的 DBFS 裝入點來探索 Azure Data Lake Storage 物件記憶體。
在新的資料格中,貼上下列程式碼以取得掛接點的檔案清單。 第一個命令會輸出檔案和目錄的清單。 第二個命令會以表格式格式顯示輸出,以方便閱讀。
dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))
按 SHIFT + ENTER 鍵以執行此區塊中的程式碼。
請注意,parquet 目錄會出現在清單中。 您在前幾節已將 .csv 航班資料以 Parquet 格式儲存至 parquet/flight 目錄。 若要列出 parquet/flights 目錄中的檔案,請將下列程式碼貼入新的資料格中並加以執行:
display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))
若要建立新的檔案並將其列出,請將下列程式碼貼入新的資料格中並加以執行:
dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))
由於您不需要本教學課程中的 1.txt 檔案,因此您可以將下列程式碼貼入資料格中,然後執行此程式碼,以遞迴方式刪除 mydirectory。 True
參數表示遞迴刪除。
dbutils.fs.rm("/mnt/flightdata/mydirectory", True)
為了方便起見,您可以使用 help 命令來深入了解其他命令。
dbutils.fs.help("rm")
使用這些程式代碼範例,您已使用記憶體帳戶中儲存且已啟用 Azure Data Lake Storage 的數據來探索 HDFS 的階層式本質。
查詢資料
接下來,您可以開始查詢您上傳到儲存體帳戶的資料。 將下列每個程式碼區塊輸入至資料格,然後按 SHIFT + ENTER 來執行 Python 指令碼。
DataFrame 提供一組豐富的函式 (選取資料行、篩選、聯結、彙總),可讓您有效率地解決常見的資料分析問題。
若要從先前儲存的 Parquet 航班資料載入 DataFrame,並探索一些支援的功能,請將此指令碼輸入至新的資料格並加以執行。
# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")
# Print the schema of the dataframe
flight_df.printSchema()
# Print the flight database size
print("Number of flights in the database: ", flight_df.count())
# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)
# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected columns for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)
# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)
在新的資料格中輸入此指令碼,針對資料執行一些基本分析查詢。 您可以選擇執行整個指令碼 (SHIFT + ENTER)、醒目提示每個查詢並使用 CTRL + SHIFT + ENTER 個別執行該查詢,或將每個查詢輸入至個別的資料格並在該處執行該查詢。
# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')
# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())
# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()
# List out all the airports in Texas
airports_in_texas = spark.sql(
"SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)
# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
"SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)
# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
"SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()
# List airlines by the highest percentage of delayed flights. A delayed flight is one with a departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
"CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
"CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
"SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()
摘要
在本教學課程中,您已:
已建立 Azure 資源,包括 Azure Data Lake Storage 儲存器帳戶和 Azure AD 服務主體,以及指派許可權來存取記憶體帳戶。
已建立 Azure Databricks 工作區、筆記本和計算叢集。
使用 AzCopy 將非結構化 .csv 正式發行前小眾測試版數據上傳至 Azure Data Lake Storage 記憶體帳戶。
使用 Databricks 檔系統公用程式函式來掛接您的 Azure Data Lake Storage 儲存器帳戶,並探索其階層式文件系統。
使用 Apache Spark DataFrame 將.csv正式發行前小眾測試版數據轉換成 Apache parquet 格式,並將其儲存回 Azure Data Lake Storage 儲存器帳戶。
已使用 DataFrame 來探索航班資料並執行簡易查詢。
已使用 Apache Spark SQL 來查詢航班資料,以取得 2016 年 1 月每家航空公司的航班總數、德州的機場、從德州起飛的航線、全國各航空公司的平均抵達延誤時間 (以分鐘為單位),以及每家航空公司有多少百分比航班延誤起飛或抵達。
清除資源
如果您想要保留筆記本,稍後再回到其中,最好關閉 (終止) 您的叢集以避免產生費用。 若要終止您的叢集,請在筆記本工具列右上方的計算選取器中選取您的叢集、從功能表中選取 [終止],然後確認您的選取項目。 (根據預設,叢集會在無活動 120 分鐘後自動終止。)
如果想要刪除筆記本和叢集等個別工作區資源,您可以從工作區的資訊看板執行此動作。 如需詳細指示,請參閱刪除叢集或刪除筆記本。
當已不再需要資源時,請刪除資源群組及所有相關資源。 若要在 Azure 入口網站這麼做,請選取儲存體帳戶和工作區的資源群組,然後選取 [刪除]。