共用方式為


訓練和評估時間序列預測模型

在此筆記本中,我們會建置程式來預測具有季節性週期的時間序列資料。 我們使用紐約市財政局於紐約市開放資料入口網站上發佈的 2003 年至 2015 年期間的紐約市房地產銷售資料集

必要條件

遵循筆記本中的指示

您可以透過下列兩種方式之一遵循筆記本中的指示操作:

  • 在 Synapse 資料科學體驗中開啟並執行內建筆記本。
  • 將筆記本從 GitHub 上傳至 Synapse 資料科學體驗。

開啟內建筆記本

本教學課程隨附範例時間序列筆記本。

在 Synapse 資料科學體驗中開啟教學課程的內建範例筆記本:

  1. 移至 Synapse 資料科學首頁。

  2. 選取 [使用範例]

  3. 選取對應的範例:

    • 如果範例適用於 Python 教學課程,則從預設的 [端對端工作流程 (Python)] 索引標籤選取。
    • 如果範例適用於 R 教學課程,則從 [端對端工作流程] 索引標籤選取。
    • 如果範例適用於快速教學課程,則從 [快速教學課程] 索引標籤選取。
  4. 開始執行程式碼之前,請先將 Lakehouse 附加至筆記本

從 GitHub 匯入筆記本

AIsample - 時間序列 Forecasting.ipynb 是本教學課程隨附的筆記本。

若要開啟本教學課程隨附的筆記本,請遵循為資料科學教學課程準備系統中的指示,將筆記本匯入您的工作區。

如果您想要複製並貼上此頁面中的程式碼,則可以建立新的筆記本

開始執行程式碼之前,請務必將 Lakehouse 連結至筆記本

步驟 1:安裝自訂程式庫

當您開發機器學習模型,或處理特定資料分析時,可能需要快速安裝 Apache Spark 工作階段的自訂程式庫 (例如,此筆記本中的 prophet)。 為此,您有兩個選項。

  1. 您可以使用內嵌安裝功能 (例如 %pip%conda 等) 快速開始使用新的程式庫。 這只會在目前的筆記本中,而不是在工作區中安裝自訂程式庫。
# Use pip to install libraries
%pip install <library name>

# Use conda to install libraries
%conda install <library name>
  1. 或者,您可以建立 Fabric 環境,從公用來源安裝程式庫,或將自訂程式庫上傳至該環境,然後您的工作區管理員可將環境連結為工作區的預設值。 環境中的所有程式庫隨後可供在工作區中的任何筆記本和 Spark 工作定義使用。 如需有關環境的詳細資訊,請參閱在 Microsoft Fabric 中建立、設定和使用環境

針對此筆記本,您可以使用 %pip install 來安裝 prophet 程式庫。 PySpark 核心會在 %pip install 之後重新啟動。 這表示您必須先安裝程式庫,才能執行任何其他儲存格。

# Use pip to install Prophet
%pip install prophet

步驟 2:載入資料

資料集

此筆記本會使用紐約市房地產銷售資料資料集。 它涵蓋從 2003 年到 2015 年的資料,由紐約市財政局在紐約市開放資料入口網站上發佈。

資料集包含了 13 年內紐約市房地產市場每棟建築銷售的記錄。 如需資料集中資料行的定義,請參閱房地產銷售檔案的詞彙

borough 鄰居 building_class_category tax_class block lot eastment building_class_at_present address apartment_number zip_code residential_units commercial_units total_units land_square_feet gross_square_feet year_built tax_class_at_time_of_sale building_class_at_time_of_sale sale_price sale_date
Manhattan 字母城 07 出租 - 無電梯公寓 0.0 384.0 17.0 C4 225 EAST 2ND STREET 10009.0 10.0 0.0 10.0 2145.0 6670.0 1900.0 2.0 C4 275000.0 2007 年 6 月 19 日
Manhattan 字母城 07 出租 - 無電梯公寓 2.0 405.0 12.0 C7 508 EAST 12TH STREET 10009.0 28.0 2.0 30.0 3872.0 15428.0 1930.0 2.0 C7 7794005.0 2007 年 5 月 21 日

目標是根據歷程記錄資料,建置預測每月總銷售額的模型。 為此,您使用 Facebook 開發的開放原始碼預測程式庫 Prophet。 Prophet 以加法模型為基礎,其中非線性趨勢符合每日、每週和每年季節性和假日效果。 Prophet 在具有強烈季節效應的時間序列資料集,以及數季的歷史資料上效果最佳。 此外,Prophet 會強固地處理遺漏的資料和資料極端值。

Prophet 使用可分解的時間序列模型,其中包含三個元件:

  • 趨勢:Prophet 假設有分段常數的成長率,並自動變更點選取範圍
  • 季節性:根據預設,Prophet 會使用傅立葉系列來符合每週和每年的季節性
  • 假期:Prophet 需要所有過去和未來出現的假期。 如果某個假期將來不會重現,Prophet 不會將其納入預測。

此筆記本每月彙總資料,因此會忽略假日。

如需 Prophet 模型化技術的詳細資訊,請閱讀官方檔案

下載資料集,並上傳至 Lakehouse

資料來源包含 15 個 .csv 檔案。 這些檔案包含 2003 年至 2015 年間紐約五個行政區的房地產銷售記錄。 為了方便起見,nyc_property_sales.tar 檔案保存所有這些 .csv 檔案,將其壓縮成一個檔案。 公開可用的 Blob 儲存體會裝載此 .tar 檔案。

提示

透過此程式碼儲存格中顯示的參數,您可以輕鬆地將此筆記本套用至不同的資料集。

URL = "https://synapseaisolutionsa.blob.core.windows.net/public/NYC_Property_Sales_Dataset/"
TAR_FILE_NAME = "nyc_property_sales.tar"
DATA_FOLDER = "Files/NYC_Property_Sales_Dataset"
TAR_FILE_PATH = f"/lakehouse/default/{DATA_FOLDER}/tar/"
CSV_FILE_PATH = f"/lakehouse/default/{DATA_FOLDER}/csv/"

EXPERIMENT_NAME = "aisample-timeseries" # MLflow experiment name

此程式碼會下載公開可用的資料集版本,然後將該資料集儲存在 Fabric Lakehouse 中。

重要

在執行之前,請確定已將 Lakehouse 新增至筆記本。 無法執行這項操作時,將會發生錯誤。

import os

if not os.path.exists("/lakehouse/default"):
    # Add a lakehouse if the notebook has no default lakehouse
    # A new notebook will not link to any lakehouse by default
    raise FileNotFoundError(
        "Default lakehouse not found, please add a lakehouse for the notebook."
    )
else:
    # Verify whether or not the required files are already in the lakehouse, and if not, download and unzip
    if not os.path.exists(f"{TAR_FILE_PATH}{TAR_FILE_NAME}"):
        os.makedirs(TAR_FILE_PATH, exist_ok=True)
        os.system(f"wget {URL}{TAR_FILE_NAME} -O {TAR_FILE_PATH}{TAR_FILE_NAME}")

    os.makedirs(CSV_FILE_PATH, exist_ok=True)
    os.system(f"tar -zxvf {TAR_FILE_PATH}{TAR_FILE_NAME} -C {CSV_FILE_PATH}")

開始錄製此筆記本的執行階段。

# Record the notebook running time
import time

ts = time.time()

設定 MLflow 實驗追蹤

若要擴充 MLflow 記錄功能,自動記錄會在訓練期間自動擷取機器學習模型的輸入參數和輸出計量值。 此資訊接著會記錄到工作區,MLflow API 或工作區中的對應實驗可以存取並視覺化該資訊。 如需自動記錄的詳細資訊,請瀏覽此資源

# Set up the MLflow experiment
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable MLflow autologging

注意

若要停用筆記本工作階段的 Microsoft Fabric 自動記錄,請呼叫 mlflow.autolog() 並設定 disable=True

從 Lakehouse 讀取原始日期資料

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("Files/NYC_Property_Sales_Dataset/csv")
)

步驟 3:開始探索式資料分析

若要檢閱資料集,可以手動檢查資料子集,以進一步了解資料集。 您可以使用 display 函數來列印 DataFrame。 您也可以顯示圖表檢視,輕鬆地將資料集的子集視覺化。

display(df)

手動檢閱資料集會導致一些早期觀察結果:

  • 0.00 美元銷售價格的執行個體。 根據名詞解釋,這意味著所有權的轉讓無需現金對價。 換句話說,交易中沒有現金的流動。 您應該從資料集中移除具有 0.00 美元 sales_price 值的銷售額。

  • 資料集涵蓋不同的建置類別。 不過,此筆記本將著重於住宅建築,根據名詞解釋,標示為「A」類型。 您應該篩選資料集,僅包含住宅建築。 為此,請包含 building_class_at_time_of_salebuilding_class_at_present 資料行。 您必須僅包含 building_class_at_time_of_sale 資料。

  • 資料集包含 total_units 值等於 0,或 gross_square_feet 值等於 0 的執行個體。 您應該移除所有執行個體,其中 total_unitsgross_square_units 值等於 0。

  • 某些資料行,例如 apartment_numbertax_classbuild_class_at_present 等,有遺漏或 NULL 值。 假設遺漏的資料牽涉筆誤或不存在的資料。 分析不取決於這些遺漏值,因此可以略過這些值。

  • sale_price 資料行會儲存為字串,前面加上 "$" 字元。 若要繼續進行分析,請將此資料行表示為數位。 您應該將 sale_price 資料行轉換成整數。

類型轉換和篩選

若要解決某些已識別的問題,請匯入必要的程式庫。

# Import libraries
import pyspark.sql.functions as F
from pyspark.sql.types import *

將銷售資料從字串轉換成整數

使用正則表達式來分隔字串的數值部分與貨幣符號(例如,在字串 $300,000中,分割 $300,000),然後將數值部分轉換成整數。

接下來,篩選資料,僅包含符合下列所有條件的執行個體:

  1. sales_price 大於 0
  2. total_units 大於 0
  3. gross_square_feet 大於 0
  4. building_class_at_time_of_sale 為 A 型
df = df.withColumn(
    "sale_price", F.regexp_replace("sale_price", "[$,]", "").cast(IntegerType())
)
df = df.select("*").where(
    'sale_price > 0 and total_units > 0 and gross_square_feet > 0 and building_class_at_time_of_sale like "A%"'
)

每月彙總

資料資源會每天追蹤房地產銷售,但此方法對於此筆記本而言過於細化。 相反,每月彙總資料更為合適。

首先,變更日期值,只顯示月份和年份資料。 日期值仍會包含年份資料。 例如,仍然可以區分 2005 年 12 月和 2006 年 12 月。

此外,僅保留與分析相關的資料行。 其中包括 sales_pricetotal_unitsgross_square_feetsales_date。 您必須將 sales_date 重新命名為 month

monthly_sale_df = df.select(
    "sale_price",
    "total_units",
    "gross_square_feet",
    F.date_format("sale_date", "yyyy-MM").alias("month"),
)
display(monthly_sale_df)

依月份彙總 sale_pricetotal_unitsgross_square_feet 值。 然後,依 month 將資料分組,並加總每個群組中的所有值。

summary_df = (
    monthly_sale_df.groupBy("month")
    .agg(
        F.sum("sale_price").alias("total_sales"),
        F.sum("total_units").alias("units"),
        F.sum("gross_square_feet").alias("square_feet"),
    )
    .orderBy("month")
)

display(summary_df)

Pyspark 到 Pandas 的轉換

Pyspark DataFrames 妥善處理大型資料集。 不過,由於資料彙總,DataFrame 的大小較小。 這意味著您現在可以使用 pandas DataFrame。

此程式碼會將資料集從 pyspark DataFrame 轉換成 pandas DataFrame。

import pandas as pd

df_pandas = summary_df.toPandas()
display(df_pandas)

視覺效果

您可以檢查紐約市的房地產交易趨勢,以進一步了解資料。 這有助於洞悉潛在模式和季節性趨勢。 在資源中深入了解 Microsoft Fabric 資料視覺效果。

import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

f, (ax1, ax2) = plt.subplots(2, 1, figsize=(35, 10))
plt.sca(ax1)
plt.xticks(np.arange(0, 15 * 12, step=12))
plt.ticklabel_format(style="plain", axis="y")
sns.lineplot(x="month", y="total_sales", data=df_pandas)
plt.ylabel("Total Sales")
plt.xlabel("Time")
plt.title("Total Property Sales by Month")

plt.sca(ax2)
plt.xticks(np.arange(0, 15 * 12, step=12))
plt.ticklabel_format(style="plain", axis="y")
sns.lineplot(x="month", y="square_feet", data=df_pandas)
plt.ylabel("Total Square Feet")
plt.xlabel("Time")
plt.title("Total Property Square Feet Sold by Month")
plt.show()

探索式資料分析的觀察摘要

  • 資料顯示一年一次的明顯週期性模式:這表示資料具有每年的季節性
  • 與冬季月份相比,夏季月份的銷售量似乎較高
  • 比較銷售額高的年份與銷售額低的年份,在銷售額高的年份,銷售額高的月份與銷售額低的月份之間的收入差異,其絕對值超過在銷售額低的年份,銷售額高的月份與銷售額低的月份之間的收入差異。

例如,在 2004 年,銷售額最高月份與銷售額最低月份之間的收入差異大約為:

$900,000,000 - $500,000,000 = $400,000,000

對於 2011 年,該收入差異的計算關於:

$400,000,000 - $300,000,000 = $100,000,000

當您必須決定季節性效應是 [乘法效應]還是 [加法效應] 時,這一點就變得很重要。

步驟 4:模型訓練和追蹤

模型調整

Prophet 輸入一律為兩個資料行 DataFrame。 一個輸入資料行是名為 ds 的時間資料行,而一個輸入資料行是名為 y 的值資料行。 時間資料列應該有日期、時間或日期時間資料格式 (例如 YYYY_MM)。 此處的資料集符合該條件。 值資料行必須是數值資料格式。

針對模型調整,您只能將時間資料行重新命名為 ds,將值資料行重新命名為 y,並將資料傳遞至 Prophet。 如需詳細資訊,請閱讀 Prophet Python API 文件

df_pandas["ds"] = pd.to_datetime(df_pandas["month"])
df_pandas["y"] = df_pandas["total_sales"]

Prophet 遵循 scikit-learn 慣例。 首先,建立 Prophet 的新執行個體、設定特定參數 (例如 seasonality_mode),然後將該執行個體放入資料集。

  • 雖然常數加法因數是 Prophet 的預設季節性效果,但您應該針對季節性效果參數使用 「乘法」季節性。 上一節的分析顯示,由於季節性幅度的變化,簡單的加法季節性根本無法很好地貼合資料。

  • 因為資料依月份彙總,將 weekly_seasonality 參數設定為 [關]。 因此,無法使用每週資料。

  • 使用 Markov Chain Monte Carlo (MCMC) 方法來擷取季節性不確定性估計值。 根據預設,Prophet 可以提供趨勢和觀察雜訊的不確定性估計值,但不能提供季節性的不確定性估計值。 MCMC 需要更多的處理時間,但它們允許演算法提供季節性,以及趨勢和觀察雜訊的不確定性估計值。 如需詳細資訊,請參閱 Prophet 不確定性間隔文件

  • 透過 changepoint_prior_scale 參數來調整自動變更點偵測敏感度。 Prophet 演算法會自動嘗試在突然變更軌跡的資料中尋找執行個體。 很難找到正確的值。 若要解決此問題,可以嘗試不同的值,然後選取具有最佳效能的模型。 如需詳細資訊,請參閱 Prophet 趨勢變更點文件

from prophet import Prophet

def fit_model(dataframe, seasonality_mode, weekly_seasonality, chpt_prior, mcmc_samples):
    m = Prophet(
        seasonality_mode=seasonality_mode,
        weekly_seasonality=weekly_seasonality,
        changepoint_prior_scale=chpt_prior,
        mcmc_samples=mcmc_samples,
    )
    m.fit(dataframe)
    return m

交叉驗證

Prophet 有內建的交叉驗證工具。 此工具可以估算預測錯誤,並找出具有最佳效能的模型。

交叉驗證技術可驗證模型效率。 這項技術會在資料集的子集上訓練模型,並在先前未見過的資料集子集上執行測試。 這項技術可以檢查統計模型如何一般化為獨立的資料集。

針對交叉驗證,請保留資料集的特定範例,該範例不屬於訓練資料集的一部分。 然後,在部署之前,在該範例上測試訓練的模型。 不過,這種方法不適用於時間序列資料,因為如果模型已經看到 2005 年 1 月和 2005 年 3 月的資料,而當您嘗試預測 2005 年 2 月,模型基本上可以作弊,因為它可以看到資料趨勢的的走向。 在實際應用程式中,目標是預測未來,即看不見的區域。

若要處理此作業,並讓測試可靠,請根據日期分割資料集。 使用特定日期 (例如,前 11 年的資料) 之前的資料集進行訓練,然後使用其餘未見過的資料進行預測。

在此案例中,從 11 年的訓練資料開始,然後使用一年範圍進行每月預測。 具體來說,訓練資料包含 2003 到 2013 年的所有項目。 接下來,第一次執行會處理 2014 年 1 月至 2015 年 1 月的預測。 下一次執行會處理 2014 年 2 月至 2015 年 2 月的預測,依此類推。

針對三個訓練模型的每一個重複此程序,查看哪一個模型表現最好。 然後,將這些預測與真實世界的值進行比較,以建立最佳模型的預測品質。

from prophet.diagnostics import cross_validation
from prophet.diagnostics import performance_metrics

def evaluation(m):
    df_cv = cross_validation(m, initial="4017 days", period="30 days", horizon="365 days")
    df_p = performance_metrics(df_cv, monthly=True)
    future = m.make_future_dataframe(periods=12, freq="M")
    forecast = m.predict(future)
    return df_p, future, forecast

使用 MLflow 記錄模型

記錄模型,以追蹤其參數,並儲存模型以供日後使用。 所有相關模型資訊都會記錄在工作區中的實驗名稱下。 模型、參數和計量,以及 MLflow 自動記錄項目會儲存在一個 MLflow 執行中。

# Setup MLflow
from mlflow.models.signature import infer_signature

進行實驗

機器學習實驗可以作為組織和控制的主要單位,用於所有相關的機器學習執行。 執行會對應至模型程式碼的單一執行。 機器學習實驗追蹤是指管理所有不同的實驗及其元件。 這包括參數、計量、模型和其他成品,它有助於組織特定機器學習實驗的必要元件。 機器學習實驗追蹤也能讓您使用已儲存的實驗輕鬆重複過去的結果。 深入了解 Microsoft Fabric 中的機器學習實驗。 一旦您確定了您想要包含的步驟 (例如,調整和評估此筆記本中的 Prophet 模型),就可以執行實驗。

model_name = f"{EXPERIMENT_NAME}-prophet"

models = []
df_metrics = []
forecasts = []
seasonality_mode = "multiplicative"
weekly_seasonality = False
changepoint_priors = [0.01, 0.05, 0.1]
mcmc_samples = 100

for chpt_prior in changepoint_priors:
    with mlflow.start_run(run_name=f"prophet_changepoint_{chpt_prior}"):
        # init model and fit
        m = fit_model(df_pandas, seasonality_mode, weekly_seasonality, chpt_prior, mcmc_samples)
        models.append(m)
        # Validation
        df_p, future, forecast = evaluation(m)
        df_metrics.append(df_p)
        forecasts.append(forecast)
        # Log model and parameters with MLflow
        mlflow.prophet.log_model(
            m,
            model_name,
            registered_model_name=model_name,
            signature=infer_signature(future, forecast),
        )
        mlflow.log_params(
            {
                "seasonality_mode": seasonality_mode,
                "mcmc_samples": mcmc_samples,
                "weekly_seasonality": weekly_seasonality,
                "changepoint_prior": chpt_prior,
            }
        )
        metrics = df_p.mean().to_dict()
        metrics.pop("horizon")
        mlflow.log_metrics(metrics)

屬性面板的螢幕擷取畫面。

使用 Prophet 將模型視覺化

Prophet 具有內建的視覺效果函數,可顯示模型調整結果。

黑點表示用來訓練模型的資料點。 藍線是預測,淺藍色區域表示不確定性區間。 您建置了三個具有不同 changepoint_prior_scale 值的模型。 這三個模型的預測會顯示在此程式碼區塊的結果中。

for idx, pack in enumerate(zip(models, forecasts)):
    m, forecast = pack
    fig = m.plot(forecast)
    fig.suptitle(f"changepoint = {changepoint_priors[idx]}")

第一個圖表中最小的 changepoint_prior_scale 值會導致趨勢變更擬合不足。 第三個圖表中最大的 changepoint_prior_scale 會導致過度學習。 因此,第二個圖表似乎成為最佳選擇。 這意味著第二個模型最適合。

Prophet 也可以輕鬆地將基礎趨勢和季節性視覺化。 第二個模型的視覺效果會顯示在此程式碼區塊的結果中。

BEST_MODEL_INDEX = 1  # Set the best model index according to the previous results
fig2 = models[BEST_MODEL_INDEX].plot_components(forecast)

定價資料中年度趨勢圖表的螢幕擷取畫面。

在這些圖表中,淺藍色陰影反映了不確定性。 頂端圖表顯示強勁的長期振蕩趨勢。 幾年來,銷售量起起伏伏。 較低的圖表顯示,2 月和 9 月的銷售量趨於峰值,在這兩個月份達到全年最高值。 在那些月之後不久,在 3 月和 10 月,它們跌至該年度的最小值。

使用各種計量來評估模型的效能,例如:

  • 平均平方誤差 (MSE)
  • 均方根誤差 (RMSE)
  • 平均絕對誤差 (MAE)
  • 平均絕對百分比誤差 (MAPE)
  • 中位數絕對百分比誤差 (MDAPE)
  • 對稱平均絕對百分比誤差 (SMAPE)

使用 yhat_loweryhat_upper 估計值來評估涵蓋範圍。 請注意您未來預測一年的不同範圍,共 12 次。

display(df_metrics[BEST_MODEL_INDEX])

透過 MAPE 計量,針對此預測模型,將一個月延長至未來的預測通常涉及大約 8% 的誤差。 不過,對於未來一年的預測,誤差會增加到大約 10%。

步驟 5:給模型評分並儲存預測結果

現在給模型評分,並儲存預測結果。

使用 Predict Transformer 進行預測

現在,您可以載入模型,並用其進行預測。 使用者可以使用 PREDICT 來運作機器學習模型,這是可調整的 Microsoft Fabric 函數,支援任何計算引擎中的批次評分。 在此資源中深入了解 PREDICT,以及如何在 Microsoft Fabri 內使用它。

from synapse.ml.predict import MLFlowTransformer

spark.conf.set("spark.synapse.ml.predict.enabled", "true")

model = MLFlowTransformer(
    inputCols=future.columns.values,
    outputCol="prediction",
    modelName=f"{EXPERIMENT_NAME}-prophet",
    modelVersion=BEST_MODEL_INDEX,
)

test_spark = spark.createDataFrame(data=future, schema=future.columns.to_list())

batch_predictions = model.transform(test_spark)

display(batch_predictions)
# Code for saving predictions into lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")