spark org mllib mlib kmeans guide clustering java apache-spark apache-spark-mllib apache-spark-ml

java - org - ¿El indexador Spark ML no puede resolver el nombre de la columna del DataFrame con puntos?



spark ml regression (1)

Tengo un DataFrame con una columna llamada ab . Cuando especifico ab como el nombre de columna de entrada en un StringIndexer , AnalysisException con el mensaje "no se puede resolver ''ab'' columnas de entrada dadas ab" . Estoy usando Spark 1.6.0.

Soy consciente de que las versiones anteriores de Spark pueden haber tenido problemas con los puntos en los nombres de las columnas, pero que en versiones más recientes, las comillas inversas se pueden usar alrededor de los nombres de las columnas en el shell Spark y con las consultas SQL. Por ejemplo, esa es la resolución de otra pregunta, Cómo escapar de los nombres de las columnas con guiones en Spark SQL . Algunos de estos problemas se informaron en SPARK-6898, los caracteres especiales en los nombres de las columnas están rotos , pero eso se resolvió en 1.4.0.

Aquí hay un ejemplo mínimo y stacktrace:

public class SparkMLDotColumn { public static void main(String[] args) { // Get the contexts SparkConf conf = new SparkConf() .setMaster("local[*]") .setAppName("test") .set("spark.ui.enabled", "false"); // http://permalink.gmane.org/gmane.comp.lang.scala.spark.user/21385 JavaSparkContext sparkContext = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sparkContext); // Create a schema with a single string column named "a.b" StructType schema = new StructType(new StructField[] { DataTypes.createStructField("a.b", DataTypes.StringType, false) }); // Create an empty RDD and DataFrame JavaRDD<Row> rdd = sparkContext.parallelize(Collections.emptyList()); DataFrame df = sqlContext.createDataFrame(rdd, schema); StringIndexer indexer = new StringIndexer() .setInputCol("a.b") .setOutputCol("a.b_index"); df = indexer.fit(df).transform(df); } }

Ahora, vale la pena intentar el mismo tipo de ejemplo usando nombres de columna con cita atrás, porque obtenemos algunos resultados extraños. Aquí hay un ejemplo con el mismo esquema, pero esta vez tenemos datos en el marco. Antes de intentar cualquier indexación, copiaremos la columna llamada ab en una columna llamada a_b . Eso requiere el uso de backticks, y funciona sin problemas. Luego, intentaremos indexar la columna a_b , que funciona sin problemas. Entonces sucede algo realmente extraño cuando tratamos de indexar la columna ab , usando palos de retroceso. No obtenemos ningún error, pero tampoco obtenemos ningún resultado:

public class SparkMLDotColumn { public static void main(String[] args) { // Get the contexts SparkConf conf = new SparkConf() .setMaster("local[*]") .setAppName("test") .set("spark.ui.enabled", "false"); JavaSparkContext sparkContext = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sparkContext); // Create a schema with a single string column named "a.b" StructType schema = new StructType(new StructField[] { DataTypes.createStructField("a.b", DataTypes.StringType, false) }); // Create an empty RDD and DataFrame List<Row> rows = Arrays.asList(RowFactory.create("foo"), RowFactory.create("bar")); JavaRDD<Row> rdd = sparkContext.parallelize(rows); DataFrame df = sqlContext.createDataFrame(rdd, schema); df = df.withColumn("a_b", df.col("`a.b`")); StringIndexer indexer0 = new StringIndexer(); indexer0.setInputCol("a_b"); indexer0.setOutputCol("a_bIndex"); df = indexer0.fit(df).transform(df); StringIndexer indexer1 = new StringIndexer(); indexer1.setInputCol("`a.b`"); indexer1.setOutputCol("abIndex"); df = indexer1.fit(df).transform(df); df.show(); } }

+---+---+--------+ |a.b|a_b|a_bIndex| // where''s the abIndex column? +---+---+--------+ |foo|foo| 0.0| |bar|bar| 1.0| +---+---+--------+

Stacktrace del primer ejemplo

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve ''a.b'' given input columns a.b; at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:60) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:318) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:316) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:316) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:107) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:117) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:121) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:121) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:125) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:125) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:57) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34) at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165) at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751) at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:84) at SparkMLDotColumn.main(SparkMLDotColumn.java:38)


Experimenté el mismo problema en Spark 2.1. Terminé creando una función que "valida" (TM) todos los nombres de columna al reemplazar todos los puntos. Implementación de Scala:

def validifyColumnnames[T](df : Dataset[T], spark : SparkSession) : DataFrame = { val newColumnNames = ArrayBuffer[String]() for(oldCol <- df.columns) { newColumnNames += oldCol.replaceAll("//.","") // append } val newColumnNamesB = spark.sparkContext.broadcast(newColumnNames.toArray) df.toDF(newColumnNamesB.value : _*) }

Lo siento, probablemente esta no sea la respuesta que esperabas, pero fue demasiado tiempo para hacer un comentario.