scala - transformadores - ¿Cómo crear un transformador personalizado desde un UDF?
transformador 6 cables (3)
Intenté inicialmente extender los resúmenes de Transformer
y UnaryTransformer
pero UnaryTransformer
problemas para que mi aplicación no pudiera llegar a DefaultParamsWriteable
. Como ejemplo que puede ser relevante para su problema, creé un término normalizador simple como UDF siguiendo a lo largo de este ejemplo . Mi objetivo es hacer coincidir los términos con los patrones y conjuntos para reemplazarlos por términos genéricos. Por ejemplo:
"/b[A-Z0-9._%+-]+@[A-Z0-9.-]+/.[A-Z]{2,}/b".r -> "emailaddr"
Esta es la clase
import scala.util.matching.Regex
class TermNormalizer(normMap: Map[Any, String]) {
val normalizationMap = normMap
def normalizeTerms(terms: Seq[String]): Seq[String] = {
var termsUpdated = terms
for ((term, idx) <- termsUpdated.view.zipWithIndex) {
for (normalizer <- normalizationMap.keys: Iterable[Any]) {
normalizer match {
case (regex: Regex) =>
if (!regex.findFirstIn(term).isEmpty) termsUpdated =
termsUpdated.updated(idx, normalizationMap(regex))
case (set: Set[String]) =>
if (set.contains(term)) termsUpdated =
termsUpdated.updated(idx, normalizationMap(set))
}
}
}
termsUpdated
}
}
Lo uso así:
val testMap: Map[Any, String] = Map("hadoop".r -> "elephant",
"spark".r -> "sparky", "cool".r -> "neat",
Set("123", "456") -> "set1",
Set("789", "10") -> "set2")
val testTermNormalizer = new TermNormalizer(testMap)
val termNormalizerUdf = udf(testTermNormalizer.normalizeTerms(_: Seq[String]))
val trainingTest = sqlContext.createDataFrame(Seq(
(0L, "spark is cool 123", 1.0),
(1L, "adsjkfadfk akjdsfhad 456", 0.0),
(2L, "spark rocks my socks 789 10", 1.0),
(3L, "hadoop is cool 10", 0.0)
)).toDF("id", "text", "label")
val testTokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val tokenizedTrainingTest = testTokenizer.transform(trainingTest)
println(tokenizedTrainingTest
.select($"id", $"text", $"words", termNormalizerUdf($"words"), $"label").show(false))
Ahora que leo la pregunta un poco más cerca, parece que estás preguntando cómo evitar hacerlo de esta manera, lol. De todos modos, aún así lo publicaré en caso de que alguien en el futuro esté buscando una forma fácil de aplicar una funcionalidad tipo transformador.
Estaba intentando crear y guardar un Pipeline con etapas personalizadas. Necesito agregar una column
a mi DataFrame
usando un UDF
. Por lo tanto, me preguntaba si era posible convertir un UDF
o una acción similar en un Transformer
.
Mi UDF
personalizada se ve así y me gustaría aprender cómo hacerlo usando un UDF
como un Transformer
personalizado.
def getFeatures(n: String) = {
val NUMBER_FEATURES = 4
val name = n.split(" +")(0).toLowerCase
((1 to NUMBER_FEATURES)
.filter(size => size <= name.length)
.map(size => name.substring(name.length - size)))
}
val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name))
No es una solución completa, pero puede comenzar con algo como esto:
import org.apache.spark.ml.{UnaryTransformer}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}
class NGramTokenizer(override val uid: String)
extends UnaryTransformer[String, Seq[String], NGramTokenizer] {
def this() = this(Identifiable.randomUID("ngramtokenizer"))
override protected def createTransformFunc: String => Seq[String] = {
getFeatures _
}
override protected def validateInputType(inputType: DataType): Unit = {
require(inputType == StringType)
}
override protected def outputDataType: DataType = {
new ArrayType(StringType, true)
}
}
Comprobación rápida:
val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v")
val transformer = new NGramTokenizer().setInputCol("v").setOutputCol("vs")
transformer.transform(df).show
// +---+------+------------------+
// | k| v| vs|
// +---+------+------------------+
// | 1|abcdef|[f, ef, def, cdef]|
// | 2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+
Incluso puedes intentar generalizarlo a algo como esto:
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
import scala.reflect.runtime.universe._
class UnaryUDFTransformer[T : TypeTag, U : TypeTag](
override val uid: String,
f: T => U
) extends UnaryTransformer[T, U, UnaryUDFTransformer[T, U]] {
override protected def createTransformFunc: T => U = f
override protected def validateInputType(inputType: DataType): Unit =
require(inputType == schemaFor[T].dataType)
override protected def outputDataType: DataType = schemaFor[U].dataType
}
val transformer = new UnaryUDFTransformer("featurize", getFeatures)
.setInputCol("v")
.setOutputCol("vs")
Si desea utilizar UDF no la función envuelta, tendrá que extender Transformer
directamente y anular el método de transform
. Desafortunadamente, la mayoría de las clases útiles son privadas, por lo que puede ser bastante complicado.
Alternativamente, puede registrar UDF:
spark.udf.register("getFeatures", getFeatures _)
y use SQLTransformer
import org.apache.spark.ml.feature.SQLTransformer
val transformer = new SQLTransformer()
.setStatement("SELECT *, getFeatures(v) AS vs FROM __THIS__")
transformer.transform(df).show
// +---+------+------------------+
// | k| v| vs|
// +---+------+------------------+
// | 1|abcdef|[f, ef, def, cdef]|
// | 2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+
Si también desea que el transformador sea editable, puede volver a implementar los rasgos como HasInputCol en la biblioteca sharedParams en un paquete público de su elección y luego usarlos con DefaultParamsWritable Trait para hacer que el transformador sea persistente.
De esta forma, también puede evitar tener que colocar parte de su código dentro de los paquetes de spark core ml pero mantiene un conjunto paralelo de params en su propio paquete. Esto no es realmente un problema dado que casi nunca cambian.
Pero haga un seguimiento del error en su panel de JIRA que solicita que algunos de los parámetros compartidos comunes se hagan públicos en lugar de privados para el ml para que las personas puedan usarlos directamente de clases externas.