python - Crear un transformador personalizado en PySpark ML
apache-spark nltk (1)
Soy nuevo en Spark SQL DataFrames y ML en ellos (PySpark). ¿Cómo puedo crear un tokenizador de disfraces que, por ejemplo, elimine las palabras nltk y use algunas bibliotecas de nltk ? ¿Puedo extender el predeterminado?
Gracias.
¿Puedo extender el predeterminado?
Realmente no.
El
Tokenizer
predeterminado es una subclase de
pyspark.ml.wrapper.JavaTransformer
y, al igual que otros transfromers y estimadores de
pyspark.ml.feature
, delega el procesamiento real a su contraparte Scala.
Como desea utilizar Python, debe extender
pyspark.ml.pipeline.Transformer
directamente.
import nltk
from pyspark import keyword_only ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
class NLTKWordPunctTokenizer(Transformer, HasInputCol, HasOutputCol):
@keyword_only
def __init__(self, inputCol=None, outputCol=None, stopwords=None):
super(NLTKWordPunctTokenizer, self).__init__()
self.stopwords = Param(self, "stopwords", "")
self._setDefault(stopwords=set())
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCol=None, outputCol=None, stopwords=None):
kwargs = self._input_kwargs
return self._set(**kwargs)
def setStopwords(self, value):
self._paramMap[self.stopwords] = value
return self
def getStopwords(self):
return self.getOrDefault(self.stopwords)
def _transform(self, dataset):
stopwords = self.getStopwords()
def f(s):
tokens = nltk.tokenize.wordpunct_tokenize(s)
return [t for t in tokens if t.lower() not in stopwords]
t = ArrayType(StringType())
out_col = self.getOutputCol()
in_col = dataset[self.getInputCol()]
return dataset.withColumn(out_col, udf(f, t)(in_col))
Ejemplo de uso (datos de ML - Características ):
sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(0, "I wish Java could use case classes"),
(1, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = NLTKWordPunctTokenizer(
inputCol="sentence", outputCol="words",
stopwords=set(nltk.corpus.stopwords.words(''english'')))
tokenizer.transform(sentenceDataFrame).show()
Para un
Estimator
personalizado de Python, consulte
Cómo desplegar un estimador personalizado en PySpark mllib
⚠ Esta respuesta depende de la API interna y es compatible con Spark 2.0.3, 2.1.1, 2.2.0 o posterior ( SPARK-19348 ). Para el código compatible con versiones anteriores de Spark, consulte la revisión 8 .