教程:创建、评估和评分文本分类模型

本教程在 Microsoft Fabric 中演示了用于文本分类模型的 Synapse 数据科学工作流的端到端示例。 该方案使用 Spark 上的 word2vec 和逻辑回归来确定英国图书馆图书数据集中的书籍的类型,仅基于该书的标题。

本教程介绍以下步骤:

  • 安装自定义库
  • 加载数据
  • 使用探索性数据分析了解和处理数据
  • 使用 word2vec 和逻辑回归训练机器学习模型,并使用 MLflow 和 Fabric 自动记录功能跟踪试验
  • 加载机器学习模型进行评分和预测

先决条件

在笔记本中继续操作

可以选择以下选项之一以在笔记中继续操作:

  • 打开并运行内置笔记本。
  • 从 GitHub 上传笔记本。

打开内置笔记本

本教程随附标题流派分类示例笔记本。

  1. 若要打开本教程的示例笔记本,请按照 为数据科学教程准备系统中的说明进行操作。

  2. 在开始运行代码之前,请务必将湖屋附加到笔记本

从 GitHub 导入笔记本

AIsample - Title Genre Classification.ipynb 是本教程随附的笔记本。

步骤 1:安装自定义库

对于机器学习模型开发或即席数据分析,可能需要为 Apache Spark 会话快速安装自定义库。 有两个选项可用于安装库。

  • 使用你的笔记本电脑内联安装功能(%pip%conda),仅在当前笔记本中安装库文件。
  • 或者,可以创建 Fabric 环境、从公共源安装库或将自定义库上传到该环境,然后工作区管理员可以将环境附加为工作区的默认值。 然后,环境中的所有库都将可用于工作区中的任何笔记本和 Spark 作业定义。 有关环境的详细信息,请参阅 在 Microsoft Fabric中创建、配置和使用环境。

对于分类模型,请使用 wordcloud 库来表示文本中的单词频率,其中单词的大小表示其频率。 在本教程中,请使用 %pip install 在笔记本中安装 wordcloud

注意

运行 %pip install 后,PySpark 内核将重启。 在运行任何其他单元格之前安装所需的库。

# Install wordcloud for text visualization by using pip
%pip install wordcloud

步骤 2:加载数据

数据集包含关于英国图书馆与Microsoft合作数字化的书籍的元数据。 元数据是分类信息,用于指示书籍是虚构还是非小说。 使用此数据集,目标是训练一个分类模型,该模型仅基于其标题来确定书籍的类型。

BL 记录 ID 资源类型 名字 与名称关联的日期 名称类型 角色 所有名称 标题 变体标题 系列标题 系列内的编号 出版物国家/地区 发布地点 发行人 发布日期 版本 物理说明 杜威分类 BL 货架标记 主题 流派 语言 笔记 物理资源的 BL 记录 ID classification_id 用户ID created_at subject_ids annotator_date_pub annotator_normalised_date_pub annotator_edition_statement 标注者类型 annotator_FAST_genre_terms annotator_FAST_subject_terms 注释者评论 annotator_main_language annotator_other_languages_summaries annotator_summaries_language annotator_translation annotator_original_language annotator_publisher annotator_place_pub annotator_country annotator_title 指向数字化书籍的链接 已批注
014602826 专著 Yearsley、Ann 1753-1806 person 穆尔,汉娜,1745-1833 [人物]; 耶尔斯利,安,1753-1806 [人物] 几个场合的诗歌 [附有 Hannah More 的前言信]。 英国 伦敦 1786 第四版手稿注释 Digital Store 11644.d.32 英语 003996603
014602830 专著 A、T. person Oldham, John, 1653-1683 [person]; A, T. [person] A Satyr against Vertue. (一首诗:应该是由 Town-Hector 说的 [John Oldham 著。序言署名:T. A.]) 英国 伦敦 1679 15 页(4°) Digital Store 11602.ee.10. (2.) 英语 000001143

定义以下参数,以便在不同的数据集上应用此笔记本:

IS_CUSTOM_DATA = False  # If True, the user must manually upload the dataset
DATA_FOLDER = "Files/title-genre-classification"
DATA_FILE = "blbooksgenre.csv"

# Data schema
TEXT_COL = "Title"
LABEL_COL = "annotator_genre"
LABELS = ["Fiction", "Non-fiction"]

EXPERIMENT_NAME = "sample-aisample-textclassification"  # MLflow experiment name

下载数据集并上传到湖屋

此代码下载数据集的公开可用版本,然后将其存储在 Fabric Lakehouse 中。

重要

在运行代码之前,将湖屋添加到笔记本。 否则将导致错误。

if not IS_CUSTOM_DATA:
    # Download demo data files into the lakehouse, if they don't exist
    import os, requests

    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Title_Genre_Classification"
    fname = "blbooksgenre.csv"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        # Add a lakehouse, if no default lakehouse was added to the notebook
        # A new notebook won't link to any lakehouse by default
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{fname}"):
        r = requests.get(f"{remote_url}/{fname}", timeout=30)
        with open(f"{download_path}/{fname}", "wb") as f:
            f.write(r.content)
    print("Downloaded demo data files into lakehouse.")

导入所需的库

在进行任何处理之前,需要导入所需的库,包括用于 SparkSynapseML的库:

import numpy as np
from itertools import chain

from wordcloud import WordCloud
import matplotlib.pyplot as plt
import seaborn as sns

import pyspark.sql.functions as F

from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
)

from synapse.ml.stages import ClassBalancer
from synapse.ml.train import ComputeModelStatistics

import mlflow

定义超参数

为模型训练定义一些超参数。

重要

仅当了解每个参数时,才修改这些超参数。

# Hyperparameters 
word2vec_size = 128  # The length of the vector for each word
min_word_count = 3  # The minimum number of times that a word must appear to be considered
max_iter = 10  # The maximum number of training iterations
k_folds = 3  # The number of folds for cross-validation

开始录制运行此笔记本所需的时间:

# Record the notebook running time
import time

ts = time.time()

设置 MLflow 试验跟踪

自动记录扩展 MLflow 日志记录功能。 自动记录会在训练机器学习模型时自动捕获输入参数值和输出指标。 然后将此信息记录到工作区。 在工作区中,可以使用 MLflow API 或工作区中的相应试验访问和可视化信息。 若要了解有关自动记录的详细信息,请参阅 Microsoft Fabric 中的自动记录。

# Set up Mlflow for experiment tracking

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable Mlflow autologging

若要在笔记本会话中禁用 Microsoft Fabric 自动记录,请调用 mlflow.autolog() 并设置 disable=True

从湖屋中读取原始日期数据

raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True)

步骤 3:执行探索性数据分析

使用 display 命令浏览数据集,查看数据集的高级统计信息并显示图表视图:

display(raw_df.limit(20))

准备数据

删除重复项以清理数据:

df = (
    raw_df.select([TEXT_COL, LABEL_COL])
    .where(F.col(LABEL_COL).isin(LABELS))
    .dropDuplicates([TEXT_COL])
    .cache()
)

display(df.limit(20))

应用类均衡来解决任何偏差:

# Create a ClassBalancer instance, and set the input column to LABEL_COL
cb = ClassBalancer().setInputCol(LABEL_COL)

# Fit the ClassBalancer instance to the input DataFrame, and transform the DataFrame
df = cb.fit(df).transform(df)

# Display the first 20 rows of the transformed DataFrame
display(df.limit(20))

将段落和句子拆分为较小的单位,以标记数据集。 这样,分配含义就更容易了。 然后,删除非索引字以提高性能。 停用词删除涉及删除在语料库中所有文档中普遍出现的子词。 停用词删除是自然语言处理(NLP)应用中最常用的预处理步骤之一。

# Text transformer
tokenizer = Tokenizer(inputCol=TEXT_COL, outputCol="tokens")
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")

# Build the pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords_remover])

token_df = pipeline.fit(df).transform(df)

display(token_df.limit(20))

显示每个类的 wordcloud 库。 wordcloud 库是文本数据中经常出现的关键字的直观突出表示形式。 wordcloud 库非常有效,因为关键字的呈现形成了类似于云的彩色图片,以便更好地一目了然地捕获主文本数据。 详细了解 wordcloud

# WordCloud
for label in LABELS:
    tokens = (
        token_df.where(F.col(LABEL_COL) == label)
        .select(F.explode("filtered_tokens").alias("token"))
        .where(F.col("token").rlike(r"^\w+$"))
    )

    top50_tokens = (
        tokens.groupBy("token").count().orderBy(F.desc("count")).limit(50).collect()
    )

    # Generate a wordcloud image
    wordcloud = WordCloud(
        scale=10,
        background_color="white",
        random_state=42,  # Make sure the output is always the same for the same input
    ).generate_from_frequencies(dict(top50_tokens))

    # Display the generated image by using matplotlib
    plt.figure(figsize=(10, 10))
    plt.title(label, fontsize=20)
    plt.axis("off")
    plt.imshow(wordcloud, interpolation="bilinear")

最后,使用 word2vec 向量化文本。 word2vec 技术创建文本中每个单词的矢量表示形式。 在类似上下文中使用或具有语义关系的词汇,可以通过它们在向量空间中的接近性被有效获取。 这种接近性表示相似字词具有类似的单词向量。

# Label transformer
label_indexer = StringIndexer(inputCol=LABEL_COL, outputCol="labelIdx")
vectorizer = Word2Vec(
    vectorSize=word2vec_size,
    minCount=min_word_count,
    inputCol="filtered_tokens",
    outputCol="features",
)

# Build the pipeline
pipeline = Pipeline(stages=[label_indexer, vectorizer])
vec_df = (
    pipeline.fit(token_df)
    .transform(token_df)
    .select([TEXT_COL, LABEL_COL, "features", "labelIdx", "weight"])
)

display(vec_df.limit(20))

步骤 4:训练和评估模型

在数据准备就绪时,定义模型。 在本部分中,你将训练逻辑回归模型来对矢量化文本进行分类。

准备训练和测试数据集

# Split the dataset into training and testing
(train_df, test_df) = vec_df.randomSplit((0.8, 0.2), seed=42)

跟踪机器学习试验

机器学习试验是所有相关机器学习运行的组织和控制的主要单元。 一次运行对应于模型代码的一次执行。

机器学习试验跟踪管理所有试验及其组件,例如参数、指标、模型和其他项目。 跟踪可组织特定机器学习试验的所有必需组件。 它还允许通过保存的试验轻松重现过去的结果。 详细了解 Microsoft Fabric中的机器学习试验。

# Build the logistic regression classifier
lr = (
    LogisticRegression()
    .setMaxIter(max_iter)
    .setFeaturesCol("features")
    .setLabelCol("labelIdx")
    .setWeightCol("weight")
)

优化超参数

生成参数网格以搜索超参数。 然后创建一个交叉评估的估算器,以生成 CrossValidator 模型:

# Build a grid search to select the best values for the training parameters
param_grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.03, 0.1])
    .addGrid(lr.elasticNetParam, [0.0, 0.1])
    .build()
)

if len(LABELS) > 2:
    evaluator_cls = MulticlassClassificationEvaluator
    evaluator_metrics = ["f1", "accuracy"]
else:
    evaluator_cls = BinaryClassificationEvaluator
    evaluator_metrics = ["areaUnderROC", "areaUnderPR"]
evaluator = evaluator_cls(labelCol="labelIdx", weightCol="weight")

# Build a cross-evaluator estimator
crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=k_folds,
    collectSubModels=True,
)

评估模型

我们可以评估测试数据集上的模型,以对其进行比较。 在针对验证和测试数据集运行时,经过良好训练的模型应针对相关指标演示高性能。

def evaluate(model, df):
    log_metric = {}
    prediction = model.transform(df)
    for metric in evaluator_metrics:
        value = evaluator.evaluate(prediction, {evaluator.metricName: metric})
        log_metric[metric] = value
        print(f"{metric}: {value:.4f}")
    return prediction, log_metric

使用 MLflow 跟踪实验

启动训练和评估过程。 使用 MLflow 跟踪所有试验,以及记录参数、指标和模型。 所有这些信息都记录在工作区中的试验名称下。

with mlflow.start_run(run_name="lr"):
    models = crossval.fit(train_df)
    best_metrics = {k: 0 for k in evaluator_metrics}
    best_index = 0
    for idx, model in enumerate(models.subModels[0]):
        with mlflow.start_run(nested=True, run_name=f"lr_{idx}") as run:
            print("\nEvaluating on test data:")
            print(f"subModel No. {idx + 1}")
            prediction, log_metric = evaluate(model, test_df)

            if log_metric[evaluator_metrics[0]] > best_metrics[evaluator_metrics[0]]:
                best_metrics = log_metric
                best_index = idx

            print("log model")
            mlflow.spark.log_model(
                model,
                f"{EXPERIMENT_NAME}-lrmodel",
                registered_model_name=f"{EXPERIMENT_NAME}-lrmodel",
                dfs_tmpdir="Files/spark",
            )

            print("log metrics")
            mlflow.log_metrics(log_metric)

            print("log parameters")
            mlflow.log_params(
                {
                    "word2vec_size": word2vec_size,
                    "min_word_count": min_word_count,
                    "max_iter": max_iter,
                    "k_folds": k_folds,
                    "DATA_FILE": DATA_FILE,
                }
            )

    # Log the best model and its relevant metrics and parameters to the parent run
    mlflow.spark.log_model(
        models.subModels[0][best_index],
        f"{EXPERIMENT_NAME}-lrmodel",
        registered_model_name=f"{EXPERIMENT_NAME}-lrmodel",
        dfs_tmpdir="Files/spark",
    )
    mlflow.log_metrics(best_metrics)
    mlflow.log_params(
        {
            "word2vec_size": word2vec_size,
            "min_word_count": min_word_count,
            "max_iter": max_iter,
            "k_folds": k_folds,
            "DATA_FILE": DATA_FILE,
        }
    )

查看试验:

  1. 在左侧导航栏中选择工作区
  2. 查找并选择试验名称 - 在本例中为 sample_aisample-textclassification

试验的屏幕截图。

步骤 5:评分和保存预测结果

Microsoft Fabric 允许用户使用可缩放 PREDICT 函数操作机器学习模型。 此函数支持任何计算引擎中的批处理评分(或批处理推理)。 可以直接从笔记本或特定模型的项目页面创建批量预测。 若要详细了解 PREDICT 以及如何在 Fabric 中使用它,请参阅 Microsoft Fabric 中的机器学习模型评分

从前面的评估结果来看,模型 1 在精确率-召回率曲线下面积 (AUPRC) 和接收者操作特性曲线下面积 (AUC-ROC) 这两个指标上都表现最好。 因此,应使用模型 1 进行预测。

AUC-ROC 度量通常用于测量二进制分类器性能。 但是,有时更适合使用 AUPRC 指标来评估分类器。 AUC-ROC 图表展示真正阳性率(TPR)和假阳性率(FPR)之间的权衡。 AUPRC 曲线在单个可视化效果中结合了精准率(正预测值,简称 PPV)和召回率(真正率,简称 TPR)。

# Load the best model
model_uri = f"models:/{EXPERIMENT_NAME}-lrmodel/1"
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")

# Verify the loaded model
batch_predictions = loaded_model.transform(test_df)
batch_predictions.show(5)
# Code to save userRecs in the lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")