son que las instrucciones expresiones explicacion example ejercicios ejemplos aws java lambda java-8 spark-java

que - java.lang.ClassCastException usando expresiones lambda en el trabajo de chispa en el servidor remoto



java 8 lambda explicacion (4)

Lo que tienes aquí es un error de seguimiento que enmascara el error original.

Cuando las instancias de lambda se serializan, utilizan writeReplace para disolver su implementación específica de JRE de la forma persistente que es una instancia de SerializedLambda . Cuando se haya restaurado la instancia de SerializedLambda , se readResolve su método readResolve para reconstituir la instancia lambda apropiada. Como dice la documentación, lo hará invocando un método especial de la clase que definió la lambda original (ver también esta respuesta ). El punto importante es que se necesita la clase original y eso es lo que falta en su caso.

Pero hay un comportamiento ... especial ... del ObjectInputStream . Cuando encuentra una excepción, no se rescata de inmediato. Registrará la excepción y continuará el proceso, marcando todos los objetos que se están leyendo en ese momento, por lo tanto, dependiendo del objeto erróneo como erróneo también. Solo al final del proceso lanzará la excepción original que encontró. Lo que lo hace tan extraño es que también continuará intentando establecer los campos de estos objetos. Pero cuando nos fijamos en el método ObjectInputStream.readOrdinaryObject line 1806:

… if (obj != null && handles.lookupException(passHandle) == null && desc.hasReadResolveMethod()) { Object rep = desc.invokeReadResolve(obj); if (unshared && rep.getClass().isArray()) { rep = cloneArray(rep); } if (rep != obj) { handles.setObject(passHandle, obj = rep); } } return obj; }

verá que no llama al método lookupException cuando lookupException informa de una excepción no null . Pero cuando la sustitución no sucedió, no es una buena idea continuar intentando establecer los valores de campo del referente, pero eso es exactamente lo que sucede aquí, por lo que se produce una ClassCastException .

Puede reproducir fácilmente el problema:

public class Holder implements Serializable { Runnable r; } public class Defining { public static Holder get() { final Holder holder = new Holder(); holder.r=(Runnable&Serializable)()->{}; return holder; } } public class Writing { static final File f=new File(System.getProperty("java.io.tmpdir"), "x.ser"); public static void main(String... arg) throws IOException { try(FileOutputStream os=new FileOutputStream(f); ObjectOutputStream oos=new ObjectOutputStream(os)) { oos.writeObject(Defining.get()); } System.out.println("written to "+f); } } public class Reading { static final File f=new File(System.getProperty("java.io.tmpdir"), "x.ser"); public static void main(String... arg) throws IOException, ClassNotFoundException { try(FileInputStream is=new FileInputStream(f); ObjectInputStream ois=new ObjectInputStream(is)) { Holder h=(Holder)ois.readObject(); System.out.println(h.r); h.r.run(); } System.out.println("read from "+f); } }

Compila estas cuatro clases y ejecuta la Writing . Luego borre el archivo de clase Defining.class y ejecute Reading . Entonces obtendrás un

Exception in thread "main" java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field test.Holder.r of type java.lang.Runnable in instance of test.Holder at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)

(Probado con 1.8.0_20)

La conclusión es que puede olvidarse de este problema de Serialización una vez que se entienda lo que está sucediendo, todo lo que tiene que hacer para resolver su problema es asegurarse de que la clase que definió la expresión lambda también esté disponible en el tiempo de ejecución donde está la lambda deserializado.

Ejemplo para que Spark Job se ejecute directamente desde IDE (spark-submit distribuye jar de forma predeterminada):

SparkConf sconf = new SparkConf() .set("spark.eventLog.dir", "hdfs://nn:8020/user/spark/applicationHistory") .set("spark.eventLog.enabled", "true") .setJars(new String[]{"/path/to/jar/with/your/class.jar"}) .setMaster("spark://spark.standalone.uri:7077");

Estoy tratando de construir una api web para mis trabajos de chispa de apache usando el framework sparkjava.com. Mi código es:

@Override public void init() { get("/hello", (req, res) -> { String sourcePath = "hdfs://spark:54310/input/*"; SparkConf conf = new SparkConf().setAppName("LineCount"); conf.setJars(new String[] { "/home/sam/resin-4.0.42/webapps/test.war" }); File configFile = new File("config.properties"); String sparkURI = "spark://hamrah:7077"; conf.setMaster(sparkURI); conf.set("spark.driver.allowMultipleContexts", "true"); JavaSparkContext sc = new JavaSparkContext(conf); @SuppressWarnings("resource") JavaRDD<String> log = sc.textFile(sourcePath); JavaRDD<String> lines = log.filter(x -> { return true; }); return lines.count(); }); }

Si elimino la expresión lambda o la coloco dentro de un simple jar en lugar de un servicio web (de alguna manera un servlet) se ejecutará sin ningún error. Pero el uso de una expresión lambda dentro de un servlet dará como resultado esta excepción:

15/01/28 10:36:33 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hamrah): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1 at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1999) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

PD: Probé la combinación de jersey y javaspark con embarcadero, tomcat y resina y todos me llevaron al mismo resultado.


Quizás pueda más simplemente reemplazar su Java8 lambda con un spark.scala.Function

reemplazar

output = rdds.map(x->this.function(x)).collect()

con:

output = rdds.map(new Function<Double,Double>(){ public Double call(Double x){ return MyClass.this.function(x); } }).collect();


Supongo que su problema es el auto-boxeo fallido. En el codigo

x -> { return true; }

pasa ( String->boolean ) lambda (es Predicate<String> ) mientras el método de filtro toma ( String->Boolean ) lambda (es Function<String,Boolean> ). Así que te ofrezco cambiar el código a

x -> { return Boolean.TRUE; }

Incluya detalles en su pregunta por favor. Se aprecia la salida de uname -a y java -version . Proporcionar sscce si es posible.


Tuve el mismo error y reemplacé la lambda con una clase interna, luego funcionó. Realmente no entiendo por qué, y reproducir este error fue extremadamente difícil (tuvimos un servidor que mostraba el comportamiento y en ningún otro lugar).

Causa problemas de serialización (usa lambdas, causa el error SerializedLambda )

this.variable = () -> { ..... }

Rinde java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field MyObject.val$variable

Trabajos

this.variable = new MyInterface() { public void myMethod() { ..... } };