教程:创建、评估和评分文本分类模型
本教程在 Microsoft Fabric 中演示了用于文本分类模型的 Synapse 数据科学工作流的端到端示例。 该方案使用 Spark 上的 word2vec 和逻辑回归来确定英国图书馆图书数据集中的书籍的类型,仅基于该书的标题。
本教程介绍以下步骤:
- 安装自定义库
- 加载数据
- 使用探索性数据分析了解和处理数据
- 使用 word2vec 和逻辑回归训练机器学习模型,并使用 MLflow 和 Fabric 自动记录功能跟踪试验
- 加载机器学习模型进行评分和预测
先决条件
获取 Microsoft Fabric 订阅。 或者,注册免费的 Microsoft Fabric 试用版。
登录 Microsoft Fabric。
使用主页左下侧的体验切换器切换到 Fabric。
- 如果没有 Microsoft Fabric 湖屋,请按照在 Microsoft Fabric 中创建湖屋中的步骤创建一个。
在笔记本中继续操作
可以选择以下选项之一以在笔记中继续操作:
- 打开并运行内置笔记本。
- 从 GitHub 上传笔记本。
打开内置笔记本
本教程随附标题流派分类示例笔记本。
若要打开本教程的示例笔记本,请按照 为数据科学教程准备系统中的说明进行操作。
在开始运行代码之前,请务必将湖屋附加到笔记本。
从 GitHub 导入笔记本
AIsample - Title Genre Classification.ipynb 是本教程随附的笔记本。
若要打开本教程随附的笔记本,请按照 为数据科学教程准备系统 中的说明将笔记本导入工作区。
如果要复制并粘贴此页面中的代码,可以 创建新的笔记本。
在开始运行代码之前,请务必将湖屋附加到笔记本。
步骤 1:安装自定义库
对于机器学习模型开发或即席数据分析,可能需要为 Apache Spark 会话快速安装自定义库。 有两个选项可用于安装库。
- 使用你的笔记本电脑内联安装功能(
%pip
或%conda
),仅在当前笔记本中安装库文件。 - 或者,可以创建 Fabric 环境、从公共源安装库或将自定义库上传到该环境,然后工作区管理员可以将环境附加为工作区的默认值。 然后,环境中的所有库都将可用于工作区中的任何笔记本和 Spark 作业定义。 有关环境的详细信息,请参阅 在 Microsoft Fabric中创建、配置和使用环境。
对于分类模型,请使用 wordcloud
库来表示文本中的单词频率,其中单词的大小表示其频率。 在本教程中,请使用 %pip install
在笔记本中安装 wordcloud
。
注意
运行 %pip install
后,PySpark 内核将重启。 在运行任何其他单元格之前安装所需的库。
# Install wordcloud for text visualization by using pip
%pip install wordcloud
步骤 2:加载数据
数据集包含关于英国图书馆与Microsoft合作数字化的书籍的元数据。 元数据是分类信息,用于指示书籍是虚构还是非小说。 使用此数据集,目标是训练一个分类模型,该模型仅基于其标题来确定书籍的类型。
BL 记录 ID | 资源类型 | 名字 | 与名称关联的日期 | 名称类型 | 角色 | 所有名称 | 标题 | 变体标题 | 系列标题 | 系列内的编号 | 出版物国家/地区 | 发布地点 | 发行人 | 发布日期 | 版本 | 物理说明 | 杜威分类 | BL 货架标记 | 主题 | 流派 | 语言 | 笔记 | 物理资源的 BL 记录 ID | classification_id | 用户ID | created_at | subject_ids | annotator_date_pub | annotator_normalised_date_pub | annotator_edition_statement | 标注者类型 | annotator_FAST_genre_terms | annotator_FAST_subject_terms | 注释者评论 | annotator_main_language | annotator_other_languages_summaries | annotator_summaries_language | annotator_translation | annotator_original_language | annotator_publisher | annotator_place_pub | annotator_country | annotator_title | 指向数字化书籍的链接 | 已批注 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
014602826 | 专著 | Yearsley、Ann | 1753-1806 | person | 穆尔,汉娜,1745-1833 [人物]; 耶尔斯利,安,1753-1806 [人物] | 几个场合的诗歌 [附有 Hannah More 的前言信]。 | 英国 | 伦敦 | 1786 | 第四版手稿注释 | Digital Store 11644.d.32 | 英语 | 003996603 | 假 | |||||||||||||||||||||||||||||||
014602830 | 专著 | A、T. | person | Oldham, John, 1653-1683 [person]; A, T. [person] | A Satyr against Vertue. (一首诗:应该是由 Town-Hector 说的 [John Oldham 著。序言署名:T. A.]) | 英国 | 伦敦 | 1679 | 15 页(4°) | Digital Store 11602.ee.10. (2.) | 英语 | 000001143 | 假 |
定义以下参数,以便在不同的数据集上应用此笔记本:
IS_CUSTOM_DATA = False # If True, the user must manually upload the dataset
DATA_FOLDER = "Files/title-genre-classification"
DATA_FILE = "blbooksgenre.csv"
# Data schema
TEXT_COL = "Title"
LABEL_COL = "annotator_genre"
LABELS = ["Fiction", "Non-fiction"]
EXPERIMENT_NAME = "sample-aisample-textclassification" # MLflow experiment name
下载数据集并上传到湖屋
此代码下载数据集的公开可用版本,然后将其存储在 Fabric Lakehouse 中。
重要
在运行代码之前,将湖屋添加到笔记本。 否则将导致错误。
if not IS_CUSTOM_DATA:
# Download demo data files into the lakehouse, if they don't exist
import os, requests
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Title_Genre_Classification"
fname = "blbooksgenre.csv"
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
# Add a lakehouse, if no default lakehouse was added to the notebook
# A new notebook won't link to any lakehouse by default
raise FileNotFoundError(
"Default lakehouse not found, please add a lakehouse and restart the session."
)
os.makedirs(download_path, exist_ok=True)
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
导入所需的库
在进行任何处理之前,需要导入所需的库,包括用于 Spark 和 SynapseML的库:
import numpy as np
from itertools import chain
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import seaborn as sns
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import (
BinaryClassificationEvaluator,
MulticlassClassificationEvaluator,
)
from synapse.ml.stages import ClassBalancer
from synapse.ml.train import ComputeModelStatistics
import mlflow
定义超参数
为模型训练定义一些超参数。
重要
仅当了解每个参数时,才修改这些超参数。
# Hyperparameters
word2vec_size = 128 # The length of the vector for each word
min_word_count = 3 # The minimum number of times that a word must appear to be considered
max_iter = 10 # The maximum number of training iterations
k_folds = 3 # The number of folds for cross-validation
开始录制运行此笔记本所需的时间:
# Record the notebook running time
import time
ts = time.time()
设置 MLflow 试验跟踪
自动记录扩展 MLflow 日志记录功能。 自动记录会在训练机器学习模型时自动捕获输入参数值和输出指标。 然后将此信息记录到工作区。 在工作区中,可以使用 MLflow API 或工作区中的相应试验访问和可视化信息。 若要了解有关自动记录的详细信息,请参阅 Microsoft Fabric 中的自动记录。
# Set up Mlflow for experiment tracking
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True) # Disable Mlflow autologging
若要在笔记本会话中禁用 Microsoft Fabric 自动记录,请调用 mlflow.autolog()
并设置 disable=True
:
从湖屋中读取原始日期数据
raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True)
步骤 3:执行探索性数据分析
使用 display
命令浏览数据集,查看数据集的高级统计信息并显示图表视图:
display(raw_df.limit(20))
准备数据
删除重复项以清理数据:
df = (
raw_df.select([TEXT_COL, LABEL_COL])
.where(F.col(LABEL_COL).isin(LABELS))
.dropDuplicates([TEXT_COL])
.cache()
)
display(df.limit(20))
应用类均衡来解决任何偏差:
# Create a ClassBalancer instance, and set the input column to LABEL_COL
cb = ClassBalancer().setInputCol(LABEL_COL)
# Fit the ClassBalancer instance to the input DataFrame, and transform the DataFrame
df = cb.fit(df).transform(df)
# Display the first 20 rows of the transformed DataFrame
display(df.limit(20))
将段落和句子拆分为较小的单位,以标记数据集。 这样,分配含义就更容易了。 然后,删除非索引字以提高性能。 停用词删除涉及删除在语料库中所有文档中普遍出现的子词。 停用词删除是自然语言处理(NLP)应用中最常用的预处理步骤之一。
# Text transformer
tokenizer = Tokenizer(inputCol=TEXT_COL, outputCol="tokens")
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
# Build the pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords_remover])
token_df = pipeline.fit(df).transform(df)
display(token_df.limit(20))
显示每个类的 wordcloud 库。 wordcloud 库是文本数据中经常出现的关键字的直观突出表示形式。 wordcloud 库非常有效,因为关键字的呈现形成了类似于云的彩色图片,以便更好地一目了然地捕获主文本数据。 详细了解 wordcloud。
# WordCloud
for label in LABELS:
tokens = (
token_df.where(F.col(LABEL_COL) == label)
.select(F.explode("filtered_tokens").alias("token"))
.where(F.col("token").rlike(r"^\w+$"))
)
top50_tokens = (
tokens.groupBy("token").count().orderBy(F.desc("count")).limit(50).collect()
)
# Generate a wordcloud image
wordcloud = WordCloud(
scale=10,
background_color="white",
random_state=42, # Make sure the output is always the same for the same input
).generate_from_frequencies(dict(top50_tokens))
# Display the generated image by using matplotlib
plt.figure(figsize=(10, 10))
plt.title(label, fontsize=20)
plt.axis("off")
plt.imshow(wordcloud, interpolation="bilinear")
最后,使用 word2vec 向量化文本。 word2vec 技术创建文本中每个单词的矢量表示形式。 在类似上下文中使用或具有语义关系的词汇,可以通过它们在向量空间中的接近性被有效获取。 这种接近性表示相似字词具有类似的单词向量。
# Label transformer
label_indexer = StringIndexer(inputCol=LABEL_COL, outputCol="labelIdx")
vectorizer = Word2Vec(
vectorSize=word2vec_size,
minCount=min_word_count,
inputCol="filtered_tokens",
outputCol="features",
)
# Build the pipeline
pipeline = Pipeline(stages=[label_indexer, vectorizer])
vec_df = (
pipeline.fit(token_df)
.transform(token_df)
.select([TEXT_COL, LABEL_COL, "features", "labelIdx", "weight"])
)
display(vec_df.limit(20))
步骤 4:训练和评估模型
在数据准备就绪时,定义模型。 在本部分中,你将训练逻辑回归模型来对矢量化文本进行分类。
准备训练和测试数据集
# Split the dataset into training and testing
(train_df, test_df) = vec_df.randomSplit((0.8, 0.2), seed=42)
跟踪机器学习试验
机器学习试验是所有相关机器学习运行的组织和控制的主要单元。 一次运行对应于模型代码的一次执行。
机器学习试验跟踪管理所有试验及其组件,例如参数、指标、模型和其他项目。 跟踪可组织特定机器学习试验的所有必需组件。 它还允许通过保存的试验轻松重现过去的结果。 详细了解 Microsoft Fabric中的机器学习试验。
# Build the logistic regression classifier
lr = (
LogisticRegression()
.setMaxIter(max_iter)
.setFeaturesCol("features")
.setLabelCol("labelIdx")
.setWeightCol("weight")
)
优化超参数
生成参数网格以搜索超参数。 然后创建一个交叉评估的估算器,以生成 CrossValidator
模型:
# Build a grid search to select the best values for the training parameters
param_grid = (
ParamGridBuilder()
.addGrid(lr.regParam, [0.03, 0.1])
.addGrid(lr.elasticNetParam, [0.0, 0.1])
.build()
)
if len(LABELS) > 2:
evaluator_cls = MulticlassClassificationEvaluator
evaluator_metrics = ["f1", "accuracy"]
else:
evaluator_cls = BinaryClassificationEvaluator
evaluator_metrics = ["areaUnderROC", "areaUnderPR"]
evaluator = evaluator_cls(labelCol="labelIdx", weightCol="weight")
# Build a cross-evaluator estimator
crossval = CrossValidator(
estimator=lr,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=k_folds,
collectSubModels=True,
)
评估模型
我们可以评估测试数据集上的模型,以对其进行比较。 在针对验证和测试数据集运行时,经过良好训练的模型应针对相关指标演示高性能。
def evaluate(model, df):
log_metric = {}
prediction = model.transform(df)
for metric in evaluator_metrics:
value = evaluator.evaluate(prediction, {evaluator.metricName: metric})
log_metric[metric] = value
print(f"{metric}: {value:.4f}")
return prediction, log_metric
使用 MLflow 跟踪实验
启动训练和评估过程。 使用 MLflow 跟踪所有试验,以及记录参数、指标和模型。 所有这些信息都记录在工作区中的试验名称下。
with mlflow.start_run(run_name="lr"):
models = crossval.fit(train_df)
best_metrics = {k: 0 for k in evaluator_metrics}
best_index = 0
for idx, model in enumerate(models.subModels[0]):
with mlflow.start_run(nested=True, run_name=f"lr_{idx}") as run:
print("\nEvaluating on test data:")
print(f"subModel No. {idx + 1}")
prediction, log_metric = evaluate(model, test_df)
if log_metric[evaluator_metrics[0]] > best_metrics[evaluator_metrics[0]]:
best_metrics = log_metric
best_index = idx
print("log model")
mlflow.spark.log_model(
model,
f"{EXPERIMENT_NAME}-lrmodel",
registered_model_name=f"{EXPERIMENT_NAME}-lrmodel",
dfs_tmpdir="Files/spark",
)
print("log metrics")
mlflow.log_metrics(log_metric)
print("log parameters")
mlflow.log_params(
{
"word2vec_size": word2vec_size,
"min_word_count": min_word_count,
"max_iter": max_iter,
"k_folds": k_folds,
"DATA_FILE": DATA_FILE,
}
)
# Log the best model and its relevant metrics and parameters to the parent run
mlflow.spark.log_model(
models.subModels[0][best_index],
f"{EXPERIMENT_NAME}-lrmodel",
registered_model_name=f"{EXPERIMENT_NAME}-lrmodel",
dfs_tmpdir="Files/spark",
)
mlflow.log_metrics(best_metrics)
mlflow.log_params(
{
"word2vec_size": word2vec_size,
"min_word_count": min_word_count,
"max_iter": max_iter,
"k_folds": k_folds,
"DATA_FILE": DATA_FILE,
}
)
查看试验:
- 在左侧导航栏中选择工作区
- 查找并选择试验名称 - 在本例中为 sample_aisample-textclassification
步骤 5:评分和保存预测结果
Microsoft Fabric 允许用户使用可缩放 PREDICT
函数操作机器学习模型。 此函数支持任何计算引擎中的批处理评分(或批处理推理)。 可以直接从笔记本或特定模型的项目页面创建批量预测。 若要详细了解 PREDICT 以及如何在 Fabric 中使用它,请参阅 Microsoft Fabric 中的机器学习模型评分。
从前面的评估结果来看,模型 1 在精确率-召回率曲线下面积 (AUPRC) 和接收者操作特性曲线下面积 (AUC-ROC) 这两个指标上都表现最好。 因此,应使用模型 1 进行预测。
AUC-ROC 度量通常用于测量二进制分类器性能。 但是,有时更适合使用 AUPRC 指标来评估分类器。 AUC-ROC 图表展示真正阳性率(TPR)和假阳性率(FPR)之间的权衡。 AUPRC 曲线在单个可视化效果中结合了精准率(正预测值,简称 PPV)和召回率(真正率,简称 TPR)。
# Load the best model
model_uri = f"models:/{EXPERIMENT_NAME}-lrmodel/1"
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")
# Verify the loaded model
batch_predictions = loaded_model.transform(test_df)
batch_predictions.show(5)
# Code to save userRecs in the lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")
相关内容
- Microsoft Fabric 中的 机器学习模型
- 训练机器学习模型
- Microsoft Fabric 中的机器学习试验