java - Cómo resolver SPARK-5063 en funciones de mapa anidado
nested apache-spark (3)
@maasg Primero utilicé JavaPairRDD <TextFile, JavaRDD <Word>> , y no funcionó como dijeron usted y @David Griffin, aún no es posible. Modelos:
TextFile (ruta de cadena, texto de cadena)
Word (palabra de cadena, conteo de enteros)
Ahora usando JavaRDD <TextFile> y los modelos han cambiado como:
TextFile (String path, String text, List <Word> wordList)
Word (palabra de cadena, conteo de enteros)
Finalmente,
List<Word> countDrafts = wordCount.map(v11 -> new Word(v11._1(), (long) 0)).collect();
JavaRDD<TextFile> ft = fileTexts.map(v11 -> new TextFile(v11._1(), v11._2(), countDrafts));
ft.foreach(textFile -> textFile.getWordList().forEach(word -> new Word(word.getText(), getWordCountFromText(textFile.getText(), word.getText()))));
La función getWordCountFromText () cuenta la palabra en el texto del objeto TextFile, pero desafortunadamente no usa el método de reducción de chispa, usando la forma clásica.
Por cierto, probaré DataFrames en los próximos días, pero tengo poco tiempo para hacer esto.
Gracias a todos.
Las transformaciones y acciones de RDD solo pueden ser invocadas por el controlador, no dentro de otras transformaciones; por ejemplo, rdd1.map (x => rdd2.values.count () * x) no es válido porque la transformación de valores y la acción de conteo no se pueden realizar dentro de la transformación rdd1.map. Para obtener más información, vea SPARK-5063.
Como dice el error, estoy tratando de mapear (transformar) un objeto JavaRDD dentro de la función de mapa principal, ¿cómo es posible con Apache Spark?
El objeto JavaPairRDD principal (TextFile y Word son clases definidas):
JavaPairRDD<TextFile, JavaRDD<Word>> filesWithWords = new...
y función de mapa:
filesWithWords.map(textFileJavaRDDTuple2 -> textFileJavaRDDTuple2._2().map(word -> new Word(word.getText(), (long) textFileJavaRDDTuple2._1().getText().split(word.getText()).length)));
También probé foreach en lugar de función de mapa, pero no funcionaba. (Y, por supuesto, buscó SPARK-5063)
Cuando llegué a este mismo punto exacto en mi curva de aprendizaje para Spark (probé y no usé RDD anidados) cambié a DataFrames y pude lograr lo mismo usando uniones en su lugar. Además, en general, los DataFrames parecen ser casi dos veces más rápidos que los RDD, al menos para el trabajo que he estado haciendo.
De la misma manera, las operaciones anidadas en RDD no son compatibles, los tipos de RDD anidados no son posibles en Spark. Los RDD solo se definen en el controlador donde, en combinación con su SparkContext
, pueden programar operaciones en los datos que representan.
Entonces, la causa raíz que debemos abordar en este caso es el tipo de datos:
JavaPairRDD<TextFile, JavaRDD<Word>> filesWithWords
Que en Spark no tendrá un uso válido posible. Según el caso de uso, que no se explica con más detalle en la pregunta, este tipo debería convertirse en uno de los siguientes:
Una colección de RDD, con el archivo de texto al que se refieren:
Map<TextFile,RDD<Word>>
O una colección de (archivo de texto, Word) por archivo de texto:
JavaPairRDD<TextFile, Word>
O una colección de palabras con su correspondiente TextFile:
JavaPairRDD<TextFile, List<Word>>
Una vez que se corrige el tipo, los problemas con las operaciones RDD anidadas se resolverán naturalmente.