python apache-spark pyspark apache-spark-mllib apache-spark-ml

python - Cómo rodar un estimador personalizado en PySpark mllib



apache-spark apache-spark-mllib (1)

Estoy tratando de construir un Estimator personalizado simple en PySpark MLlib. Tengo here que es posible escribir un Transformador personalizado, pero no estoy seguro de cómo hacerlo en un Estimator . Tampoco entiendo qué hace @keyword_only y por qué necesito tantos setters y getters. Scikit-learn parece tener un documento adecuado para modelos personalizados ( ver aquí, pero PySpark no lo tiene).

Pseudocódigo de un modelo de ejemplo:

class NormalDeviation(): def __init__(self, threshold = 3): def fit(x, y=None): self.model = {''mean'': x.mean(), ''std'': x.std()] def predict(x): return ((x-self.model[''mean'']) > self.threshold * self.model[''std'']) def decision_function(x): # does ml-lib support this?


En términos generales, no hay documentación porque, en cuanto a Spark 1.6 / 2.0, la mayoría de las API relacionadas no están destinadas a ser públicas. Debería cambiar en Spark 2.1.0 (ver SPARK-7146 ).

La API es relativamente compleja porque tiene que seguir convenciones específicas para que Transformer o Estimator compatibles con Pipeline API. Algunos de estos métodos pueden ser necesarios para funciones como lectura y escritura o búsqueda en cuadrícula. Otros, como keyword_only son solo simples ayudantes y no son estrictamente necesarios.

Suponiendo que haya definido las siguientes mezclas para el parámetro medio:

from pyspark.ml.pipeline import Estimator, Model, Pipeline from pyspark.ml.param.shared import * from pyspark.sql.functions import avg, stddev_samp class HasMean(Params): mean = Param(Params._dummy(), "mean", "mean", typeConverter=TypeConverters.toFloat) def __init__(self): super(HasMean, self).__init__() def setMean(self, value): return self._set(mean=value) def getMean(self): return self.getOrDefault(self.mean)

parámetro de desviación estándar:

class HasStandardDeviation(Params): stddev = Param(Params._dummy(), "stddev", "stddev", typeConverter=TypeConverters.toFloat) def __init__(self): super(HasStandardDeviation, self).__init__() def setStddev(self, value): return self._set(stddev=value) def getStddev(self): return self.getOrDefault(self.stddev)

y umbral:

class HasCenteredThreshold(Params): centered_threshold = Param(Params._dummy(), "centered_threshold", "centered_threshold", typeConverter=TypeConverters.toFloat) def __init__(self): super(HasCenteredThreshold, self).__init__() def setCenteredThreshold(self, value): return self._set(centered_threshold=value) def getCenteredThreshold(self): return self.getOrDefault(self.centered_threshold)

puede crear el Estimator básico de la siguiente manera:

class NormalDeviation(Estimator, HasInputCol, HasPredictionCol, HasCenteredThreshold): def _fit(self, dataset): c = self.getInputCol() mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first() return (NormalDeviationModel() .setInputCol(c) .setMean(mu) .setStddev(sigma) .setCenteredThreshold(self.getCenteredThreshold()) .setPredictionCol(self.getPredictionCol())) class NormalDeviationModel(Model, HasInputCol, HasPredictionCol, HasMean, HasStandardDeviation, HasCenteredThreshold): def _transform(self, dataset): x = self.getInputCol() y = self.getPredictionCol() threshold = self.getCenteredThreshold() mu = self.getMean() sigma = self.getStddev() return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)

Finalmente podría usarse de la siguiente manera:

df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"]) normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0) model = Pipeline(stages=[normal_deviation]).fit(df) model.transform(df).show() ## +---+----+----------+ ## | id| x|prediction| ## +---+----+----------+ ## | 1| 2.0| false| ## | 2| 3.0| false| ## | 3| 0.0| false| ## | 4|99.0| true| ## +---+----+----------+