tutorial spark org kmeans examples example clustering scala apache-spark apache-spark-sql apache-spark-mllib apache-spark-ml

scala - org - MatchError al acceder a la columna vectorial en Spark 2.0



spark ml regression (3)

Cambié:

val ldaDF = countVectors.map { case Row(id: String, countVector: Vector) => (id, countVector) }

a:

val ldaDF = countVectors.map { case Row(docId: String, features: MLVector) => (docId.toLong, Vectors.fromML(features)) }

¡Y funcionó como un encanto! Está alineado con lo que @ zero323 ha escrito.

Lista de importaciones:

import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover} import org.apache.spark.ml.linalg.{Vector => MLVector} import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer} import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.sql.{Row, SparkSession}

Estoy tratando de crear un modelo LDA en un archivo JSON.

Crear un contexto de chispa con el archivo JSON:

import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder .master("local") .appName("my-spark-app") .config("spark.some.config.option", "config-value") .getOrCreate() val df = spark.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt")

Mostrar el df debería mostrar el DataFrame

display(df)

Tokenizar el texto

import org.apache.spark.ml.feature.RegexTokenizer // Set params for RegexTokenizer val tokenizer = new RegexTokenizer() .setPattern("[//W_]+") .setMinTokenLength(4) // Filter away tokens with length < 4 .setInputCol("text") .setOutputCol("tokens") // Tokenize document val tokenized_df = tokenizer.transform(df)

Esto debería mostrar el tokenized_df

display(tokenized_df)

Obtén las stopwords

%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words > -O /tmp/stopwords

Opcional: copiar las palabras vacías en la carpeta tmp

%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords

Recolectando todas las stopwords

val stopwords = sc.textFile("/tmp/stopwords").collect()

Filtrando las stopwords

import org.apache.spark.ml.feature.StopWordsRemover // Set params for StopWordsRemover val remover = new StopWordsRemover() .setStopWords(stopwords) // This parameter is optional .setInputCol("tokens") .setOutputCol("filtered") // Create new DF with Stopwords removed val filtered_df = remover.transform(tokenized_df)

Mostrar el df filtrado debería verificar que se stopwords eliminado las stopwords

display(filtered_df)

Vectorizando la frecuencia de ocurrencia de palabras

import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.sql.Row import org.apache.spark.ml.feature.CountVectorizer // Set params for CountVectorizer val vectorizer = new CountVectorizer() .setInputCol("filtered") .setOutputCol("features") .fit(filtered_df)

Verificar el vectorizer

vectorizer.transform(filtered_df) .select("id", "text","features","filtered").show()

Después de esto, veo un problema al ajustar este vectorizer en LDA. El problema que creo es que CountVectorizer está dando un vector disperso, pero LDA requiere un vector denso. Todavía estoy tratando de resolver el problema.

Aquí está la excepción donde el mapa no puede convertir.

import org.apache.spark.mllib.linalg.Vector val ldaDF = countVectors.map { case Row(id: String, countVector: Vector) => (id, countVector) } display(ldaDF)

Excepción

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331, 10.209.240.17): scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,5.0,1.0,2.0,2.0,1.0,4.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

Hay una muestra de trabajo para LDA que no arroja ningún problema

import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.sql.Row import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA} val a = Vectors.dense(Array(1.0,2.0,3.0)) val b = Vectors.dense(Array(3.0,4.0,5.0)) val df = Seq((1L,a),(2L,b),(2L,a)).toDF val ldaDF = df.map { case Row(id: Long, countVector: Vector) => (id, countVector) } val model = new LDA().setK(3).run(ldaDF.javaRDD) display(df)

La única diferencia es que en el segundo fragmento tenemos una matriz densa.


Esto no tiene nada que ver con la escasez. Desde Spark 2.0.0 ML Transformers ya no generan oasmllib.linalg.VectorUDT sino oasml.linalg.VectorUDT y se asignan localmente a subclases de oasml.linalg.Vector . Estos no son compatibles con la antigua API MLLib que se está moviendo hacia la obsolescencia en Spark 2.0.0.

Puede convertir entre "viejo" usando Vectors.fromML :

import org.apache.spark.mllib.linalg.{Vectors => OldVectors} import org.apache.spark.ml.linalg.{Vectors => NewVectors} OldVectors.fromML(NewVectors.dense(1.0, 2.0, 3.0)) OldVectors.fromML(NewVectors.sparse(5, Seq(0 -> 1.0, 2 -> 2.0, 4 -> 3.0)))

pero tiene más sentido usar la implementación ML de LDA si ya usa transformadores ML.

Para su comodidad, puede usar conversiones implícitas:

import scala.languageFeature.implicitConversions object VectorConversions { import org.apache.spark.mllib.{linalg => mllib} import org.apache.spark.ml.{linalg => ml} implicit def toNewVector(v: mllib.Vector) = v.asML implicit def toOldVector(v: ml.Vector) = mllib.Vectors.fromML(v) }


La solución es muy simple chicos.

//import org.apache.spark.mllib.linalg.Vector import org.apache.spark.ml.linalg.Vector