Tareas de clasificación con SynapseML
En este artículo, realizará la misma tarea de clasificación de dos maneras diferentes: una vez con pyspark
simple y una vez con la biblioteca synapseml
. Los dos métodos producen el mismo rendimiento, pero destaca la simplicidad de usar synapseml
en comparación con pyspark
.
La tarea consiste en predecir si la reseña de un cliente sobre un libro vendido en Amazon es buena (calificación > 3) o mala basándose en el texto de la reseña. Para ello, puede entrenar aprendices de LogisticRegression con distintos hiperparámetros y elegir el mejor modelo.
Requisitos previos
Adjunte el bloc de notas a una casa de lago. En el lado izquierdo, seleccione Añadir para añadir un almacén de lago existente o crear uno.
Configuración
Importar las bibliotecas de Python necesarias y obtener una sesión de Spark.
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Lectura de los datos
Descargar y leer datos.
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Extraer características y procesar datos
Los datos reales son más complejos que el conjunto de datos anterior. Es habitual que un conjunto de datos contenga características de varios tipos, como texto, numéricas y categóricas. Para ilustrar lo difícil que es trabajar con estos conjuntos de datos, agregue dos características numéricas al conjunto de datos: el número de palabras de la reseña y la longitud media de las palabras.
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Clasificación mediante pyspark
Para elegir el mejor clasificador LogisticRegression mediante la biblioteca pyspark
, debe realizar explícitamente los pasos siguientes:
- Procese las características:
- Tokenizar la columna de texto
- Aplicar hash a la columna tokenizada en un vector mediante hash
- Combinar las características numéricas con el vector
- Procesar la columna de etiqueta: convertirla en el tipo adecuado.
- Entrenar varios algoritmos LogisticRegression en el conjunto de datos
train
con distintos hiperparámetros - Calcule el área bajo la curva ROC para cada uno de los modelos entrenados y seleccione el modelo con la métrica más alta calculada en el conjunto de datos
test
- Evaluar el mejor modelo en el conjunto
validation
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Clasificación mediante SynapseML
Los pasos necesarios con synapseml
son más sencillos:
El estimador
TrainClassifier
presenta los datos internamente, siempre y cuando las columnas seleccionadas en el conjunto de datostrain
,test
,validation
representen las característicasEl estimador
FindBestModel
encuentra el mejor modelo de entre un grupo de modelos entrenados mediante la búsqueda del modelo que funciona mejor en el conjunto de datostest
según la métrica especificadaEl transformador
ComputeModelStatistics
calcula las distintas métricas en un conjunto de datos puntuado (en nuestro caso, el conjunto de datosvalidation
) al mismo tiempo
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)