SynapseML을 사용한 분류 작업
이 문서에서는 동일한 분류 작업을 두 가지 방법으로 수행합니다. 한 번은 일반 pyspark
을(를) 사용하고 한 번은 synapseml
라이브러리를 사용합니다. 두 메서드는 동일한 성능을 제공하지만 synapseml
에 대비하여 pyspark
의 사용 편의성을 강조 표시합니다.
이 작업은 고객이 Amazon에서 판매한 책에 대한 리뷰가 리뷰 텍스트에 따라 양호(등급 > 3)인지 또는 나쁜지 예측하는 것입니다. 다양한 하이퍼 매개 변수를 사용하여 LogisticRegression 학습자를 학습시키고 최상의 모델을 선택하여 이 작업을 수행합니다.
필수 조건
레이크하우스에 Notebook을 첨부합니다. 왼쪽에서 추가를 선택하여 기존 레이크하우스를 추가하거나 레이크하우스를 만듭니다.
설정
필요한 Python 라이브러리를 가져오고 Spark 세션을 가져옵니다.
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
데이터 읽기
데이터에서 다운로드하고 읽습니다.
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
기능 추출 및 데이터 처리
실제 데이터는 위의 데이터 세트보다 더 복잡합니다. 데이터 세트에는 텍스트, 숫자 및 범주와 같은 여러 형식의 기능이 있는 것이 일반적입니다. 이러한 데이터 세트를 사용하는 것이 얼마나 어려운지 설명하려면 데이터 세트 에 검토 단어 수 와 평균 단어 길이라는 두 가지 숫자 기능을 추가합니다.
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
pyspark를 사용하여 분류
pyspark
라이브러리를 사용하여 최상의 LogisticRegression 분류자를 선택하려면 다음 단계를 명시적으로 수행해야 합니다.
- 기능 처리:
- 텍스트 열 토큰화
- 해시를 사용하여 토큰화된 열을 벡터로 해시
- 숫자 기능을 벡터와 병합
- 레이블 열 처리: 적절한 형식으로 캐스팅합니다.
- 서로 다른 하이퍼 매개 변수를 사용하여
train
데이터 세트에서 여러 LogisticRegression 알고리즘 학습 - 학습된 각 모델의 ROC 곡선 아래 영역을 계산하고
test
데이터 세트에서 계산된 메트릭이 가장 높은 모델을 선택합니다 validation
세트에서 최고의 모델 평가
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
SynapseML을 사용하여 분류
필요한 synapseml
단계는 더 간단합니다.
TrainClassifier
예측 도구는train
,test
,validation
데이터 세트에서 선택된 열이 기능을 나타내는 한 내부적으로 데이터를 기능화합니다FindBestModel
예측 도구는 지정된 메트릭에 따라test
데이터 세트에서 가장 잘 수행되는 모델을 찾아 학습된 모델 풀에서 최상의 모델을 찾습니다ComputeModelStatistics
변환기는 점수가 매기된 데이터 세트(이 경우validation
데이터 세트)에서 다른 메트릭을 동시에 계산합니다
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)