scala - org - MatchError al acceder a la columna vectorial en Spark 2.0
spark ml regression (3)
Cambié:
val ldaDF = countVectors.map {
case Row(id: String, countVector: Vector) => (id, countVector)
}
a:
val ldaDF = countVectors.map { case Row(docId: String, features: MLVector) =>
(docId.toLong, Vectors.fromML(features)) }
¡Y funcionó como un encanto! Está alineado con lo que @ zero323 ha escrito.
Lista de importaciones:
import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover}
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
Estoy tratando de crear un modelo LDA en un archivo JSON.
Crear un contexto de chispa con el archivo JSON:
import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder
.master("local")
.appName("my-spark-app")
.config("spark.some.config.option", "config-value")
.getOrCreate()
val df = spark.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt")
Mostrar el
df
debería mostrar el
DataFrame
display(df)
Tokenizar el texto
import org.apache.spark.ml.feature.RegexTokenizer
// Set params for RegexTokenizer
val tokenizer = new RegexTokenizer()
.setPattern("[//W_]+")
.setMinTokenLength(4) // Filter away tokens with length < 4
.setInputCol("text")
.setOutputCol("tokens")
// Tokenize document
val tokenized_df = tokenizer.transform(df)
Esto debería mostrar el
tokenized_df
display(tokenized_df)
Obtén las
stopwords
%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words > -O /tmp/stopwords
Opcional: copiar las palabras vacías en la carpeta tmp
%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords
Recolectando todas las
stopwords
val stopwords = sc.textFile("/tmp/stopwords").collect()
Filtrando las
stopwords
import org.apache.spark.ml.feature.StopWordsRemover
// Set params for StopWordsRemover
val remover = new StopWordsRemover()
.setStopWords(stopwords) // This parameter is optional
.setInputCol("tokens")
.setOutputCol("filtered")
// Create new DF with Stopwords removed
val filtered_df = remover.transform(tokenized_df)
Mostrar el
df
filtrado debería verificar que se
stopwords
eliminado las
stopwords
display(filtered_df)
Vectorizando la frecuencia de ocurrencia de palabras
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.CountVectorizer
// Set params for CountVectorizer
val vectorizer = new CountVectorizer()
.setInputCol("filtered")
.setOutputCol("features")
.fit(filtered_df)
Verificar el
vectorizer
vectorizer.transform(filtered_df)
.select("id", "text","features","filtered").show()
Después de esto, veo un problema al ajustar este
vectorizer
en LDA.
El problema que creo es que
CountVectorizer
está dando un vector disperso, pero LDA requiere un vector denso.
Todavía estoy tratando de resolver el problema.
Aquí está la excepción donde el mapa no puede convertir.
import org.apache.spark.mllib.linalg.Vector
val ldaDF = countVectors.map {
case Row(id: String, countVector: Vector) => (id, countVector)
}
display(ldaDF)
Excepción
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331, 10.209.240.17): scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,5.0,1.0,2.0,2.0,1.0,4.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
Hay una muestra de trabajo para LDA que no arroja ningún problema
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
val a = Vectors.dense(Array(1.0,2.0,3.0))
val b = Vectors.dense(Array(3.0,4.0,5.0))
val df = Seq((1L,a),(2L,b),(2L,a)).toDF
val ldaDF = df.map { case Row(id: Long, countVector: Vector) => (id, countVector) }
val model = new LDA().setK(3).run(ldaDF.javaRDD)
display(df)
La única diferencia es que en el segundo fragmento tenemos una matriz densa.
Esto no tiene nada que ver con la escasez.
Desde Spark 2.0.0 ML
Transformers
ya no generan
oasmllib.linalg.VectorUDT
sino
oasml.linalg.VectorUDT
y se asignan localmente a subclases de
oasml.linalg.Vector
.
Estos no son compatibles con la antigua API MLLib que se está moviendo hacia la obsolescencia en Spark 2.0.0.
Puede convertir entre "viejo" usando
Vectors.fromML
:
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.ml.linalg.{Vectors => NewVectors}
OldVectors.fromML(NewVectors.dense(1.0, 2.0, 3.0))
OldVectors.fromML(NewVectors.sparse(5, Seq(0 -> 1.0, 2 -> 2.0, 4 -> 3.0)))
pero tiene más sentido usar la implementación
ML
de LDA si ya usa transformadores ML.
Para su comodidad, puede usar conversiones implícitas:
import scala.languageFeature.implicitConversions
object VectorConversions {
import org.apache.spark.mllib.{linalg => mllib}
import org.apache.spark.ml.{linalg => ml}
implicit def toNewVector(v: mllib.Vector) = v.asML
implicit def toOldVector(v: ml.Vector) = mllib.Vectors.fromML(v)
}
La solución es muy simple chicos.
//import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.ml.linalg.Vector