Tarefas de classificação usando SynapseML
Neste artigo, você executa a mesma tarefa de classificação de duas maneiras diferentes: uma vez usando simples pyspark
e uma vez usando a synapseml
biblioteca. Os dois métodos produzem o mesmo desempenho, mas destaca a simplicidade de usar synapseml
em comparação com .pyspark
A tarefa é prever se a avaliação de um cliente de um livro vendido na Amazon é boa (classificação > 3) ou ruim com base no texto da avaliação. Você consegue isso treinando alunos de LogisticRegression com diferentes hiperparâmetros e escolhendo o melhor modelo.
Pré-requisitos
Ligue o seu bloco de notas a uma casa no lago. No lado esquerdo, selecione Adicionar para adicionar uma casa de lago existente ou criar uma casa de lago.
Configurar
Importe as bibliotecas Python necessárias e obtenha uma sessão de faísca.
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Leia os dados
Faça o download e leia os dados.
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Extrair recursos e processar dados
Os dados reais são mais complexos do que o conjunto de dados acima. É comum que um conjunto de dados tenha recursos de vários tipos, como texto, numérico e categórico. Para ilustrar como é difícil trabalhar com esses conjuntos de dados, adicione dois recursos numéricos ao conjunto de dados: a contagem de palavras da revisão e o comprimento médio das palavras.
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Classificar usando pyspark
Para escolher o melhor classificador LogisticRegression usando a pyspark
biblioteca, você precisa executar explicitamente as seguintes etapas:
- Processar os recursos:
- Tokenizar a coluna de texto
- Hash da coluna tokenizada em um vetor usando hash
- Mesclar os recursos numéricos com o vetor
- Processar a coluna de rótulo: converta-a no tipo adequado.
- Treinar vários algoritmos LogisticRegression no
train
conjunto de dados com diferentes hiperparâmetros - Calcule a área sob a curva ROC para cada um dos modelos treinados e selecione o modelo com a métrica mais alta, conforme calculado no
test
conjunto de dados - Avalie o melhor modelo no
validation
conjunto
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Classificar usando SynapseML
Os passos necessários são synapseml
mais simples:
O
TrainClassifier
Estimador featuriza os dados internamente, desde que as colunas selecionadas notrain
conjunto de dados , ,validation
test
representem os recursosO
FindBestModel
Estimador encontra o melhor modelo de um pool de modelos treinados, localizando o modelo com melhor desempenho notest
conjunto de dados dada a métrica especificadaO
ComputeModelStatistics
Transformer calcula as diferentes métricas em um conjunto de dados pontuado (no nosso caso, ovalidation
conjunto de dados) ao mesmo tempo
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)